diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 40114d7cb..ce00a2e73 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -33,11 +33,11 @@ */ static StringInfo LocalCopyBuffer; #define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \ - "SELECT pg_catalog.pg_replication_origin_create('citus_cdc') \ - where (select pg_catalog.pg_replication_origin_oid('citus_cdc')) IS NULL;" + "SELECT pg_catalog.pg_replication_origin_create('citus_cdc_%d') \ + where (select pg_catalog.pg_replication_origin_oid('citus_cdc_%d')) IS NULL;" #define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \ - "SELECT pg_catalog.pg_replication_origin_session_setup('citus_cdc') \ + "SELECT pg_catalog.pg_replication_origin_session_setup('citus_cdc_%d') \ where pg_catalog.pg_replication_origin_session_is_setup()='f';" #define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \ @@ -93,8 +93,6 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static void CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest); -static void SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection); -static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection); static bool CanUseLocalCopy(uint32_t destinationNodeId) @@ -104,36 +102,7 @@ CanUseLocalCopy(uint32_t destinationNodeId) } -/* - * SetupReplicationOriginSessionIfNotSetupAlready sets up the replication origin session - * if it is not already setup. - */ -static void -SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection) -{ - /* Setup replication Origin if not setup already */ - if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD)) - { - ReportConnectionError(connection, ERROR); - } - ForgetResults(connection); -} - - -/* - * ResetReplicationOriginSessionIfSetupAlready resets the replication origin session - * if it has been setup currently. - */ -static void -ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection) -{ - if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_RESET_CMD)) - { - ReportConnectionError(connection, ERROR); - } - ForgetResults(connection); -} - +#define REPLICATION_ORIGIN_CMD_BUFFER_SIZE 1024 /* Connect to node with source shard and trigger copy start. */ static void @@ -152,7 +121,11 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) /* Setup Replication Origin Session if not setup already for * avoiding publication of events more than once. */ - SetupReplicationOriginSessionIfNotSetupAlready(copyDest->connection); + char replicationOrginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; + int originId = GetLocalNodeId(); + snprintf(replicationOrginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, + CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, originId); + ExecuteCriticalRemoteCommand(copyDest->connection, replicationOrginSetupCommand); StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, @@ -327,9 +300,12 @@ CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest) workerNode->workerPort, currentUser, NULL /* database (current) */); - ClaimConnectionExclusively(connection); - ExecuteCriticalRemoteCommand(connection, - CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD); + char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; + int originId = GetLocalNodeId(); + snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, + CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, originId, originId); + + ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand); CloseConnection(connection); } diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 6fccb3ec7..b8187fd45 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -17,6 +17,7 @@ #include "replication/logical.h" #include "utils/typcache.h" #include "utils/lsyscache.h" +#include "catalog/pg_namespace.h" extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); static LogicalDecodeChangeCB pgoutputChangeCB; @@ -48,7 +49,8 @@ static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTX /* used in the replication_origin_filter_cb function. */ #define InvalidRepOriginId 0 -#define CITUS_CDC_SLOT_NAME "citus_cdc_slot" + +#define CITUS_SHARD_PREFIX_SLOT "citus_shard_" /* * Postgres uses 'pgoutput' as default plugin for logical replication. @@ -108,31 +110,28 @@ PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, char *replicationSlotName = ctx->slot->data.name.data; /* Check if the replication slot is CITUS_CDC_SLOT*/ - if (replicationSlotName != NULL && strcmp(replicationSlotName, CITUS_CDC_SLOT_NAME) == - 0) + if (replicationSlotName != NULL && + strncmp(replicationSlotName, CITUS_SHARD_PREFIX_SLOT, strlen( + CITUS_SHARD_PREFIX_SLOT)) != 0) { - /* Skip publishing changes for Citus metadata tables*/ - ObjectAddress objectAdress = { RelationRelationId, relation->rd_id, 0 }; - if (IsObjectAddressOwnedByCitus(&objectAdress)) + /* Skip publishing changes for system relations in pg_catalog*/ + if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE) { return true; } - /* Check if this change is for a shard in distributed table. */ - if (RelationIsAKnownShard(relation->rd_id)) + char *shardRelationName = RelationGetRelationName(relation); + uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); + if (shardId != INVALID_SHARD_ID && ShardExists(shardId)) { - char *shardRelationName = RelationGetRelationName(relation); - uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); - if (shardId != INVALID_SHARD_ID) + /* try to get the distributed relation id for the shard */ + Oid distributedRelationId = RelationIdForShard(shardId); + if (OidIsValid(distributedRelationId)) { - /* try to get the distributed relation id for the shard */ - Oid distributedRelationId = RelationIdForShard(shardId); - if (OidIsValid(distributedRelationId)) - { - relation = RelationIdGetRelation(distributedRelationId); - } + relation = RelationIdGetRelation(distributedRelationId); } } + pgoutputChangeCB(ctx, txn, relation, change); return true; } @@ -148,6 +147,12 @@ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { + /*check if Citus extension is loaded. */ + if (!CitusHasBeenLoaded()) + { + return; + } + if (!is_publishable_relation(relation)) { return;