diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 4b0d7aab7..40114d7cb 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -103,11 +103,14 @@ CanUseLocalCopy(uint32_t destinationNodeId) return GetLocalNodeId() == (int32) destinationNodeId; } + /* * SetupReplicationOriginSessionIfNotSetupAlready sets up the replication origin session * if it is not already setup. */ - static void SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection) { +static void +SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection) +{ /* Setup replication Origin if not setup already */ if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD)) { @@ -116,11 +119,14 @@ CanUseLocalCopy(uint32_t destinationNodeId) ForgetResults(connection); } + /* * ResetReplicationOriginSessionIfSetupAlready resets the replication origin session * if it has been setup currently. */ -static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection) { +static void +ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection) +{ if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_RESET_CMD)) { ReportConnectionError(connection, ERROR); @@ -128,6 +134,7 @@ static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connect ForgetResults(connection); } + /* Connect to node with source shard and trigger copy start. */ static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) @@ -143,8 +150,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - /* Setup Replication Origin Session if not setup already for - avoiding publication of events more than once. */ + /* Setup Replication Origin Session if not setup already for + * avoiding publication of events more than once. */ SetupReplicationOriginSessionIfNotSetupAlready(copyDest->connection); StringInfo copyStatement = ConstructShardCopyStatement( diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index ecfb84aa9..6fccb3ec7 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -40,10 +40,11 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation, static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple, TupleDesc sourceTupleDesc, TupleDesc targetTupleDesc); -static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId + origin_id); static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); + Relation relation, ReorderBufferChange *change); /* used in the replication_origin_filter_cb function. */ #define InvalidRepOriginId 0 @@ -84,7 +85,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) * identified by the "origin_id" of the changes. The origin_id is set to * a non-zero value in the origin node as part of WAL replication. */ - static bool +static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) { if (origin_id != InvalidRepOriginId) @@ -94,25 +95,29 @@ replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) return false; } -/* + +/* * PublishChangesIfCdcSlot checks if the current slot is a CDC slot. If so, it publishes - * the changes as the change for the distributed table instead of shard. + * the changes as the change for the distributed table instead of shard. * If not, it returns false. It also skips the Citus metadata tables. */ static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change) + Relation relation, ReorderBufferChange *change) { char *replicationSlotName = ctx->slot->data.name.data; + /* Check if the replication slot is CITUS_CDC_SLOT*/ - if (replicationSlotName != NULL && strcmp(replicationSlotName, CITUS_CDC_SLOT_NAME) == 0) + if (replicationSlotName != NULL && strcmp(replicationSlotName, CITUS_CDC_SLOT_NAME) == + 0) { /* Skip publishing changes for Citus metadata tables*/ - ObjectAddress objectAdress = {RelationRelationId, relation->rd_id, 0}; - if (IsObjectAddressOwnedByCitus(&objectAdress)) + ObjectAddress objectAdress = { RelationRelationId, relation->rd_id, 0 }; + if (IsObjectAddressOwnedByCitus(&objectAdress)) { return true; - } + } + /* Check if this change is for a shard in distributed table. */ if (RelationIsAKnownShard(relation->rd_id)) { @@ -130,7 +135,7 @@ PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } pgoutputChangeCB(ctx, txn, relation, change); return true; - } + } return false; }