Incorporated review comments.

pull/6453/head
Rajesh Kumar Thandapani 2022-11-24 21:49:44 +05:30
parent 4635c1533f
commit 69cc241e6d
2 changed files with 37 additions and 56 deletions

View File

@ -33,11 +33,11 @@
*/ */
static StringInfo LocalCopyBuffer; static StringInfo LocalCopyBuffer;
#define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \ #define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \
"SELECT pg_catalog.pg_replication_origin_create('citus_cdc') \ "SELECT pg_catalog.pg_replication_origin_create('citus_cdc_%d') \
where (select pg_catalog.pg_replication_origin_oid('citus_cdc')) IS NULL;" where (select pg_catalog.pg_replication_origin_oid('citus_cdc_%d')) IS NULL;"
#define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \ #define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \
"SELECT pg_catalog.pg_replication_origin_session_setup('citus_cdc') \ "SELECT pg_catalog.pg_replication_origin_session_setup('citus_cdc_%d') \
where pg_catalog.pg_replication_origin_session_is_setup()='f';" where pg_catalog.pg_replication_origin_session_is_setup()='f';"
#define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \ #define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \
@ -93,8 +93,6 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static void CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest); static void CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest);
static void SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection);
static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection);
static bool static bool
CanUseLocalCopy(uint32_t destinationNodeId) CanUseLocalCopy(uint32_t destinationNodeId)
@ -104,36 +102,7 @@ CanUseLocalCopy(uint32_t destinationNodeId)
} }
/* #define REPLICATION_ORIGIN_CMD_BUFFER_SIZE 1024
* SetupReplicationOriginSessionIfNotSetupAlready sets up the replication origin session
* if it is not already setup.
*/
static void
SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection)
{
/* Setup replication Origin if not setup already */
if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD))
{
ReportConnectionError(connection, ERROR);
}
ForgetResults(connection);
}
/*
* ResetReplicationOriginSessionIfSetupAlready resets the replication origin session
* if it has been setup currently.
*/
static void
ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection)
{
if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_RESET_CMD))
{
ReportConnectionError(connection, ERROR);
}
ForgetResults(connection);
}
/* Connect to node with source shard and trigger copy start. */ /* Connect to node with source shard and trigger copy start. */
static void static void
@ -152,7 +121,11 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
/* Setup Replication Origin Session if not setup already for /* Setup Replication Origin Session if not setup already for
* avoiding publication of events more than once. */ * avoiding publication of events more than once. */
SetupReplicationOriginSessionIfNotSetupAlready(copyDest->connection); char replicationOrginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
int originId = GetLocalNodeId();
snprintf(replicationOrginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, originId);
ExecuteCriticalRemoteCommand(copyDest->connection, replicationOrginSetupCommand);
StringInfo copyStatement = ConstructShardCopyStatement( StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName, copyDest->destinationShardFullyQualifiedName,
@ -327,9 +300,12 @@ CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest)
workerNode->workerPort, workerNode->workerPort,
currentUser, currentUser,
NULL /* database (current) */); NULL /* database (current) */);
ClaimConnectionExclusively(connection); char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
ExecuteCriticalRemoteCommand(connection, int originId = GetLocalNodeId();
CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD); snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, originId, originId);
ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand);
CloseConnection(connection); CloseConnection(connection);
} }

View File

@ -17,6 +17,7 @@
#include "replication/logical.h" #include "replication/logical.h"
#include "utils/typcache.h" #include "utils/typcache.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "catalog/pg_namespace.h"
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB; static LogicalDecodeChangeCB pgoutputChangeCB;
@ -48,7 +49,8 @@ static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTX
/* used in the replication_origin_filter_cb function. */ /* used in the replication_origin_filter_cb function. */
#define InvalidRepOriginId 0 #define InvalidRepOriginId 0
#define CITUS_CDC_SLOT_NAME "citus_cdc_slot"
#define CITUS_SHARD_PREFIX_SLOT "citus_shard_"
/* /*
* Postgres uses 'pgoutput' as default plugin for logical replication. * Postgres uses 'pgoutput' as default plugin for logical replication.
@ -108,31 +110,28 @@ PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
char *replicationSlotName = ctx->slot->data.name.data; char *replicationSlotName = ctx->slot->data.name.data;
/* Check if the replication slot is CITUS_CDC_SLOT*/ /* Check if the replication slot is CITUS_CDC_SLOT*/
if (replicationSlotName != NULL && strcmp(replicationSlotName, CITUS_CDC_SLOT_NAME) == if (replicationSlotName != NULL &&
0) strncmp(replicationSlotName, CITUS_SHARD_PREFIX_SLOT, strlen(
CITUS_SHARD_PREFIX_SLOT)) != 0)
{ {
/* Skip publishing changes for Citus metadata tables*/ /* Skip publishing changes for system relations in pg_catalog*/
ObjectAddress objectAdress = { RelationRelationId, relation->rd_id, 0 }; if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
if (IsObjectAddressOwnedByCitus(&objectAdress))
{ {
return true; return true;
} }
/* Check if this change is for a shard in distributed table. */ char *shardRelationName = RelationGetRelationName(relation);
if (RelationIsAKnownShard(relation->rd_id)) uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true);
if (shardId != INVALID_SHARD_ID && ShardExists(shardId))
{ {
char *shardRelationName = RelationGetRelationName(relation); /* try to get the distributed relation id for the shard */
uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); Oid distributedRelationId = RelationIdForShard(shardId);
if (shardId != INVALID_SHARD_ID) if (OidIsValid(distributedRelationId))
{ {
/* try to get the distributed relation id for the shard */ relation = RelationIdGetRelation(distributedRelationId);
Oid distributedRelationId = RelationIdForShard(shardId);
if (OidIsValid(distributedRelationId))
{
relation = RelationIdGetRelation(distributedRelationId);
}
} }
} }
pgoutputChangeCB(ctx, txn, relation, change); pgoutputChangeCB(ctx, txn, relation, change);
return true; return true;
} }
@ -148,6 +147,12 @@ static void
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change) Relation relation, ReorderBufferChange *change)
{ {
/*check if Citus extension is loaded. */
if (!CitusHasBeenLoaded())
{
return;
}
if (!is_publishable_relation(relation)) if (!is_publishable_relation(relation))
{ {
return; return;