diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 28d6795f5..eb23c5c76 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -75,6 +75,10 @@ #include "utils/syscache.h" #include "utils/inval.h" +/* + * once every LOG_PER_TUPLE_AMOUNT, the copy will be logged. + */ +#define LOG_PER_TUPLE_AMOUNT 1000000 /* Replication model to use when creating distributed tables */ int ReplicationModel = REPLICATION_MODEL_COORDINATOR; @@ -107,6 +111,10 @@ static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, Var *distributionColumn); static bool RelationUsesHeapAccessMethodOrNone(Relation relation); static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty); +static void DoCopyFromLocalTableIntoShards(Relation distributedRelation, + DestReceiver *copyDest, + TupleTableSlot *slot, + EState *estate); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -1205,26 +1213,8 @@ RegularTable(Oid relationId) static void CopyLocalDataIntoShards(Oid distributedRelationId) { - DestReceiver *copyDest = NULL; - List *columnNameList = NIL; - Relation distributedRelation = NULL; - TupleDesc tupleDescriptor = NULL; - Var *partitionColumn = NULL; - int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; - bool stopOnFailure = true; - - EState *estate = NULL; -#if PG_VERSION_NUM >= PG_VERSION_12 - TableScanDesc scan = NULL; -#else - HeapScanDesc scan = NULL; -#endif - HeapTuple tuple = NULL; - MemoryContext oldContext = NULL; - uint64 rowsCopied = 0; - /* take an ExclusiveLock to block all operations except SELECT */ - distributedRelation = heap_open(distributedRelationId, ExclusiveLock); + Relation distributedRelation = heap_open(distributedRelationId, ExclusiveLock); /* * Skip copying from partitioned tables, we will copy the data from @@ -1249,24 +1239,27 @@ CopyLocalDataIntoShards(Oid distributedRelationId) PushActiveSnapshot(GetLatestSnapshot()); /* get the table columns */ - tupleDescriptor = RelationGetDescr(distributedRelation); + TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation); TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDescriptor, &TTSOpsHeapTuple); - columnNameList = TupleDescColumnNameList(tupleDescriptor); + List *columnNameList = TupleDescColumnNameList(tupleDescriptor); + + int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; /* determine the partition column in the tuple descriptor */ - partitionColumn = PartitionColumn(distributedRelationId, 0); + Var *partitionColumn = PartitionColumn(distributedRelationId, 0); if (partitionColumn != NULL) { partitionColumnIndex = partitionColumn->varattno - 1; } /* initialise per-tuple memory context */ - estate = CreateExecutorState(); + EState *estate = CreateExecutorState(); ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; - copyDest = + bool stopOnFailure = true; + DestReceiver *copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, @@ -1276,15 +1269,43 @@ CopyLocalDataIntoShards(Oid distributedRelationId) /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); + DoCopyFromLocalTableIntoShards(distributedRelation, copyDest, slot, estate); + + /* finish writing into the shards */ + copyDest->rShutdown(copyDest); + copyDest->rDestroy(copyDest); + + /* free memory and close the relation */ + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState(estate); + heap_close(distributedRelation, NoLock); + + PopActiveSnapshot(); +} + + +/* + * DoCopyFromLocalTableIntoShards performs a copy operation + * from local tables into shards. + */ +static void +DoCopyFromLocalTableIntoShards(Relation distributedRelation, + DestReceiver *copyDest, + TupleTableSlot *slot, + EState *estate) +{ /* begin reading from local table */ #if PG_VERSION_NUM >= PG_VERSION_12 - scan = table_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); + TableScanDesc scan = table_beginscan(distributedRelation, GetActiveSnapshot(), 0, + NULL); #else - scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); + HeapScanDesc scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); #endif - oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + uint64 rowsCopied = 0; + HeapTuple tuple = NULL; while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { /* materialize tuple and send it to a shard */ @@ -1308,13 +1329,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId) rowsCopied++; - if (rowsCopied % 1000000 == 0) + if (rowsCopied % LOG_PER_TUPLE_AMOUNT == 0) { ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied))); } } - if (rowsCopied % 1000000 != 0) + if (rowsCopied % LOG_PER_TUPLE_AMOUNT != 0) { ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied))); } @@ -1327,17 +1348,6 @@ CopyLocalDataIntoShards(Oid distributedRelationId) #else heap_endscan(scan); #endif - - /* finish writing into the shards */ - copyDest->rShutdown(copyDest); - copyDest->rDestroy(copyDest); - - /* free memory and close the relation */ - ExecDropSingleTupleTableSlot(slot); - FreeExecutorState(estate); - heap_close(distributedRelation, NoLock); - - PopActiveSnapshot(); }