m3hm3t/add-freeze-copy
Mehmet YILMAZ 2024-09-10 17:55:39 +00:00
parent 9b76b62bda
commit 842f5e486f
1 changed files with 45 additions and 45 deletions

View File

@ -100,61 +100,61 @@ CanUseLocalCopy(uint32_t destinationNodeId)
static void static void
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
{ {
int connectionFlags = OUTSIDE_TRANSACTION; int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName(); char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId,
false /* missingOk */); false /* missingOk */);
copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName, workerNode->workerName,
workerNode->workerPort, workerNode->workerPort,
currentUser, currentUser,
NULL /* database (current) */); NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection); ClaimConnectionExclusively(copyDest->connection);
/* Begin the remote transaction */ /* Begin the remote transaction */
RemoteTransactionBegin(copyDest->connection); RemoteTransactionBegin(copyDest->connection);
SetupReplicationOriginRemoteSession(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection);
/* Handle TRUNCATE or any setup commands */ /* Handle TRUNCATE or any setup commands */
StringInfo truncateStatement = ConstructShardTruncateStatement( StringInfo truncateStatement = ConstructShardTruncateStatement(
copyDest->destinationShardFullyQualifiedName); copyDest->destinationShardFullyQualifiedName);
if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
{ {
ReportConnectionError(copyDest->connection, ERROR); ReportConnectionError(copyDest->connection, ERROR);
RemoteTransactionAbort(copyDest->connection); RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection); ResetRemoteTransaction(copyDest->connection);
} }
PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true);
if (!IsResponseOK(truncateResult)) if (!IsResponseOK(truncateResult))
{ {
ReportResultError(copyDest->connection, truncateResult, ERROR); ReportResultError(copyDest->connection, truncateResult, ERROR);
PQclear(truncateResult); PQclear(truncateResult);
ForgetResults(copyDest->connection); ForgetResults(copyDest->connection);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PQclear(truncateResult);
ForgetResults(copyDest->connection);
/* Construct and send the COPY statement with FREEZE */
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary,
copyDest->tupleDescriptor);
if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
RemoteTransactionAbort(copyDest->connection); RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection); ResetRemoteTransaction(copyDest->connection);
} }
PQclear(truncateResult);
ForgetResults(copyDest->connection);
/* Construct and send the COPY statement with FREEZE */
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary,
copyDest->tupleDescriptor);
if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PGresult *copyResult = GetRemoteCommandResult(copyDest->connection, PGresult *copyResult = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */); true /* raiseInterrupts */);
if (PQresultStatus(copyResult ) != PGRES_COPY_IN) if (PQresultStatus(copyResult) != PGRES_COPY_IN)
{ {
ReportResultError(copyDest->connection, copyResult, ERROR); ReportResultError(copyDest->connection, copyResult, ERROR);
} }