add freeze, start and end transaction

m3hm3t/add-freeze-copy
Mehmet YILMAZ 2024-09-09 15:16:39 +00:00
parent 4775715691
commit d32e7263ae
1 changed files with 129 additions and 64 deletions

View File

@ -83,6 +83,8 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState); localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static List *CreateCopyOptions(bool isBinaryCopy);
static StringInfo ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName);
static bool static bool
@ -108,12 +110,26 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */); NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection); ClaimConnectionExclusively(copyDest->connection);
StartRemoteTransactionBegin(copyDest->connection);
RemoteTransactionBeginIfNecessary(copyDest->connection);
SetupReplicationOriginRemoteSession(copyDest->connection); SetupReplicationOriginRemoteSession(copyDest->connection);
// Construct and send the TRUNCATE statement to the remote node
StringInfo truncateStatement = ConstructShardTruncateStatement(copyDest->destinationShardFullyQualifiedName);
if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
{
ReportConnectionError(copyDest->connection, ERROR);
}
PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true);
if (PQresultStatus(truncateResult) != PGRES_COMMAND_OK)
{
ReportResultError(copyDest->connection, truncateResult, ERROR);
}
PQclear(truncateResult);
// Construct the COPY command and send it to the remote node
StringInfo copyStatement = ConstructShardCopyStatement( StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName, copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary, copyDest->copyOutState->binary,
@ -329,7 +345,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
/* check whether there were any COPY errors */ /* check whether there were any COPY errors */
PGresult *result = GetRemoteCommandResult(copyDest->connection, PGresult *result = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */); true /* raiseInterrupts */);
if (PQresultStatus(result) != PGRES_COMMAND_OK) if (!IsResponseOK(result))
{ {
ReportCopyError(copyDest->connection, result); ReportCopyError(copyDest->connection, result);
} }
@ -339,6 +355,21 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
ResetReplicationOriginRemoteSession(copyDest->connection); ResetReplicationOriginRemoteSession(copyDest->connection);
/* End the transaction by sending a COMMIT command */
if (!SendRemoteCommand(copyDest->connection, "COMMIT"))
{
HandleRemoteTransactionConnectionError(copyDest->connection, true);
}
PGresult *commitResult = GetRemoteCommandResult(copyDest->connection, true);
if (!IsResponseOK(result))
{
ereport(ERROR, (errcode(ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN),
errmsg("Failed to commit transaction")));
}
PQclear(commitResult);
CloseConnection(copyDest->connection); CloseConnection(copyDest->connection);
} }
} }
@ -424,6 +455,23 @@ CopyableColumnNamesFromRelationName(const char *schemaName, const char *relation
} }
/*
* ConstructShardTruncateStatement constructs the text of a TRUNCATE statement
* for the destination shard.
*/
static StringInfo
ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName)
{
StringInfo command = makeStringInfo();
appendStringInfo(command, "TRUNCATE %s.%s;",
quote_identifier(linitial(destinationShardFullyQualifiedName)),
quote_identifier(lsecond(destinationShardFullyQualifiedName)));
return command;
}
/* /*
* ConstructShardCopyStatement constructs the text of a COPY statement * ConstructShardCopyStatement constructs the text of a COPY statement
* for copying into a result table * for copying into a result table
@ -436,22 +484,22 @@ ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName); char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName); char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
StringInfo command = makeStringInfo(); StringInfo command = makeStringInfo();
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc); const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN", appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
quote_identifier(destinationShardSchemaName), quote_identifier( quote_identifier(destinationShardSchemaName),
destinationShardRelationName), columnList); quote_identifier(destinationShardRelationName),
columnList);
if (useBinaryFormat) if (useBinaryFormat)
{ {
appendStringInfo(command, " WITH (format binary);"); appendStringInfo(command, " WITH (format binary, FREEZE);");
} }
else else
{ {
appendStringInfo(command, ";"); appendStringInfo(command, " WITH (FREEZE);");
} }
return command; return command;
@ -488,6 +536,29 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
} }
/*
* CreateCopyOptions creates the options list for the COPY command.
*/
static List *
CreateCopyOptions(bool isBinaryCopy)
{
List *options = NIL;
// Add the FREEZE option
DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1);
options = lappend(options, freezeOption);
// If binary format is used, add the binary format option
if (isBinaryCopy)
{
DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1);
options = lappend(options, binaryFormatOption);
}
return options;
}
/* /*
* LocalCopyToShard performs local copy for the given destination shard. * LocalCopyToShard performs local copy for the given destination shard.
*/ */
@ -495,6 +566,7 @@ static void
LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState)
{ {
bool isBinaryCopy = localCopyOutState->binary; bool isBinaryCopy = localCopyOutState->binary;
if (isBinaryCopy) if (isBinaryCopy)
{ {
AppendCopyBinaryFooters(localCopyOutState); AppendCopyBinaryFooters(localCopyOutState);
@ -507,40 +579,33 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
*/ */
LocalCopyBuffer = localCopyOutState->fe_msgbuf; LocalCopyBuffer = localCopyOutState->fe_msgbuf;
char *destinationShardSchemaName = linitial( // Extract schema and relation names
copyDest->destinationShardFullyQualifiedName); char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond( char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
copyDest->destinationShardFullyQualifiedName);
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, // Get OIDs for schema and shard
false /* missing_ok */); Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false);
Oid destinationShardOid = get_relname_relid(destinationShardRelationName, Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid);
destinationSchemaOid);
DefElem *binaryFormatOption = NULL; // Create options list for COPY command
if (isBinaryCopy) List *options = CreateCopyOptions(isBinaryCopy);
{
binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1);
}
// Open the shard relation
Relation shard = table_open(destinationShardOid, RowExclusiveLock); Relation shard = table_open(destinationShardOid, RowExclusiveLock);
ParseState *pState = make_parsestate(NULL /* parentParseState */);
(void) addRangeTableEntryForRelation(pState, shard, AccessShareLock,
NULL /* alias */, false /* inh */,
false /* inFromCl */);
List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL; // Create and configure parse state
CopyFromState cstate = BeginCopyFrom(pState, shard, ParseState *pState = make_parsestate(NULL);
NULL /* whereClause */, (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false);
NULL /* fileName */,
false /* is_program */, // Begin and execute the COPY FROM operation
ReadFromLocalBufferCallback, CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options);
NULL /* attlist (NULL is all columns) */,
options);
CopyFrom(cstate); CopyFrom(cstate);
EndCopyFrom(cstate); EndCopyFrom(cstate);
// Reset the local copy buffer
resetStringInfo(localCopyOutState->fe_msgbuf); resetStringInfo(localCopyOutState->fe_msgbuf);
// Close the shard relation and free parse state
table_close(shard, NoLock); table_close(shard, NoLock);
free_parsestate(pState); free_parsestate(pState);
} }