diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 6ed81020b..3b45cb913 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -100,61 +100,61 @@ CanUseLocalCopy(uint32_t destinationNodeId) static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { - int connectionFlags = OUTSIDE_TRANSACTION; - char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, - false /* missingOk */); - copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); - ClaimConnectionExclusively(copyDest->connection); + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, + false /* missingOk */); + copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); + ClaimConnectionExclusively(copyDest->connection); - /* Begin the remote transaction */ - RemoteTransactionBegin(copyDest->connection); + /* Begin the remote transaction */ + RemoteTransactionBegin(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection); - /* Handle TRUNCATE or any setup commands */ - StringInfo truncateStatement = ConstructShardTruncateStatement( - copyDest->destinationShardFullyQualifiedName); + /* Handle TRUNCATE or any setup commands */ + StringInfo truncateStatement = ConstructShardTruncateStatement( + copyDest->destinationShardFullyQualifiedName); - if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); + if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); RemoteTransactionAbort(copyDest->connection); - ResetRemoteTransaction(copyDest->connection); - } + ResetRemoteTransaction(copyDest->connection); + } - PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); - if (!IsResponseOK(truncateResult)) - { - ReportResultError(copyDest->connection, truncateResult, ERROR); - PQclear(truncateResult); - ForgetResults(copyDest->connection); - RemoteTransactionAbort(copyDest->connection); - ResetRemoteTransaction(copyDest->connection); - } - PQclear(truncateResult); - ForgetResults(copyDest->connection); - - /* Construct and send the COPY statement with FREEZE */ - StringInfo copyStatement = ConstructShardCopyStatement( - copyDest->destinationShardFullyQualifiedName, - copyDest->copyOutState->binary, - copyDest->tupleDescriptor); - - if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); + PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(truncateResult)) + { + ReportResultError(copyDest->connection, truncateResult, ERROR); + PQclear(truncateResult); + ForgetResults(copyDest->connection); RemoteTransactionAbort(copyDest->connection); - ResetRemoteTransaction(copyDest->connection); - } + ResetRemoteTransaction(copyDest->connection); + } + PQclear(truncateResult); + ForgetResults(copyDest->connection); + + /* Construct and send the COPY statement with FREEZE */ + StringInfo copyStatement = ConstructShardCopyStatement( + copyDest->destinationShardFullyQualifiedName, + copyDest->copyOutState->binary, + copyDest->tupleDescriptor); + + if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } PGresult *copyResult = GetRemoteCommandResult(copyDest->connection, - true /* raiseInterrupts */); - if (PQresultStatus(copyResult ) != PGRES_COPY_IN) + true /* raiseInterrupts */); + if (PQresultStatus(copyResult) != PGRES_COPY_IN) { ReportResultError(copyDest->connection, copyResult, ERROR); }