pull/7687/merge
Mehmet YILMAZ 2025-06-24 12:52:16 -07:00 committed by GitHub
commit cf4205c3e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 72 additions and 13 deletions

View File

@ -83,6 +83,8 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState); localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static StringInfo ConstructShardTruncateStatement(
List *destinationShardFullyQualifiedName);
static bool static bool
@ -108,12 +110,35 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */); NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection); ClaimConnectionExclusively(copyDest->connection);
/* Begin the remote transaction */
RemoteTransactionBeginIfNecessary(copyDest->connection); RemoteTransactionBegin(copyDest->connection);
SetupReplicationOriginRemoteSession(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection);
/* Handle TRUNCATE or any setup commands */
StringInfo truncateStatement = ConstructShardTruncateStatement(
copyDest->destinationShardFullyQualifiedName);
if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true);
if (!IsResponseOK(truncateResult))
{
ReportResultError(copyDest->connection, truncateResult, ERROR);
PQclear(truncateResult);
ForgetResults(copyDest->connection);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
}
PQclear(truncateResult);
ForgetResults(copyDest->connection);
/* Construct and send the COPY statement with FREEZE */
StringInfo copyStatement = ConstructShardCopyStatement( StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName, copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary, copyDest->copyOutState->binary,
@ -122,16 +147,18 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{ {
ReportConnectionError(copyDest->connection, ERROR); ReportConnectionError(copyDest->connection, ERROR);
RemoteTransactionAbort(copyDest->connection);
ResetRemoteTransaction(copyDest->connection);
} }
PGresult *result = GetRemoteCommandResult(copyDest->connection, PGresult *copyResult = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */); true /* raiseInterrupts */);
if (PQresultStatus(result) != PGRES_COPY_IN) if (PQresultStatus(copyResult) != PGRES_COPY_IN)
{ {
ReportResultError(copyDest->connection, result, ERROR); ReportResultError(copyDest->connection, copyResult, ERROR);
} }
PQclear(result); PQclear(copyResult);
} }
@ -329,7 +356,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
/* check whether there were any COPY errors */ /* check whether there were any COPY errors */
PGresult *result = GetRemoteCommandResult(copyDest->connection, PGresult *result = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */); true /* raiseInterrupts */);
if (PQresultStatus(result) != PGRES_COMMAND_OK) if (!IsResponseOK(result))
{ {
ReportCopyError(copyDest->connection, result); ReportCopyError(copyDest->connection, result);
} }
@ -339,6 +366,21 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
ResetReplicationOriginRemoteSession(copyDest->connection); ResetReplicationOriginRemoteSession(copyDest->connection);
/* End the transaction by sending a COMMIT command */
if (!SendRemoteCommand(copyDest->connection, "COMMIT"))
{
HandleRemoteTransactionConnectionError(copyDest->connection, true);
}
PGresult *commitResult = GetRemoteCommandResult(copyDest->connection, true);
if (!IsResponseOK(result))
{
ereport(ERROR, (errcode(ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN),
errmsg("Failed to commit transaction")));
}
PQclear(commitResult);
CloseConnection(copyDest->connection); CloseConnection(copyDest->connection);
} }
} }
@ -424,6 +466,23 @@ CopyableColumnNamesFromRelationName(const char *schemaName, const char *relation
} }
/*
* ConstructShardTruncateStatement constructs the text of a TRUNCATE statement
* for the destination shard.
*/
static StringInfo
ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName)
{
StringInfo command = makeStringInfo();
appendStringInfo(command, "TRUNCATE %s.%s;",
quote_identifier(linitial(destinationShardFullyQualifiedName)),
quote_identifier(lsecond(destinationShardFullyQualifiedName)));
return command;
}
/* /*
* ConstructShardCopyStatement constructs the text of a COPY statement * ConstructShardCopyStatement constructs the text of a COPY statement
* for copying into a result table * for copying into a result table
@ -436,22 +495,22 @@ ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
StringInfo command = makeStringInfo(); StringInfo command = makeStringInfo();
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
quote_identifier(destinationShardSchemaName), quote_identifier( quote_identifier(destinationShardSchemaName),
destinationShardRelationName), columnList); quote_identifier(destinationShardRelationName),
columnList);
if (useBinaryFormat) if (useBinaryFormat)
{ {
appendStringInfo(command, " WITH (format binary);"); appendStringInfo(command, " WITH (format binary, FREEZE);");
} }
else else
{ {
appendStringInfo(command, ";"); appendStringInfo(command, " WITH (FREEZE);");
} }
return command; return command;