remote shard update

m3hm3t/add-freeze-copy
Mehmet YILMAZ 2024-09-10 13:37:07 +00:00
parent d13ecedcfa
commit 812f9b182f
1 changed files with 47 additions and 37 deletions

View File

@ -100,56 +100,66 @@ CanUseLocalCopy(uint32_t destinationNodeId)
static void static void
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
{ {
int connectionFlags = OUTSIDE_TRANSACTION; int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName(); char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId,
false /* missingOk */); false /* missingOk */);
copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName, workerNode->workerName,
workerNode->workerPort, workerNode->workerPort,
currentUser, currentUser,
NULL /* database (current) */); NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection); ClaimConnectionExclusively(copyDest->connection);
StartRemoteTransactionBegin(copyDest->connection); /* Begin the remote transaction */
RemoteTransactionBegin(copyDest->connection);
SetupReplicationOriginRemoteSession(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection);
/* Construct and send the TRUNCATE statement to the remote node */ /* Handle TRUNCATE or any setup commands */
StringInfo truncateStatement = ConstructShardTruncateStatement( StringInfo truncateStatement = ConstructShardTruncateStatement(
copyDest->destinationShardFullyQualifiedName); copyDest->destinationShardFullyQualifiedName);
if (!SendRemoteCommand(copyDest->connection, truncateStatement->data)) if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
{ {
ReportConnectionError(copyDest->connection, ERROR); ReportConnectionError(copyDest->connection, ERROR);
} RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true); PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true);
if (PQresultStatus(truncateResult) != PGRES_COMMAND_OK) if (!IsResponseOK(truncateResult))
{ {
ReportResultError(copyDest->connection, truncateResult, ERROR); ReportResultError(copyDest->connection, truncateResult, ERROR);
} PQclear(truncateResult);
PQclear(truncateResult); ForgetResults(copyDest->connection);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PQclear(truncateResult);
ForgetResults(copyDest->connection);
/* Construct the COPY command and send it to the remote node */ /* Construct and send the COPY statement with FREEZE */
StringInfo copyStatement = ConstructShardCopyStatement( StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName, copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary, copyDest->copyOutState->binary,
copyDest->tupleDescriptor); copyDest->tupleDescriptor);
if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{ {
ReportConnectionError(copyDest->connection, ERROR); ReportConnectionError(copyDest->connection, ERROR);
} RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PGresult *result = GetRemoteCommandResult(copyDest->connection, PGresult *copyResult = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */); true /* raiseInterrupts */);
if (PQresultStatus(result) != PGRES_COPY_IN) if (PQresultStatus(copyResult ) != PGRES_COPY_IN)
{ {
ReportResultError(copyDest->connection, result, ERROR); ReportResultError(copyDest->connection, copyResult, ERROR);
} }
PQclear(result); PQclear(copyResult);
} }