From 827393ab41fa0fced4030c15e2b4292129edded4 Mon Sep 17 00:00:00 2001 From: Rajesh Kumar Date: Tue, 15 Nov 2022 22:54:01 +0530 Subject: [PATCH] Using replication origin remote commands for shard moves/splits. --- .../operations/worker_shard_copy.c | 94 +++++++++++++----- .../shardsplit/shardsplit_decoder.c | 99 ++++++++++--------- 2 files changed, 121 insertions(+), 72 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 4be1b394f..4b0d7aab7 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -32,6 +32,17 @@ * argument to the copy callback. */ static StringInfo LocalCopyBuffer; +#define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \ + "SELECT pg_catalog.pg_replication_origin_create('citus_cdc') \ + where (select pg_catalog.pg_replication_origin_oid('citus_cdc')) IS NULL;" + +#define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \ + "SELECT pg_catalog.pg_replication_origin_session_setup('citus_cdc') \ + where pg_catalog.pg_replication_origin_session_is_setup()='f';" + +#define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \ + "SELECT pg_catalog.pg_replication_origin_session_reset() \ + where pg_catalog.pg_replication_origin_session_is_setup()='t';" typedef struct ShardCopyDestReceiver { @@ -80,7 +91,10 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); -static void SetupReplicationOrigin(RepOriginId nodeId); + +static void CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest); +static void SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection); +static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection); static bool CanUseLocalCopy(uint32_t destinationNodeId) @@ -89,6 +103,30 @@ CanUseLocalCopy(uint32_t destinationNodeId) return GetLocalNodeId() == (int32) destinationNodeId; } +/* + * SetupReplicationOriginSessionIfNotSetupAlready sets up the replication origin session + * if it is not already setup. + */ + static void SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection) { + /* Setup replication Origin if not setup already */ + if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD)) + { + ReportConnectionError(connection, ERROR); + } + ForgetResults(connection); +} + +/* + * ResetReplicationOriginSessionIfSetupAlready resets the replication origin session + * if it has been setup currently. + */ +static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection) { + if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_RESET_CMD)) + { + ReportConnectionError(connection, ERROR); + } + ForgetResults(connection); +} /* Connect to node with source shard and trigger copy start. */ static void @@ -105,10 +143,13 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); + /* Setup Replication Origin Session if not setup already for + avoiding publication of events more than once. */ + SetupReplicationOriginSessionIfNotSetupAlready(copyDest->connection); + StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary); - if (!SendRemoteCommand(copyDest->connection, copyStatement->data)) { ReportConnectionError(copyDest->connection, ERROR); @@ -154,7 +195,9 @@ CreateShardCopyDestReceiver(EState *executorState, return (DestReceiver *) copyDest; } + #define InvalidRepOriginId 0 + /* * ShardCopyDestReceiverReceive implements the receiveSlot function of * ShardCopyDestReceiver. It takes a TupleTableSlot and sends the contents to @@ -234,27 +277,6 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) return true; } -static void -SetupReplicationOrigin(RepOriginId nodeId) { - RepOriginId originid = InvalidRepOriginId; - XLogRecPtr origin_startpos = InvalidXLogRecPtr; - //Check if there is a replication origin session already active. - if (replorigin_session_origin == InvalidRepOriginId) { - //Lookup the replication origin and create it if it does not exist. - char originname[NAMEDATALEN]; - snprintf(originname, sizeof(originname), "pg_%u",nodeId); - originid = replorigin_by_name(originname, true); - if (originid == InvalidRepOriginId) { - originid = replorigin_create(originname); - } - //Setup the replication origin session. - replorigin_session_setup(originid); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); - //elog(LOG, "!!!! Citus: ShardCopyDestReceiverReceive replorigin_session_origin %d", replorigin_session_origin); - } -} - /* * ShardCopyDestReceiverStartup implements the rStartup interface of ShardCopyDestReceiver. @@ -282,7 +304,26 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); copyDest->copyOutState = copyOutState; - SetupReplicationOrigin(copyDest->destinationNodeId); + CreateReplicationOriginIfNotExists(copyDest); +} + + +static void +CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest) +{ + int connectionFlags = OUTSIDE_TRANSACTION; + char *currentUser = CurrentUserName(); + WorkerNode *workerNode = FindNodeWithNodeId(dest->destinationNodeId, + false /* missingOk */); + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); + ClaimConnectionExclusively(connection); + ExecuteCriticalRemoteCommand(connection, + CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD); + CloseConnection(connection); } @@ -341,6 +382,11 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) PQclear(result); ForgetResults(copyDest->connection); + + /*Reset Replication Origin. */ + ExecuteCriticalRemoteCommand(copyDest->connection, + CDC_REPLICATION_ORIGIN_SESION_RESET_CMD); + CloseConnection(copyDest->connection); } } diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 1e22f8dc2..ecfb84aa9 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -13,6 +13,7 @@ #include "distributed/worker_shard_visibility.h" #include "distributed/worker_protocol.h" #include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "replication/logical.h" #include "utils/typcache.h" #include "utils/lsyscache.h" @@ -39,15 +40,15 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation, static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple, TupleDesc sourceTupleDesc, TupleDesc targetTupleDesc); -static bool -cdc_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id); -static bool -is_cdc_replication_slot(LogicalDecodingContext *ctx); +static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); + +/* used in the replication_origin_filter_cb function. */ +#define InvalidRepOriginId 0 +#define CITUS_CDC_SLOT_NAME "citus_cdc_slot" -bool -handle_cdc_changes(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); /* * Postgres uses 'pgoutput' as default plugin for logical replication. * We want to reuse Postgres pgoutput's functionality as much as possible. @@ -57,10 +58,10 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { LogicalOutputPluginInit plugin_init = - (LogicalOutputPluginInit) (void *) + (LogicalOutputPluginInit) (void *) load_external_function("pgoutput", - "_PG_output_plugin_init", - false, NULL); + "_PG_output_plugin_init", + false, NULL); if (plugin_init == NULL) { @@ -69,70 +70,71 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) /* ask the output plugin to fill the callback struct */ plugin_init(cb); + /* actual pgoutput callback will be called with the appropriate destination shard */ pgoutputChangeCB = cb->change_cb; cb->change_cb = split_change_cb; - cb->filter_by_origin_cb = cdc_origin_filter; + cb->filter_by_origin_cb = replication_origin_filter_cb; } -#define InvalidRepOriginId 0 /* - * cdc_origin_filter is called for each change. If the change is not - * originated from the CDC replication slot, we return false to skip - * the change. + * replication_origin_filter_cb call back function filters out publication of changes + * originated from any other node other than the current node. This is + * identified by the "origin_id" of the changes. The origin_id is set to + * a non-zero value in the origin node as part of WAL replication. */ -static bool -cdc_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) + static bool +replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) { if (origin_id != InvalidRepOriginId) { - elog(LOG,"!!!! cdc_origin_filter: filtering because origin_id: %d \n",origin_id); return true; } - //elog(LOG,"!!!! cdc_origin_filter: NOT filtering because origin_id: %d \n",origin_id); return false; } -static bool -is_cdc_replication_slot(LogicalDecodingContext *ctx) { +/* + * PublishChangesIfCdcSlot checks if the current slot is a CDC slot. If so, it publishes + * the changes as the change for the distributed table instead of shard. + * If not, it returns false. It also skips the Citus metadata tables. + */ +static bool +PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ char *replicationSlotName = ctx->slot->data.name.data; - //elog(LOG,"is_cdc_replication_slot: replicationSlotName %s", replicationSlotName); - return (replicationSlotName != NULL && strcmp(replicationSlotName,"cdc")== 0); -} - -bool -handle_cdc_changes(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change) { - bool is_cdc_translated_to_distribution_table = false; - if (is_cdc_replication_slot(ctx)) { - // Check if it is a change in a Shard table - if (RelationIsAKnownShard(relation->rd_id)) { - //Oid shardRelationId = relation->rd_id; + /* Check if the replication slot is CITUS_CDC_SLOT*/ + if (replicationSlotName != NULL && strcmp(replicationSlotName, CITUS_CDC_SLOT_NAME) == 0) + { + /* Skip publishing changes for Citus metadata tables*/ + ObjectAddress objectAdress = {RelationRelationId, relation->rd_id, 0}; + if (IsObjectAddressOwnedByCitus(&objectAdress)) + { + return true; + } + /* Check if this change is for a shard in distributed table. */ + if (RelationIsAKnownShard(relation->rd_id)) + { char *shardRelationName = RelationGetRelationName(relation); uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); if (shardId != INVALID_SHARD_ID) { - // try to get the distributed relation id for the shard + /* try to get the distributed relation id for the shard */ Oid distributedRelationId = RelationIdForShard(shardId); if (OidIsValid(distributedRelationId)) { - char* relationName = get_rel_name(distributedRelationId); - //elog(LOG,"changing to distributed relation name:%s id:%d ", relationName, distributedRelationId); - Relation distributedRelation = RelationIdGetRelation(distributedRelationId); - pgoutputChangeCB(ctx, txn, distributedRelation, change); - is_cdc_translated_to_distribution_table = true; + relation = RelationIdGetRelation(distributedRelationId); } } } - if (!is_cdc_translated_to_distribution_table) { - pgoutputChangeCB(ctx, txn, relation, change); - } + pgoutputChangeCB(ctx, txn, relation, change); return true; - } + } return false; } + /* * split_change function emits the incoming tuple change * to the appropriate destination shard. @@ -141,15 +143,15 @@ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { - //elog(LOG,"Citus split_change_cb called"); - if (handle_cdc_changes(ctx,txn,relation,change)) { - return; - } if (!is_publishable_relation(relation)) { return; } - + if (PublishChangesIfCdcSlot(ctx, txn, relation, change)) + { + return; + } + char *replicationSlotName = ctx->slot->data.name.data; /* @@ -274,6 +276,7 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationClose(targetRelation); } + /* * FindTargetRelationOid returns the destination relation Oid for the incoming * tuple.