From d32e7263ae7de7aedecdee11b62430d3b1eabbc9 Mon Sep 17 00:00:00 2001
From: Mehmet YILMAZ
Date: Mon, 9 Sep 2024 15:16:39 +0000
Subject: [PATCH] add freeze, start and end transaction

---
Review notes (commentary only; this section is ignored by git-am):
 * ShardCopyDestReceiverShutdown: the response to COMMIT is checked on
   commitResult; the earlier revision re-tested the COPY result instead.
 * ConnectToRemoteAndStartCopy: BEGIN is issued with RemoteTransactionBegin
   so its response is read before further commands use the connection.
 * Added comments use the file's block-comment style.
 * NOTE(review): after TRUNCATE and after COMMIT only a single result is
   read; confirm whether the connection must be drained (ForgetResults)
   before the next command is sent.
 * NOTE(review): the local path passes FREEZE without a preceding TRUNCATE;
   PostgreSQL accepts COPY FREEZE only for a table created or truncated in
   the current subtransaction - confirm LocalCopyToShard meets that.
 * The blob ids on the "index" line predate these adjustments.

 .../operations/worker_shard_copy.c            | 193 ++++++++++++------
 1 file changed, 129 insertions(+), 64 deletions(-)

diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c
index f99c9b537..24f50e1d0 100644
--- a/src/backend/distributed/operations/worker_shard_copy.c
+++ b/src/backend/distributed/operations/worker_shard_copy.c
@@ -83,6 +83,8 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
 static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
 							 localCopyOutState);
 static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
+static List *CreateCopyOptions(bool isBinaryCopy);
+static StringInfo ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName);
 
 
 static bool
@@ -108,12 +110,26 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
 														 NULL /* database (current) */);
 	ClaimConnectionExclusively(copyDest->connection);
 
-
-	RemoteTransactionBeginIfNecessary(copyDest->connection);
+	RemoteTransactionBegin(copyDest->connection);
 
 	SetupReplicationOriginRemoteSession(copyDest->connection);
 
 
+	/* Construct and send the TRUNCATE statement to the remote node */
+	StringInfo truncateStatement = ConstructShardTruncateStatement(copyDest->destinationShardFullyQualifiedName);
+	if (!SendRemoteCommand(copyDest->connection, truncateStatement->data))
+	{
+		ReportConnectionError(copyDest->connection, ERROR);
+	}
+
+	PGresult *truncateResult = GetRemoteCommandResult(copyDest->connection, true);
+	if (PQresultStatus(truncateResult) != PGRES_COMMAND_OK)
+	{
+		ReportResultError(copyDest->connection, truncateResult, ERROR);
+	}
+	PQclear(truncateResult);
+
+	/* Construct the COPY command and send it to the remote node */
 	StringInfo copyStatement = ConstructShardCopyStatement(
 		copyDest->destinationShardFullyQualifiedName,
 		copyDest->copyOutState->binary,
@@ -329,7 +345,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
 		/* check whether there were any COPY errors */
 		PGresult *result = GetRemoteCommandResult(copyDest->connection,
 												  true /* raiseInterrupts */);
-		if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		if (!IsResponseOK(result))
 		{
 			ReportCopyError(copyDest->connection, result);
 		}
@@ -339,6 +355,21 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
 
 		ResetReplicationOriginRemoteSession(copyDest->connection);
 
+		/* End the transaction by sending a COMMIT command */
+		if (!SendRemoteCommand(copyDest->connection, "COMMIT"))
+		{
+			HandleRemoteTransactionConnectionError(copyDest->connection, true);
+		}
+
+		PGresult *commitResult = GetRemoteCommandResult(copyDest->connection, true);
+		if (!IsResponseOK(commitResult))
+		{
+			ereport(ERROR, (errcode(ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN),
+							errmsg("Failed to commit transaction")));
+		}
+
+		PQclear(commitResult);
+
 		CloseConnection(copyDest->connection);
 	}
 }
@@ -424,37 +455,54 @@ CopyableColumnNamesFromRelationName(const char *schemaName, const char *relation
 }
 
 
+/*
+ * ConstructShardTruncateStatement constructs the text of a TRUNCATE statement
+ * for the destination shard.
+ */
+static StringInfo
+ConstructShardTruncateStatement(List *destinationShardFullyQualifiedName)
+{
+	StringInfo command = makeStringInfo();
+
+	appendStringInfo(command, "TRUNCATE %s.%s;",
+					 quote_identifier(linitial(destinationShardFullyQualifiedName)),
+					 quote_identifier(lsecond(destinationShardFullyQualifiedName)));
+
+	return command;
+}
+
+
 /*
  * ConstructShardCopyStatement constructs the text of a COPY statement
  * for copying into a result table
  */
 static StringInfo
 ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
-							useBinaryFormat,
-							TupleDesc tupleDesc)
+                            useBinaryFormat,
+                            TupleDesc tupleDesc)
 {
-	char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
-	char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
+    char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
+    char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
+    StringInfo command = makeStringInfo();
 
 
-	StringInfo command = makeStringInfo();
+    const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
 
-	const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
+    appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
+                     quote_identifier(destinationShardSchemaName),
+                     quote_identifier(destinationShardRelationName),
+                     columnList);
 
-	appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
-					 quote_identifier(destinationShardSchemaName), quote_identifier(
-						 destinationShardRelationName), columnList);
+    if (useBinaryFormat)
+    {
+        appendStringInfo(command, " WITH (format binary, FREEZE);");
+    }
+    else
+    {
+        appendStringInfo(command, " WITH (FREEZE);");
+    }
 
-	if (useBinaryFormat)
-	{
-		appendStringInfo(command, " WITH (format binary);");
-	}
-	else
-	{
-		appendStringInfo(command, ";");
-	}
-
-	return command;
+    return command;
 }
 
 
@@ -488,61 +536,78 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
 }
 
 
+/*
+ * CreateCopyOptions creates the options list for the COPY command.
+ */
+static List *
+CreateCopyOptions(bool isBinaryCopy)
+{
+	List *options = NIL;
+
+	/* Add the FREEZE option */
+	DefElem *freezeOption = makeDefElem("freeze", (Node *) makeInteger(true), -1);
+	options = lappend(options, freezeOption);
+
+	/* If binary format is used, add the binary format option */
+	if (isBinaryCopy)
+	{
+		DefElem *binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1);
+		options = lappend(options, binaryFormatOption);
+	}
+
+	return options;
+}
+
+
 /*
  * LocalCopyToShard performs local copy for the given destination shard.
  */
 static void
 LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState)
 {
-	bool isBinaryCopy = localCopyOutState->binary;
-	if (isBinaryCopy)
-	{
-		AppendCopyBinaryFooters(localCopyOutState);
-	}
+    bool isBinaryCopy = localCopyOutState->binary;
 
-	/*
-	 * Set the buffer as a global variable to allow ReadFromLocalBufferCallback
-	 * to read from it. We cannot pass additional arguments to
-	 * ReadFromLocalBufferCallback.
-	 */
-	LocalCopyBuffer = localCopyOutState->fe_msgbuf;
+    if (isBinaryCopy)
+    {
+        AppendCopyBinaryFooters(localCopyOutState);
+    }
 
-	char *destinationShardSchemaName = linitial(
-		copyDest->destinationShardFullyQualifiedName);
-	char *destinationShardRelationName = lsecond(
-		copyDest->destinationShardFullyQualifiedName);
+    /*
+     * Set the buffer as a global variable to allow ReadFromLocalBufferCallback
+     * to read from it. We cannot pass additional arguments to
+     * ReadFromLocalBufferCallback.
+     */
+    LocalCopyBuffer = localCopyOutState->fe_msgbuf;
 
-	Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName,
-												 false /* missing_ok */);
-	Oid destinationShardOid = get_relname_relid(destinationShardRelationName,
-												destinationSchemaOid);
+    /* Extract schema and relation names */
+    char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
+    char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
 
-	DefElem *binaryFormatOption = NULL;
-	if (isBinaryCopy)
-	{
-		binaryFormatOption = makeDefElem("format", (Node *) makeString("binary"), -1);
-	}
+    /* Get OIDs for schema and shard */
+    Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false);
+    Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid);
 
-	Relation shard = table_open(destinationShardOid, RowExclusiveLock);
-	ParseState *pState = make_parsestate(NULL /* parentParseState */);
-	(void) addRangeTableEntryForRelation(pState, shard, AccessShareLock,
-										 NULL /* alias */, false /* inh */,
-										 false /* inFromCl */);
+    /* Create options list for COPY command */
+    List *options = CreateCopyOptions(isBinaryCopy);
 
-	List *options = (isBinaryCopy) ? list_make1(binaryFormatOption) : NULL;
-	CopyFromState cstate = BeginCopyFrom(pState, shard,
-										 NULL /* whereClause */,
-										 NULL /* fileName */,
-										 false /* is_program */,
-										 ReadFromLocalBufferCallback,
-										 NULL /* attlist (NULL is all columns) */,
-										 options);
-	CopyFrom(cstate);
-	EndCopyFrom(cstate);
-	resetStringInfo(localCopyOutState->fe_msgbuf);
+    /* Open the shard relation */
+    Relation shard = table_open(destinationShardOid, RowExclusiveLock);
 
-	table_close(shard, NoLock);
-	free_parsestate(pState);
+    /* Create and configure parse state */
+    ParseState *pState = make_parsestate(NULL);
+    (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, NULL, false, false);
+
+    /* Begin and execute the COPY FROM operation */
+    CopyFromState cstate = BeginCopyFrom(pState, shard, NULL, NULL, false, ReadFromLocalBufferCallback, NULL, options);
+    CopyFrom(cstate);
+    EndCopyFrom(cstate);
+
+    /* Reset the local copy buffer */
+    resetStringInfo(localCopyOutState->fe_msgbuf);
+
+    /* Close the shard relation and free parse state */
+    table_close(shard, NoLock);
+    free_parsestate(pState);
 }
 
 