diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 24f50e1d0..4c0d0ebd8 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -83,8 +83,9 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); -static List *CreateCopyOptions(bool isBinaryCopy); -static StringInfo ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName); +static List * CreateCopyOptions(bool isBinaryCopy); +static StringInfo ConstructShardTruncateStatement( + List *destinationShardFullyQualifiedName); static bool @@ -114,8 +115,9 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) SetupReplicationOriginRemoteSession(copyDest->connection); - // Construct and send the TRUNCATE statement to the remote node - StringInfo truncateStatement = ConstructShardTruncateStatement(copyDest->destinationShardFullyQualifiedName); + /* Construct and send the TRUNCATE statement to the remote node */ + StringInfo truncateStatement = ConstructShardTruncateStatement( + copyDest->destinationShardFullyQualifiedName); if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) { @@ -127,9 +129,9 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { ReportResultError(copyDest->connection, truncateResult, ERROR); } - PQclear(truncateResult); + PQclear(truncateResult); - // Construct the COPY command and send it to the remote node + /* Construct the COPY command and send it to the remote node */ StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary, @@ -478,31 +480,31 @@ ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName) */ static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool - useBinaryFormat, - TupleDesc tupleDesc) + useBinaryFormat, + TupleDesc tupleDesc) { - char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); + char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); - StringInfo command = makeStringInfo(); + StringInfo command = makeStringInfo(); - const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); + const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); - appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", - quote_identifier(destinationShardSchemaName), - quote_identifier(destinationShardRelationName), - columnList); + appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", + quote_identifier(destinationShardSchemaName), + quote_identifier(destinationShardRelationName), + columnList); - if (useBinaryFormat) - { - appendStringInfo(command, " WITH (format binary, FREEZE);"); - } - else - { - appendStringInfo(command, " WITH (FREEZE);"); - } + if (useBinaryFormat) + { + appendStringInfo(command, " WITH (format binary, FREEZE);"); + } + else + { + appendStringInfo(command, " WITH (FREEZE);"); + } - return command; + return command; } @@ -542,20 +544,21 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) static List * CreateCopyOptions(bool isBinaryCopy) { - List *options = NIL; + List *options = NIL; - // Add the FREEZE option - DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1); - options = lappend(options, freezeOption); + /* Add the FREEZE option */ + DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1); + options = lappend(options, freezeOption); - // If binary format is used, add the binary format option - if (isBinaryCopy) - { - DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1); - options = lappend(options, binaryFormatOption); - } + /* If binary format is used, add the binary format option */ + if (isBinaryCopy) + { + DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), + -1); + options = lappend(options, binaryFormatOption); + } - return options; + return options; } @@ -565,49 +568,54 @@ CreateCopyOptions(bool isBinaryCopy) static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) { - bool isBinaryCopy = localCopyOutState->binary; + bool isBinaryCopy = localCopyOutState->binary; - if (isBinaryCopy) - { - AppendCopyBinaryFooters(localCopyOutState); - } + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } - /* - * Set the buffer as a global variable to allow ReadFromLocalBufferCallback - * to read from it. We cannot pass additional arguments to - * ReadFromLocalBufferCallback. - */ - LocalCopyBuffer = localCopyOutState->fe_msgbuf; + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. + */ + LocalCopyBuffer = localCopyOutState->fe_msgbuf; - // Extract schema and relation names - char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + /* Extract schema and relation names */ + char *destinationShardSchemaName = linitial( + copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond( + copyDest->destinationShardFullyQualifiedName); - // Get OIDs for schema and shard - Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); - Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); + /* Get OIDs for schema and shard */ + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false); + Oid destinationShardOid = get_relname_relid(destinationShardRelationName, + destinationSchemaOid); - // Create options list for COPY command - List *options = CreateCopyOptions(isBinaryCopy); + /* Create options list for COPY command */ + List *options = CreateCopyOptions(isBinaryCopy); - // Open the shard relation - Relation shard = table_open(destinationShardOid, RowExclusiveLock); + /* Open the shard relation */ + Relation shard = table_open(destinationShardOid, RowExclusiveLock); - // Create and configure parse state - ParseState *pState = make_parsestate(NULL); - (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false); + /* Create and configure parse state */ + ParseState *pState = make_parsestate(NULL); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, + false); - // Begin and execute the COPY FROM operation - CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options); - CopyFrom(cstate); - EndCopyFrom(cstate); + /* Begin and execute the COPY FROM operation */ + CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, + ReadFromLocalBufferCallback, NULL, options); + CopyFrom(cstate); + EndCopyFrom(cstate); - // Reset the local copy buffer - resetStringInfo(localCopyOutState->fe_msgbuf); + /* Reset the local copy buffer */ + resetStringInfo(localCopyOutState->fe_msgbuf); - // Close the shard relation and free parse state - table_close(shard, NoLock); - free_parsestate(pState); + /* Close the shard relation and free parse state */ + table_close(shard, NoLock); + free_parsestate(pState); }