From af68b32ccb3638295510569c720346f3c1c29f60 Mon Sep 17 00:00:00 2001 From: Rajesh Kumar Thandapani Date: Thu, 8 Dec 2022 11:09:20 +0530 Subject: [PATCH] Added replication origin session for local shard split and move operations. --- .../operations/worker_shard_copy.c | 140 +++++++++++++----- 1 file changed, 105 insertions(+), 35 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 2178c6186..17272f048 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -33,11 +33,11 @@ */ static StringInfo LocalCopyBuffer; #define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \ - "SELECT pg_catalog.pg_replication_origin_create('citus_cdc_%d') \ - where (select pg_catalog.pg_replication_origin_oid('citus_cdc_%d')) IS NULL;" + "SELECT pg_catalog.pg_replication_origin_create('citus_internal_%d') \ + where (select pg_catalog.pg_replication_origin_oid('citus_internal_%d')) IS NULL;" #define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \ - "SELECT pg_catalog.pg_replication_origin_session_setup('citus_cdc_%d') \ + "SELECT pg_catalog.pg_replication_origin_session_setup('citus_internal_%d') \ where pg_catalog.pg_replication_origin_session_is_setup()='f';" #define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \ @@ -68,6 +68,9 @@ typedef struct ShardCopyDestReceiver /* local copy if destination shard in same node */ bool useLocalCopy; + /* Replication Origin Id for local copy*/ + RepOriginId originId; + /* EState for per-tuple memory allocation */ EState *executorState; @@ -92,7 +95,9 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); -static void CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest); +static void ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest); +static void ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest); +static void ReplicationOriginSessionReset(ShardCopyDestReceiver *dest); static bool CanUseLocalCopy(uint32_t destinationNodeId) @@ -119,13 +124,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - /* Setup Replication Origin Session if not setup already for - * avoiding publication of events more than once. */ - char replicationOrginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; - int originId = GetLocalNodeId(); - snprintf(replicationOrginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, - CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, originId); - ExecuteCriticalRemoteCommand(copyDest->connection, replicationOrginSetupCommand); + ReplicationOriginSessionSetup(copyDest); StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, @@ -209,6 +208,9 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) CopyOutState copyOutState = copyDest->copyOutState; if (copyDest->useLocalCopy) { + /* Setup replication origin session for local copy*/ + ReplicationOriginSessionSetup(copyDest); + WriteLocalTuple(slot, copyDest); if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte) { @@ -284,32 +286,102 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); copyDest->copyOutState = copyOutState; - CreateReplicationOriginIfNotExists(copyDest); + ReplicationOriginSessionCreate(copyDest); } -/* CreateReplicationOriginIfNotExists creates a replication origin if it does - * not already exist already. To make the origin id unique for different nodes, - * origin node's id is appended to the prefix citus_cdc_.*/ +/* ReplicationOriginSessionCreate creates a new replication origin if it does + * not already exist already. To make the replication origin name unique + * for different nodes, origin node's id is appended to the prefix citus_internal_.*/ static void -CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest) +ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest) { - int connectionFlags = OUTSIDE_TRANSACTION; - char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(dest->destinationNodeId, - false /* missingOk */); - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); - char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; - int originId = GetLocalNodeId(); - snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, - CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, originId, originId); + int localid = GetLocalNodeId(); + if (dest->useLocalCopy) + { + char originName[64]; + snprintf(originName, sizeof(originName), "citus_internal_%d", localid); + RepOriginId originId = replorigin_by_name(originName, true); + if (originId == InvalidRepOriginId) + { + originId = replorigin_create(originName); + } + dest->originId = originId; + } + else + { + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(dest->destinationNodeId, + false /* missingOk */); + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); - ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand); - CloseConnection(connection); + char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; + snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, + CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, localid, localid); + + ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand); + CloseConnection(connection); + dest->originId = InvalidRepOriginId; + } +} + + +/* ReplicationOriginSessionSetup sets up a new replication origin session in a + * local or remote session depending on the useLocalCopy flag. If useLocalCopy + * is set, a local replication origin session is setup, otherwise a remote + * replication origin session is setup to the destination node. + */ +void +ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest) +{ + if (dest->useLocalCopy) + { + /*Setup Replication Origin in local session */ + if (replorigin_session_origin == InvalidRepOriginId) + { + replorigin_session_setup(dest->originId); + replorigin_session_origin = dest->originId; + } + } + else + { + /*Setup Replication Origin in remote session */ + char replicationOrginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; + int localId = GetLocalNodeId(); + snprintf(replicationOrginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, + CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, localId); + ExecuteCriticalRemoteCommand(dest->connection, replicationOrginSetupCommand); + } +} + + +/* ReplicationOriginSessionReset resets the replication origin session in a + * local or remote session depending on the useLocalCopy flag. + */ +void +ReplicationOriginSessionReset(ShardCopyDestReceiver *dest) +{ + if (dest->useLocalCopy) + { + /*Reset Replication Origin in local session */ + if (replorigin_session_origin != InvalidRepOriginId) + { + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + } + dest->originId = InvalidRepOriginId; + } + else + { + /*Reset Replication Origin in remote session */ + ExecuteCriticalRemoteCommand(dest->connection, + CDC_REPLICATION_ORIGIN_SESION_RESET_CMD); + } } @@ -331,6 +403,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) /* end the COPY input */ LocalCopyToShard(copyDest, copyDest->copyOutState); } + ReplicationOriginSessionReset(copyDest); } else if (copyDest->connection != NULL) { @@ -368,10 +441,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) PQclear(result); ForgetResults(copyDest->connection); - - /*Reset Replication Origin. */ - ExecuteCriticalRemoteCommand(copyDest->connection, - CDC_REPLICATION_ORIGIN_SESION_RESET_CMD); + ReplicationOriginSessionReset(copyDest); CloseConnection(copyDest->connection); }