diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index bf762cc7e..f7f0e31d9 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -8,9 +8,9 @@ * COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input * from stdin, a program, or a file, and decides to copy new rows to existing * shards or new shards based on the partition method of the distributed table. - * If copy is run a worker node, CitusCopyFrom calls CopyFromWorkerNode which - * parses the master node copy options and handles communication with the master - * node. + * If copy is run a worker node, CitusCopyFrom calls LocalCopyFromWorkerNode + * which parses the master node copy options and handles communication with + * the master node. * * It opens a new connection for every shard placement and uses the PQputCopyData * function to copy the data. Because PQputCopyData transmits data, asynchronously, @@ -128,6 +128,15 @@ #include "utils/memutils.h" +/* struct type to use a local COPY .. FROM statement as a tuple source */ +typedef struct LocalCopyContext +{ + CopyStmt *copyStatement; + CopyState copyState; + EState *executorState; +} LocalCopyContext; + + /* constant used in binary protocol */ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; @@ -136,21 +145,28 @@ static PGconn *masterConnection = NULL; /* Local functions forward declarations */ -static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag); -static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag); -static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); +static CopyTupleSource * CreateLocalCopyTupleSource(CopyStmt *copyStatement); +static void LocalCopyOpen(void *context, Relation relation, + ErrorContextCallback *errorCallback); +static bool LocalCopyNextTuple(void *context, Datum *columnValues, bool *columnNulls); +static void LocalCopyClose(void *context); +static uint64 LocalCopyFromWorkerNode(CopyStmt *copyStatement); +static uint64 CopyToExistingShards(CopyTupleSource *tupleSource, RangeVar *relation); +static uint64 CopyToNewShards(CopyTupleSource *tupleSource, RangeVar *relation, + Oid relationId); static char MasterPartitionMethod(RangeVar *relation); static void RemoveMasterOptions(CopyStmt *copyStatement); -static void OpenCopyTransactions(CopyStmt *copyStatement, - ShardConnections *shardConnections, bool stopOnFailure, - bool useBinaryCopyFormat); +static void OpenCopyTransactions(RangeVar *relation, ShardConnections *shardConnections, + bool stopOnFailure, bool useBinaryCopyFormat); +static CopyOutState CreateCopyOutState(TupleDesc tupleDescriptor, + MemoryContext rowContext); static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState); static List * MasterShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId); static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList); -static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, +static StringInfo ConstructCopyStatement(RangeVar *relation, int64 shardId, bool useBinaryCopyFormat); static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, @@ -158,8 +174,8 @@ static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void ReportCopyError(PGconn *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); -static void StartCopyToNewShard(ShardConnections *shardConnections, - CopyStmt *copyStatement, bool useBinaryCopyFormat); +static void StartCopyToNewShard(RangeVar *relation, ShardConnections *shardConnections, + bool useBinaryCopyFormat); static int64 MasterCreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName); static int64 RemoteCreateEmptyShard(char *relationName); @@ -186,6 +202,7 @@ void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) { bool isCopyFromWorker = false; + uint64 processedRowCount = 0; /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) @@ -211,33 +228,143 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) isCopyFromWorker = IsCopyFromWorker(copyStatement); if (isCopyFromWorker) { - CopyFromWorkerNode(copyStatement, completionTag); + processedRowCount = LocalCopyFromWorkerNode(copyStatement); } else { - Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - char partitionMethod = PartitionMethod(relationId); + CopyTupleSource *tupleSource = CreateLocalCopyTupleSource(copyStatement); + RangeVar *relation = copyStatement->relation; - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == - DISTRIBUTE_BY_RANGE) - { - CopyToExistingShards(copyStatement, completionTag); - } - else if (partitionMethod == DISTRIBUTE_BY_APPEND) - { - CopyToNewShards(copyStatement, completionTag, relationId); - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("unsupported partition method"))); - } + processedRowCount = CopyTupleSourceToShards(tupleSource, relation); + } + + if (completionTag != NULL) + { + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, processedRowCount); } XactModificationLevel = XACT_MODIFICATION_DATA; } +/* + * CopyTupleSourceToShards copies the tuples provided by tupleSource to + * the shards of the given relation. For hash- and range-distributed + * tables it copies to the existing shards and for append-distributed + * tables it adds a new shard. + */ +uint64 +CopyTupleSourceToShards(CopyTupleSource *tupleSource, RangeVar *relation) +{ + uint64 processedRowCount = 0; + + Oid relationId = RangeVarGetRelid(relation, NoLock, false); + char partitionMethod = PartitionMethod(relationId); + + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) + { + processedRowCount = CopyToExistingShards(tupleSource, relation); + } + else if (partitionMethod == DISTRIBUTE_BY_APPEND) + { + processedRowCount = CopyToNewShards(tupleSource, relation, relationId); + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unsupported partition method"))); + } + + return processedRowCount; +} + + +/* + * CreateLocalCopyTupleSource creates and returns a tuple source for a local + * COPY .. FROM command. + */ +static CopyTupleSource * +CreateLocalCopyTupleSource(CopyStmt *copyStatement) +{ + LocalCopyContext *localCopyContext = palloc0(sizeof(LocalCopyContext)); + CopyTupleSource *tupleSource = palloc0(sizeof(CopyTupleSource)); + EState *executorState = CreateExecutorState(); + + localCopyContext->copyStatement = copyStatement; + localCopyContext->copyState = NULL; + localCopyContext->executorState = executorState; + + tupleSource->context = localCopyContext; + tupleSource->rowContext = GetPerTupleMemoryContext(executorState); + tupleSource->Open = LocalCopyOpen; + tupleSource->NextTuple = LocalCopyNextTuple; + tupleSource->Close = LocalCopyClose; + + return tupleSource; +} + + +/* + * LocalCopyOpen opens the COPY input for copying into the given relation. + */ +static void +LocalCopyOpen(void *context, Relation relation, ErrorContextCallback *errorCallback) +{ + LocalCopyContext *localCopyContext = (LocalCopyContext *) context; + + CopyStmt *copyStatement = localCopyContext->copyStatement; + CopyState copyState = NULL; + + copyState = BeginCopyFrom(relation, + copyStatement->filename, + copyStatement->is_program, + copyStatement->attlist, + copyStatement->options); + + localCopyContext->copyState = copyState; + + errorCallback->callback = CopyFromErrorCallback; + errorCallback->arg = (void *) copyState; + errorCallback->previous = error_context_stack; +} + + +/* + * LocalCopyNextTuple reads a tuple from the COPY input. columnValeus + * will contain values for all columns, using the default values for + * columns that were not provided by the COPY command. columnNulls + * is true for columns that contain NULL. LocalCopyNextTuple returns + * if the end of the input is reached and true otherwise. + */ +static bool +LocalCopyNextTuple(void *context, Datum *columnValues, bool *columnNulls) +{ + LocalCopyContext *localCopyContext = (LocalCopyContext *) context; + + CopyState copyState = localCopyContext->copyState; + EState *executorState = localCopyContext->executorState; + ExprContext *expressionContext = GetPerTupleExprContext(executorState); + + bool nextRowFound = NextCopyFrom(copyState, expressionContext, columnValues, + columnNulls, NULL); + + return nextRowFound; +} + + +/* + * LocalCopyClose closes the COPY input. + */ +static void +LocalCopyClose(void *context) +{ + LocalCopyContext *localCopyContext = (LocalCopyContext *) context; + + EndCopyFrom(localCopyContext->copyState); +} + + /* * IsCopyFromWorker checks if the given copy statement has the master host option. */ @@ -259,16 +386,17 @@ IsCopyFromWorker(CopyStmt *copyStatement) /* - * CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes - * for append-partitioned tables. + * LocalCopyFromWorkerNode implements the COPY table_name FROM ... from worker + * nodes for append-partitioned tables. */ -static void -CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) +static uint64 +LocalCopyFromWorkerNode(CopyStmt *copyStatement) { NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); char *nodeName = masterNodeAddress->nodeName; int32 nodePort = masterNodeAddress->nodePort; char *nodeUser = CurrentUserName(); + uint64 processedRowCount = 0; if (XactModificationLevel > XACT_MODIFICATION_NONE) { @@ -285,17 +413,19 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) PGresult *queryResult = NULL; Oid relationId = InvalidOid; char partitionMethod = 0; + CopyTupleSource *tupleSource = NULL; /* strip schema name for local reference */ - char *schemaName = copyStatement->relation->schemaname; - copyStatement->relation->schemaname = NULL; + RangeVar *relation = copyStatement->relation; + char *schemaName = relation->schemaname; + relation->schemaname = NULL; - relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + relationId = RangeVarGetRelid(relation, NoLock, false); /* put schema name back */ - copyStatement->relation->schemaname = schemaName; + relation->schemaname = schemaName; - partitionMethod = MasterPartitionMethod(copyStatement->relation); + partitionMethod = MasterPartitionMethod(relation); if (partitionMethod != DISTRIBUTE_BY_APPEND) { ereport(ERROR, (errmsg("copy from worker nodes is only supported " @@ -317,7 +447,9 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) */ RemoveMasterOptions(copyStatement); - CopyToNewShards(copyStatement, completionTag, relationId); + tupleSource = CreateLocalCopyTupleSource(copyStatement); + + processedRowCount = CopyToNewShards(tupleSource, relation, relationId); /* commit metadata transactions */ queryResult = PQexec(masterConnection, "COMMIT"); @@ -341,6 +473,8 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) PG_RE_THROW(); } PG_END_TRY(); + + return processedRowCount; } @@ -349,10 +483,10 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) * range-partitioned tables where there are already shards into which to copy * rows. */ -static void -CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) +static uint64 +CopyToExistingShards(CopyTupleSource *tupleSource, RangeVar *relation) { - Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + Oid tableId = RangeVarGetRelid(relation, NoLock, false); char *relationName = get_rel_name(tableId); Relation distributedRelation = NULL; TupleDesc tupleDescriptor = NULL; @@ -363,8 +497,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) FmgrInfo *compareFunction = NULL; bool hasUniformHashDistribution = false; DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId); - const char *delimiterCharacter = "\t"; - const char *nullPrintCharacter = "\\N"; int shardCount = 0; List *shardIntervalList = NULL; @@ -375,11 +507,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) ShardConnections *shardConnections = NULL; List *connectionList = NIL; - EState *executorState = NULL; - MemoryContext executorTupleContext = NULL; - ExprContext *executorExpressionContext = NULL; - - CopyState copyState = NULL; CopyOutState copyOutState = NULL; FmgrInfo *columnOutputFunctions = NULL; uint64 processedRowCount = 0; @@ -446,25 +573,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) useBinarySearch = true; } - /* initialize copy state to read from COPY data source */ - copyState = BeginCopyFrom(distributedRelation, - copyStatement->filename, - copyStatement->is_program, - copyStatement->attlist, - copyStatement->options); - - executorState = CreateExecutorState(); - executorTupleContext = GetPerTupleMemoryContext(executorState); - executorExpressionContext = GetPerTupleExprContext(executorState); - - copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->delim = (char *) delimiterCharacter; - copyOutState->null_print = (char *) nullPrintCharacter; - copyOutState->null_print_client = (char *) nullPrintCharacter; - copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); - copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = executorTupleContext; - + copyOutState = CreateCopyOutState(tupleDescriptor, tupleSource->rowContext); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); /* @@ -480,10 +589,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) { ErrorContextCallback errorCallback; + /* open the tuple source */ + tupleSource->Open(tupleSource->context, distributedRelation, &errorCallback); + /* set up callback to identify error line number */ - errorCallback.callback = CopyFromErrorCallback; - errorCallback.arg = (void *) copyState; - errorCallback.previous = error_context_stack; error_context_stack = &errorCallback; /* ensure transactions have unique names on worker nodes */ @@ -498,14 +607,13 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) bool shardConnectionsFound = false; MemoryContext oldContext = NULL; - ResetPerTupleExprContext(executorState); + MemoryContextReset(tupleSource->rowContext); - oldContext = MemoryContextSwitchTo(executorTupleContext); + oldContext = MemoryContextSwitchTo(tupleSource->rowContext); /* parse a row from the input */ - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - + nextRowFound = tupleSource->NextTuple(tupleSource->context, columnValues, + columnNulls); if (!nextRowFound) { MemoryContextSwitchTo(oldContext); @@ -547,7 +655,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) if (!shardConnectionsFound) { /* open connections and initiate COPY on shard placements */ - OpenCopyTransactions(copyStatement, shardConnections, false, + OpenCopyTransactions(relation, shardConnections, false, copyOutState->binary); /* send copy binary headers to shard placements */ @@ -586,7 +694,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) PrepareRemoteTransactions(connectionList); } - EndCopyFrom(copyState); + tupleSource->Close(tupleSource->context); + heap_close(distributedRelation, NoLock); /* check for cancellation one last time before committing */ @@ -614,11 +723,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) CommitRemoteTransactions(connectionList, false); CloseConnections(connectionList); - if (completionTag != NULL) - { - snprintf(completionTag, COMPLETION_TAG_BUFSIZE, - "COPY " UINT64_FORMAT, processedRowCount); - } + return processedRowCount; } @@ -626,10 +731,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) * CopyToNewShards implements the COPY table_name FROM ... for append-partitioned * tables where we create new shards into which to copy rows. */ -static void -CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) +static uint64 +CopyToNewShards(CopyTupleSource *tupleSource, RangeVar *relation, Oid relationId) { - FmgrInfo *columnOutputFunctions = NULL; + uint64 processedRowCount = 0; /* allocate column values and nulls arrays */ Relation distributedRelation = heap_open(relationId, RowExclusiveLock); @@ -638,36 +743,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) Datum *columnValues = palloc0(columnCount * sizeof(Datum)); bool *columnNulls = palloc0(columnCount * sizeof(bool)); - EState *executorState = CreateExecutorState(); - MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); - ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); - - const char *delimiterCharacter = "\t"; - const char *nullPrintCharacter = "\\N"; + ShardConnections *shardConnections = NULL; + CopyOutState copyOutState = NULL; + FmgrInfo *columnOutputFunctions = NULL; /* * Shard connections should be initialized before the PG_TRY, since it is * used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH * (see sigsetjmp documentation). */ - ShardConnections *shardConnections = - (ShardConnections *) palloc0(sizeof(ShardConnections)); - - /* initialize copy state to read from COPY data source */ - CopyState copyState = BeginCopyFrom(distributedRelation, - copyStatement->filename, - copyStatement->is_program, - copyStatement->attlist, - copyStatement->options); - - CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->delim = (char *) delimiterCharacter; - copyOutState->null_print = (char *) nullPrintCharacter; - copyOutState->null_print_client = (char *) nullPrintCharacter; - copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); - copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = executorTupleContext; + shardConnections = palloc0(sizeof(ShardConnections)); + copyOutState = CreateCopyOutState(tupleDescriptor, tupleSource->rowContext); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); /* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */ @@ -675,14 +762,12 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) { uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; uint64 copiedDataSizeInBytes = 0; - uint64 processedRowCount = 0; /* set up callback to identify error line number */ ErrorContextCallback errorCallback; - errorCallback.callback = CopyFromErrorCallback; - errorCallback.arg = (void *) copyState; - errorCallback.previous = error_context_stack; + /* open the tuple source */ + tupleSource->Open(tupleSource->context, distributedRelation, &errorCallback); while (true) { @@ -690,16 +775,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) MemoryContext oldContext = NULL; uint64 messageBufferSize = 0; - ResetPerTupleExprContext(executorState); + /* clear the previous tuple's memory */ + MemoryContextReset(tupleSource->rowContext); - /* switch to tuple memory context and start showing line number in errors */ + /* start showing the line number in errors */ error_context_stack = &errorCallback; - oldContext = MemoryContextSwitchTo(executorTupleContext); + + /* switch to tuple memory context */ + oldContext = MemoryContextSwitchTo(tupleSource->rowContext); /* parse a row from the input */ - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - + nextRowFound = tupleSource->NextTuple(tupleSource->context, columnValues, + columnNulls); if (!nextRowFound) { /* switch to regular memory context and stop showing line number in errors */ @@ -723,8 +810,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) if (copiedDataSizeInBytes == 0) { /* create shard and open connections to shard placements */ - StartCopyToNewShard(shardConnections, copyStatement, - copyOutState->binary); + StartCopyToNewShard(relation, shardConnections, copyOutState->binary); /* send copy binary headers to shard placements */ if (copyOutState->binary) @@ -782,17 +868,12 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) MasterUpdateShardStatistics(shardConnections->shardId); } - EndCopyFrom(copyState); + tupleSource->Close(tupleSource->context); + heap_close(distributedRelation, NoLock); /* check for cancellation one last time before returning */ CHECK_FOR_INTERRUPTS(); - - if (completionTag != NULL) - { - snprintf(completionTag, COMPLETION_TAG_BUFSIZE, - "COPY " UINT64_FORMAT, processedRowCount); - } } PG_CATCH(); { @@ -804,6 +885,8 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) PG_RE_THROW(); } PG_END_TRY(); + + return processedRowCount; } @@ -920,7 +1003,7 @@ RemoveMasterOptions(CopyStmt *copyStatement) * shard placements. */ static void -OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, +OpenCopyTransactions(RangeVar *relation, ShardConnections *shardConnections, bool stopOnFailure, bool useBinaryCopyFormat) { List *finalizedPlacementList = NIL; @@ -986,7 +1069,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections } PQclear(result); - copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, + copyCommand = ConstructCopyStatement(relation, shardConnections->shardId, useBinaryCopyFormat); result = PQexec(connection, copyCommand->data); @@ -1040,6 +1123,28 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections } +/* + * CreateCopyOutState creates a copy output state to pass to the COPY + * functions. + */ +static CopyOutState +CreateCopyOutState(TupleDesc tupleDescriptor, MemoryContext rowContext) +{ + static const char *delimiterCharacter = "\t"; + static const char *nullPrintCharacter = "\\N"; + + CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = rowContext; + + return copyOutState; +} + + /* * CanUseBinaryCopyFormat iterates over columns of the relation given in rowOutputState * and looks for a column whose type is array of user-defined type or composite type. @@ -1173,12 +1278,12 @@ SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList) * shard. */ static StringInfo -ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat) +ConstructCopyStatement(RangeVar *relation, int64 shardId, bool useBinaryCopyFormat) { StringInfo command = makeStringInfo(); - char *schemaName = copyStatement->relation->schemaname; - char *relationName = copyStatement->relation->relname; + char *schemaName = relation->schemaname; + char *relationName = relation->relname; char *shardName = pstrdup(relationName); char *shardQualifiedName = NULL; @@ -1536,11 +1641,11 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState) * opens connections to shard placements. */ static void -StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, +StartCopyToNewShard(RangeVar *relation, ShardConnections *shardConnections, bool useBinaryCopyFormat) { - char *relationName = copyStatement->relation->relname; - char *schemaName = copyStatement->relation->schemaname; + char *relationName = relation->relname; + char *schemaName = relation->schemaname; char *qualifiedName = quote_qualified_identifier(schemaName, relationName); int64 shardId = MasterCreateEmptyShard(qualifiedName); @@ -1551,7 +1656,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, shardConnections->connectionList = NIL; /* connect to shards placements and start transactions */ - OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat); + OpenCopyTransactions(relation, shardConnections, true, useBinaryCopyFormat); } diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index d27926bbf..30a316c34 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -43,8 +43,20 @@ typedef struct NodeAddress int32 nodePort; } NodeAddress; +/* struct type for a generic source of tuples to copy to shards */ +typedef struct CopyTupleSource +{ + void *context; + MemoryContext rowContext; + + void (*Open)(void *context, Relation relation, ErrorContextCallback *errorCallback); + bool (*NextTuple)(void *context, Datum *columnValues, bool *columnNulls); + void (*Close)(void *context); +} CopyTupleSource; + /* function declarations for copying into a distributed table */ +extern uint64 CopyTupleSourceToShards(CopyTupleSource *tupleSource, RangeVar *relation); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,