From 773dbe147dc19b6a656c041aeba3fb281233cb43 Mon Sep 17 00:00:00 2001 From: Rajesh Kumar Thandapani Date: Thu, 20 Oct 2022 12:21:24 +0530 Subject: [PATCH] Intial commit for CDC using Logical Replication --- .../operations/worker_shard_copy.c | 26 +++++- .../shardsplit/shardsplit_decoder.c | 85 +++++++++++++++++-- 2 files changed, 103 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 9239caffb..4be1b394f 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -24,6 +24,7 @@ #include "distributed/relation_utils.h" #include "distributed/version_compat.h" #include "distributed/local_executor.h" +#include "replication/origin.h" /* * LocalCopyBuffer is used in copy callback to return the copied rows. @@ -79,6 +80,7 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); +static void SetupReplicationOrigin(RepOriginId nodeId); static bool CanUseLocalCopy(uint32_t destinationNodeId) @@ -152,7 +154,7 @@ CreateShardCopyDestReceiver(EState *executorState, return (DestReceiver *) copyDest; } - +#define InvalidRepOriginId 0 /* * ShardCopyDestReceiverReceive implements the receiveSlot function of * ShardCopyDestReceiver. It takes a TupleTableSlot and sends the contents to @@ -232,6 +234,27 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) return true; } +static void +SetupReplicationOrigin(RepOriginId nodeId) { + RepOriginId originid = InvalidRepOriginId; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + //Check if there is a replication origin session already active. + if (replorigin_session_origin == InvalidRepOriginId) { + //Lookup the replication origin and create it if it does not exist. + char originname[NAMEDATALEN]; + snprintf(originname, sizeof(originname), "pg_%u",nodeId); + originid = replorigin_by_name(originname, true); + if (originid == InvalidRepOriginId) { + originid = replorigin_create(originname); + } + //Setup the replication origin session. + replorigin_session_setup(originid); + replorigin_session_origin = originid; + origin_startpos = replorigin_session_get_progress(false); + //elog(LOG, "!!!! Citus: ShardCopyDestReceiverReceive replorigin_session_origin %d", replorigin_session_origin); + } +} + /* * ShardCopyDestReceiverStartup implements the rStartup interface of ShardCopyDestReceiver. @@ -259,6 +282,7 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); copyDest->copyOutState = copyOutState; + SetupReplicationOrigin(copyDest->destinationNodeId); } diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 51a56b36e..1e22f8dc2 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -10,10 +10,12 @@ #include "postgres.h" #include "distributed/shardinterval_utils.h" #include "distributed/shardsplit_shared_memory.h" +#include "distributed/worker_shard_visibility.h" +#include "distributed/worker_protocol.h" #include "distributed/listutils.h" #include "replication/logical.h" #include "utils/typcache.h" - +#include "utils/lsyscache.h" extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); static LogicalDecodeChangeCB pgoutputChangeCB; @@ -37,7 +39,15 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation, static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple, TupleDesc sourceTupleDesc, TupleDesc targetTupleDesc); +static bool +cdc_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool +is_cdc_replication_slot(LogicalDecodingContext *ctx); + +bool +handle_cdc_changes(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); /* * Postgres uses 'pgoutput' as default plugin for logical replication. * We want to reuse Postgres pgoutput's functionality as much as possible. @@ -47,9 +57,10 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { LogicalOutputPluginInit plugin_init = - (LogicalOutputPluginInit) (void *) load_external_function("pgoutput", - "_PG_output_plugin_init", - false, NULL); + (LogicalOutputPluginInit) (void *) + load_external_function("pgoutput", + "_PG_output_plugin_init", + false, NULL); if (plugin_init == NULL) { @@ -58,12 +69,69 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) /* ask the output plugin to fill the callback struct */ plugin_init(cb); - /* actual pgoutput callback will be called with the appropriate destination shard */ pgoutputChangeCB = cb->change_cb; cb->change_cb = split_change_cb; + cb->filter_by_origin_cb = cdc_origin_filter; } +#define InvalidRepOriginId 0 + +/* + * cdc_origin_filter is called for each change. If the change is not + * originated from the CDC replication slot, we return false to skip + * the change. + */ +static bool +cdc_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + if (origin_id != InvalidRepOriginId) + { + elog(LOG,"!!!! cdc_origin_filter: filtering because origin_id: %d \n",origin_id); + return true; + } + //elog(LOG,"!!!! cdc_origin_filter: NOT filtering because origin_id: %d \n",origin_id); + return false; +} + +static bool +is_cdc_replication_slot(LogicalDecodingContext *ctx) { + char *replicationSlotName = ctx->slot->data.name.data; + //elog(LOG,"is_cdc_replication_slot: replicationSlotName %s", replicationSlotName); + return (replicationSlotName != NULL && strcmp(replicationSlotName,"cdc")== 0); +} + +bool +handle_cdc_changes(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) { + bool is_cdc_translated_to_distribution_table = false; + if (is_cdc_replication_slot(ctx)) { + // Check if it is a change in a Shard table + if (RelationIsAKnownShard(relation->rd_id)) { + //Oid shardRelationId = relation->rd_id; + char *shardRelationName = RelationGetRelationName(relation); + uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); + if (shardId != INVALID_SHARD_ID) + { + // try to get the distributed relation id for the shard + Oid distributedRelationId = RelationIdForShard(shardId); + if (OidIsValid(distributedRelationId)) + { + char* relationName = get_rel_name(distributedRelationId); + //elog(LOG,"changing to distributed relation name:%s id:%d ", relationName, distributedRelationId); + Relation distributedRelation = RelationIdGetRelation(distributedRelationId); + pgoutputChangeCB(ctx, txn, distributedRelation, change); + is_cdc_translated_to_distribution_table = true; + } + } + } + if (!is_cdc_translated_to_distribution_table) { + pgoutputChangeCB(ctx, txn, relation, change); + } + return true; + } + return false; +} /* * split_change function emits the incoming tuple change @@ -73,11 +141,15 @@ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { + //elog(LOG,"Citus split_change_cb called"); + if (handle_cdc_changes(ctx,txn,relation,change)) { + return; + } if (!is_publishable_relation(relation)) { return; } - + char *replicationSlotName = ctx->slot->data.name.data; /* @@ -202,7 +274,6 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationClose(targetRelation); } - /* * FindTargetRelationOid returns the destination relation Oid for the incoming * tuple.