Intial commit for CDC using Logical Replication

pull/6453/head
Rajesh Kumar Thandapani 2022-10-20 12:21:24 +05:30
parent 02fd1e6c03
commit 773dbe147d
2 changed files with 103 additions and 8 deletions

View File

@ -24,6 +24,7 @@
#include "distributed/relation_utils.h" #include "distributed/relation_utils.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
#include "replication/origin.h"
/* /*
* LocalCopyBuffer is used in copy callback to return the copied rows. * LocalCopyBuffer is used in copy callback to return the copied rows.
@ -79,6 +80,7 @@ static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState); localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static void SetupReplicationOrigin(RepOriginId nodeId);
static bool static bool
CanUseLocalCopy(uint32_t destinationNodeId) CanUseLocalCopy(uint32_t destinationNodeId)
@ -152,7 +154,7 @@ CreateShardCopyDestReceiver(EState *executorState,
return (DestReceiver *) copyDest; return (DestReceiver *) copyDest;
} }
#define InvalidRepOriginId 0
/* /*
* ShardCopyDestReceiverReceive implements the receiveSlot function of * ShardCopyDestReceiverReceive implements the receiveSlot function of
* ShardCopyDestReceiver. It takes a TupleTableSlot and sends the contents to * ShardCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
@ -232,6 +234,27 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
return true; return true;
} }
static void
SetupReplicationOrigin(RepOriginId nodeId) {
RepOriginId originid = InvalidRepOriginId;
XLogRecPtr origin_startpos = InvalidXLogRecPtr;
//Check if there is a replication origin session already active.
if (replorigin_session_origin == InvalidRepOriginId) {
//Lookup the replication origin and create it if it does not exist.
char originname[NAMEDATALEN];
snprintf(originname, sizeof(originname), "pg_%u",nodeId);
originid = replorigin_by_name(originname, true);
if (originid == InvalidRepOriginId) {
originid = replorigin_create(originname);
}
//Setup the replication origin session.
replorigin_session_setup(originid);
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);
//elog(LOG, "!!!! Citus: ShardCopyDestReceiverReceive replorigin_session_origin %d", replorigin_session_origin);
}
}
/* /*
* ShardCopyDestReceiverStartup implements the rStartup interface of ShardCopyDestReceiver. * ShardCopyDestReceiverStartup implements the rStartup interface of ShardCopyDestReceiver.
@ -259,6 +282,7 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary); copyOutState->binary);
copyDest->copyOutState = copyOutState; copyDest->copyOutState = copyOutState;
SetupReplicationOrigin(copyDest->destinationNodeId);
} }

View File

@ -10,10 +10,12 @@
#include "postgres.h" #include "postgres.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h" #include "distributed/shardsplit_shared_memory.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_protocol.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "replication/logical.h" #include "replication/logical.h"
#include "utils/typcache.h" #include "utils/typcache.h"
#include "utils/lsyscache.h"
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB; static LogicalDecodeChangeCB pgoutputChangeCB;
@ -37,7 +39,15 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple, static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceTupleDesc, TupleDesc sourceTupleDesc,
TupleDesc targetTupleDesc); TupleDesc targetTupleDesc);
static bool
cdc_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id);
static bool
is_cdc_replication_slot(LogicalDecodingContext *ctx);
bool
handle_cdc_changes(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
/* /*
* Postgres uses 'pgoutput' as default plugin for logical replication. * Postgres uses 'pgoutput' as default plugin for logical replication.
* We want to reuse Postgres pgoutput's functionality as much as possible. * We want to reuse Postgres pgoutput's functionality as much as possible.
@ -47,7 +57,8 @@ void
_PG_output_plugin_init(OutputPluginCallbacks *cb) _PG_output_plugin_init(OutputPluginCallbacks *cb)
{ {
LogicalOutputPluginInit plugin_init = LogicalOutputPluginInit plugin_init =
(LogicalOutputPluginInit) (void *) load_external_function("pgoutput", (LogicalOutputPluginInit) (void *)
load_external_function("pgoutput",
"_PG_output_plugin_init", "_PG_output_plugin_init",
false, NULL); false, NULL);
@ -58,12 +69,69 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
/* ask the output plugin to fill the callback struct */ /* ask the output plugin to fill the callback struct */
plugin_init(cb); plugin_init(cb);
/* actual pgoutput callback will be called with the appropriate destination shard */ /* actual pgoutput callback will be called with the appropriate destination shard */
pgoutputChangeCB = cb->change_cb; pgoutputChangeCB = cb->change_cb;
cb->change_cb = split_change_cb; cb->change_cb = split_change_cb;
cb->filter_by_origin_cb = cdc_origin_filter;
} }
#define InvalidRepOriginId 0
/*
* cdc_origin_filter is called for each change. If the change is not
* originated from the CDC replication slot, we return false to skip
* the change.
*/
static bool
cdc_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
if (origin_id != InvalidRepOriginId)
{
elog(LOG,"!!!! cdc_origin_filter: filtering because origin_id: %d \n",origin_id);
return true;
}
//elog(LOG,"!!!! cdc_origin_filter: NOT filtering because origin_id: %d \n",origin_id);
return false;
}
static bool
is_cdc_replication_slot(LogicalDecodingContext *ctx) {
char *replicationSlotName = ctx->slot->data.name.data;
//elog(LOG,"is_cdc_replication_slot: replicationSlotName %s", replicationSlotName);
return (replicationSlotName != NULL && strcmp(replicationSlotName,"cdc")== 0);
}
bool
handle_cdc_changes(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) {
bool is_cdc_translated_to_distribution_table = false;
if (is_cdc_replication_slot(ctx)) {
// Check if it is a change in a Shard table
if (RelationIsAKnownShard(relation->rd_id)) {
//Oid shardRelationId = relation->rd_id;
char *shardRelationName = RelationGetRelationName(relation);
uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true);
if (shardId != INVALID_SHARD_ID)
{
// try to get the distributed relation id for the shard
Oid distributedRelationId = RelationIdForShard(shardId);
if (OidIsValid(distributedRelationId))
{
char* relationName = get_rel_name(distributedRelationId);
//elog(LOG,"changing to distributed relation name:%s id:%d ", relationName, distributedRelationId);
Relation distributedRelation = RelationIdGetRelation(distributedRelationId);
pgoutputChangeCB(ctx, txn, distributedRelation, change);
is_cdc_translated_to_distribution_table = true;
}
}
}
if (!is_cdc_translated_to_distribution_table) {
pgoutputChangeCB(ctx, txn, relation, change);
}
return true;
}
return false;
}
/* /*
* split_change function emits the incoming tuple change * split_change function emits the incoming tuple change
@ -73,6 +141,10 @@ static void
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) Relation relation, ReorderBufferChange *change)
{ {
//elog(LOG,"Citus split_change_cb called");
if (handle_cdc_changes(ctx,txn,relation,change)) {
return;
}
if (!is_publishable_relation(relation)) if (!is_publishable_relation(relation))
{ {
return; return;
@ -202,7 +274,6 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
RelationClose(targetRelation); RelationClose(targetRelation);
} }
/* /*
* FindTargetRelationOid returns the destination relation Oid for the incoming * FindTargetRelationOid returns the destination relation Oid for the incoming
* tuple. * tuple.