diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 4c0d0ebd8..5a22fc95b 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -100,56 +100,66 @@ CanUseLocalCopy(uint32_t destinationNodeId) static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { - int connectionFlags = OUTSIDE_TRANSACTION; - char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, - false /* missingOk */); - copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); - ClaimConnectionExclusively(copyDest->connection); + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, + false /* missingOk */); + copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); + ClaimConnectionExclusively(copyDest->connection); - StartRemoteTransactionBegin(copyDest->connection); + /* Begin the remote transaction */ + RemoteTransactionBegin(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection); - /* Construct and send the TRUNCATE statement to the remote node */ - StringInfo truncateStatement = ConstructShardTruncateStatement( - copyDest->destinationShardFullyQualifiedName); + /* Handle TRUNCATE or any setup commands */ + StringInfo truncateStatement = ConstructShardTruncateStatement( + copyDest->destinationShardFullyQualifiedName); - if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); - } + if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } - PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); - if (PQresultStatus(truncateResult) != PGRES_COMMAND_OK) - { - ReportResultError(copyDest->connection, truncateResult, ERROR); - } - PQclear(truncateResult); + PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); + if (!IsResponseOK(truncateResult)) + { + ReportResultError(copyDest->connection, truncateResult, ERROR); + PQclear(truncateResult); + ForgetResults(copyDest->connection); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } + PQclear(truncateResult); + ForgetResults(copyDest->connection); - /* Construct the COPY command and send it to the remote node */ - StringInfo copyStatement = ConstructShardCopyStatement( - copyDest->destinationShardFullyQualifiedName, - copyDest->copyOutState->binary, - copyDest->tupleDescriptor); + /* Construct and send the COPY statement with FREEZE */ + StringInfo copyStatement = ConstructShardCopyStatement( + copyDest->destinationShardFullyQualifiedName, + copyDest->copyOutState->binary, + copyDest->tupleDescriptor); - if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) - { - ReportConnectionError(copyDest->connection, ERROR); - } + if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) + { + ReportConnectionError(copyDest->connection, ERROR); + RemoteTransactionAbort(copyDest->connection); + ResetRemoteTransaction(copyDest->connection); + } - PGresult *result = GetRemoteCommandResult(copyDest->connection, + PGresult *copyResult = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); - if (PQresultStatus(result) != PGRES_COPY_IN) + if (PQresultStatus(copyResult ) != PGRES_COPY_IN) { - ReportResultError(copyDest->connection, result, ERROR); + ReportResultError(copyDest->connection, copyResult, ERROR); } - PQclear(result); + PQclear(copyResult); }