From 59c3c93aa43d6626e48024870bfb8abd9e542b68 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Wed, 25 May 2022 15:31:22 +0530 Subject: [PATCH] Addressing comments - Changed name of functions - Moved some functions from .h to .c file --- .../operations/shard_split_replicatoin.c | 36 ++++++------ src/backend/distributed/shardsplit/pgoutput.c | 26 +++++---- .../shardsplit/shardsplit_shared_memory.c | 56 ++++++++++++------- .../distributed/shardsplit_shared_memory.h | 20 +------ 4 files changed, 72 insertions(+), 66 deletions(-) diff --git a/src/backend/distributed/operations/shard_split_replicatoin.c b/src/backend/distributed/operations/shard_split_replicatoin.c index 932125753..3d96c0468 100644 --- a/src/backend/distributed/operations/shard_split_replicatoin.c +++ b/src/backend/distributed/operations/shard_split_replicatoin.c @@ -49,15 +49,15 @@ static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject, int32 *maxValue, int32 *nodeId); static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, - uint64 desSplitChildShardId, - int32 minValue, - int32 maxValue, - int32 nodeId); + uint64 desSplitChildShardId, + int32 minValue, + int32 maxValue, + int32 nodeId); static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo); static void * PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, - HTAB *shardInfoHashMap, - dsm_handle dsmHandle, - int shardSplitInfoCount); + HTAB *shardInfoHashMap, + dsm_handle dsmHandle, + int shardSplitInfoCount); static void SetupHashMapForShardInfo(); /* @@ -130,12 +130,12 @@ split_shard_replication_setup(PG_FUNCTION_ARGS) dsm_handle dsmHandle; ShardSplitInfo *splitShardInfoSMArray = - GetSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle); + CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle); PopulateShardSplitInfoInSM(splitShardInfoSMArray, - ShardInfoHashMap, - dsmHandle, - shardSplitInfoCount); + ShardInfoHashMap, + dsmHandle, + shardSplitInfoCount); return dsmHandle; } @@ -315,10 +315,10 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject, */ ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, - uint64 desSplitChildShardId, - int32 minValue, - int32 maxValue, - int32 nodeId) + uint64 desSplitChildShardId, + int32 minValue, + int32 maxValue, + int32 nodeId) { ShardInterval *shardIntervalToSplit = LoadShardInterval(sourceShardIdToSplit); CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry( @@ -425,9 +425,9 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) */ void * PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, - HTAB *shardInfoHashMap, - dsm_handle dsmHandle, - int shardSplitInfoCount) + HTAB *shardInfoHashMap, + dsm_handle dsmHandle, + int shardSplitInfoCount) { HASH_SEQ_STATUS status; hash_seq_init(&status, shardInfoHashMap); diff --git a/src/backend/distributed/shardsplit/pgoutput.c b/src/backend/distributed/shardsplit/pgoutput.c index 1917b7e09..2d029ef3b 100644 --- a/src/backend/distributed/shardsplit/pgoutput.c +++ b/src/backend/distributed/shardsplit/pgoutput.c @@ -44,13 +44,13 @@ int shardSplitInfoArraySize = 0; /* Plugin callback */ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); + Relation relation, ReorderBufferChange *change); /* Helper methods */ static bool ShouldCommitBeApplied(Relation sourceShardRelation); static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation, - HeapTuple tuple, - bool *shouldHandleUpdate); + HeapTuple tuple, + bool *shouldHandleUpdate); void _PG_output_plugin_init(OutputPluginCallbacks *cb) @@ -202,6 +202,8 @@ FindTargetRelationOid(Relation sourceShardRelation, * part of replication. This in turn creates one more commit(2). * Commit 2 should be skipped as the source shard and destination for commit 2 * are same and the commit has already been applied. + * + * TODO(saawasek): Add the information in Hashmap for performance reasons. */ bool ShouldCommitBeApplied(Relation sourceShardRelation) @@ -231,7 +233,7 @@ ShouldCommitBeApplied(Relation sourceShardRelation) */ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change) + Relation relation, ReorderBufferChange *change) { /* * Get ShardSplitInfo array from Shared Memory if not already @@ -243,8 +245,7 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { shardSplitInfoArray = GetShardSplitInfoSMArrayForSlot(ctx->slot->data.name.data, - &arraySize); - shardSplitInfoArraySize = arraySize; + &shardSplitInfoArraySize); } char *replicationSlotName = ctx->slot->data.name.data; @@ -277,11 +278,14 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, default: Assert(false); } - - if (targetRelationOid != InvalidOid) + + /* Current replication slot is not responsible for handling the change */ + if (targetRelationOid == InvalidOid) { - Relation targetRelation = RelationIdGetRelation(targetRelationOid); - pgoutputChangeCB(ctx, txn, targetRelation, change); - RelationClose(targetRelation); + return; } + + Relation targetRelation = RelationIdGetRelation(targetRelationOid); + pgoutputChangeCB(ctx, txn, targetRelation, change); + RelationClose(targetRelation); } diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index a70999fec..f40ff1d0c 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -18,12 +18,28 @@ #include "distributed/shardsplit_shared_memory.h" #include "distributed/citus_safe_lib.h" +/* Function declarations */ +static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int + shardSplitInfoCount, + Size + shardSplitInfoSize, + dsm_handle * + dsmHandle); + +static void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); + +static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle + dsmHandle, + dsm_segment ** + attachedSegment); +static dsm_handle GetSMHandleFromSlotName(char *slotName); + /* * GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory * segment beloing to 'dsmHandle'. It pins the shared memory segment mapping till * lifetime of the backend process accessing it. */ -ShardSplitInfoSMHeader * +static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle, dsm_segment **attachedSegment) { @@ -87,7 +103,7 @@ GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize) * from the replication slot name. Replication slot name is encoded as * "NODEID_SlotType_SharedMemoryHANDLE". */ -dsm_handle +static dsm_handle GetSMHandleFromSlotName(char *slotName) { if (slotName == NULL) @@ -105,27 +121,29 @@ GetSMHandleFromSlotName(char *slotName) /* - * CreateShardSplitInfoSharedMemory is used to create a place to store - * information about the shard undergoing a split. The function creates dynamic - * shared memory segment consisting of a header regarding the processId and an - * array of "steps" which store ShardSplitInfo. The contents of this shared - * memory segment are consumed by WAL sender process during catch up phase of + * AllocateSharedMemoryForShardSplitInfo is used to create a place to store + * information about the shard undergoing a split. The function allocates dynamic + * shared memory segment consisting of a header which stores the id of process + * creating it and an array of "steps" which store ShardSplitInfo. The contents of + * this shared memory segment are consumed by WAL sender process during catch up phase of * replication through logical decoding plugin. * * The shared memory segment exists till the catch up phase completes or the * postmaster shutsdown. */ -ShardSplitInfoSMHeader * -CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHandle) +static ShardSplitInfoSMHeader * +AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitInfoSize, + dsm_handle *dsmHandle) { - if (stepSize <= 0 || stepCount <= 0) + if (shardSplitInfoCount <= 0 || shardSplitInfoSize <= 0) { ereport(ERROR, - (errmsg("number of steps and size of each step should be " + (errmsg("count and size of each step should be " "positive values"))); } - Size totalSize = sizeof(ShardSplitInfoSMHeader) + stepSize * stepCount; + Size totalSize = sizeof(ShardSplitInfoSMHeader) + shardSplitInfoCount * + shardSplitInfoSize; dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS); if (dsmSegment == NULL) @@ -146,7 +164,7 @@ CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHa ShardSplitInfoSMHeader *shardSplitInfoSMHeader = GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle, &dsmSegment); - shardSplitInfoSMHeader->stepCount = stepCount; + shardSplitInfoSMHeader->stepCount = shardSplitInfoCount; shardSplitInfoSMHeader->processId = MyProcPid; return shardSplitInfoSMHeader; @@ -154,7 +172,7 @@ CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHa /* - * GetSharedMemoryForShardSplitInfo is a wrapper function which creates shared memory + * CreateSharedMemoryForShardSplitInfo is a wrapper function which creates shared memory * for storing shard split infomation. The function returns pointer the first element * within this array. * @@ -162,12 +180,12 @@ CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHa * dsmHandle - handle of the allocated shared memory segment */ ShardSplitInfo * -GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle) +CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle) { ShardSplitInfoSMHeader *shardSplitInfoSMHeader = - CreateShardSplitInfoSharedMemory(shardSplitInfoCount, - sizeof(ShardSplitInfo), - dsmHandle); + AllocateSharedMemoryForShardSplitInfo(shardSplitInfoCount, + sizeof(ShardSplitInfo), + dsmHandle); ShardSplitInfo *shardSplitInfoSMArray = (ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader); @@ -181,7 +199,7 @@ GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle) * right after the header, so this function is trivial. The main purpose of * this function is to make the intent clear to readers of the code. */ -void * +static void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) { return shardSplitInfoSMHeader + 1; diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h index 4eb2b867c..4670c3926 100644 --- a/src/include/distributed/shardsplit_shared_memory.h +++ b/src/include/distributed/shardsplit_shared_memory.h @@ -30,27 +30,11 @@ typedef struct ShardSplitInfoSMHeader /* Functions for creating and accessing shared memory segments */ -extern ShardSplitInfoSMHeader * CreateShardSplitInfoSharedMemory(int stepCount, - Size stepSize, - dsm_handle *dsmHandle); - -extern ShardSplitInfo * GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount, - dsm_handle *dsmHandle); +extern ShardSplitInfo * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, + dsm_handle *dsmHandle); extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize); -extern dsm_handle GetSMHandleFromSlotName(char *slotName); - -/* - * ShardSplitInfoSMSteps returns a pointer to the array of shard split info - * steps that are stored in shared memory. - */ -extern void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); - -extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle - dsmHandle, - dsm_segment ** - attachedSegment); /* Functions related to encoding-decoding for replication slot name */ char * encode_replication_slot(uint64_t nodeId,