From b6662ff872f1d0dffff0efc895fdd383b26ee42e Mon Sep 17 00:00:00 2001
From: Sameer Awasekar
Date: Wed, 29 Jun 2022 12:13:51 +0530
Subject: [PATCH] Add hashmap, cleanup methods

---
 .../shardsplit/shardsplit_decoder.c           | 257 +++++++-----------
 .../shardsplit/shardsplit_shared_memory.c     | 126 +++++----
 .../distributed/shardsplit_shared_memory.h    |  22 +-
 ...plit_shard_replication_colocated_setup.out |   2 +-
 ...t_shard_replication_setup_remote_local.out |   2 +-
 5 files changed, 178 insertions(+), 231 deletions(-)

diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c
index 90213d2f4..87f2d8f2a 100644
--- a/src/backend/distributed/shardsplit/shardsplit_decoder.c
+++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c
@@ -11,6 +11,7 @@
 #include "postgres.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shardsplit_shared_memory.h"
+#include "distributed/listutils.h"
 #include "replication/logical.h"
 
 /*
@@ -23,18 +24,17 @@ PG_MODULE_MAGIC;
 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
 static LogicalDecodeChangeCB pgoutputChangeCB;
 
-static ShardSplitInfoSMHeader *ShardSplitInfo_SMHeader = NULL;
-static ShardSplitInfoForReplicationSlot *ShardSplitInfoForSlot = NULL;
+static HTAB *SourceToDestinationShardMap = NULL;
 
 /* Plugin callback */
 static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							Relation relation, ReorderBufferChange *change);
 
 /* Helper methods */
-static bool IsCommitRecursive(Relation sourceShardRelation);
 static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
 											HeapTuple tuple,
-											bool *shouldHandleUpdate);
+											int partitionColumnIndex,
+											Oid distributedTableOid);
 
 static Oid FindTargetRelationOid(Relation sourceShardRelation,
 								 HeapTuple tuple,
@@ -67,151 +67,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 }
 
 
-/*
- * GetHashValueForIncomingTuple returns the hash value of the partition
- * column for the incoming tuple. It also checks if the change should be
- * handled as the incoming committed change can belong to a relation
- * that is not under going split.
- */
-static int32_t
-GetHashValueForIncomingTuple(Relation sourceShardRelation,
-							 HeapTuple tuple,
-							 bool *shouldHandleChange)
-{
-	ShardSplitInfo *shardSplitInfo = NULL;
-	int partitionColumnIndex = -1;
-	Oid distributedTableOid = InvalidOid;
-
-	Oid sourceShardOid = sourceShardRelation->rd_id;
-	for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
-		 i++)
-	{
-		shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
-		if (shardSplitInfo->sourceShardOid == sourceShardOid)
-		{
-			distributedTableOid = shardSplitInfo->distributedTableOid;
-			partitionColumnIndex = shardSplitInfo->partitionColumnIndex;
-			break;
-		}
-	}
-
-	/*
-	 * The commit can belong to any other table that is not going
-	 * under split. Ignore such commit's.
-	 */
-	if (partitionColumnIndex == -1 ||
-		distributedTableOid == InvalidOid)
-	{
-		/*
-		 * TODO(saawasek): Change below warning to DEBUG once more test case
-		 * are added.
-		 */
-		ereport(WARNING, errmsg("Skipping Commit as "
-								"Relation: %s isn't splitting",
-								RelationGetRelationName(sourceShardRelation)));
-		*shouldHandleChange = false;
-		return 0;
-	}
-
-	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid);
-	if (cacheEntry == NULL)
-	{
-		ereport(ERROR, errmsg("null entry found for cacheEntry"));
-	}
-
-	TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation);
-	bool isNull = false;
-	Datum partitionColumnValue = heap_getattr(tuple,
-											  partitionColumnIndex + 1,
-											  relationTupleDes,
-											  &isNull);
-
-	FmgrInfo *hashFunction = cacheEntry->hashFunction;
-
-	/* get hashed value of the distribution value */
-	Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
-	int32_t hashedValue = DatumGetInt32(hashedValueDatum);
-
-	*shouldHandleChange = true;
-
-	return hashedValue;
-}
-
-
-/*
- * FindTargetRelationOid returns the destination relation Oid for the incoming
- * tuple.
- * sourceShardRelation - Relation on which a commit has happened.
- * tuple - changed tuple.
- * currentSlotName - Name of replication slot that is processing this update.
- */
-static Oid
-FindTargetRelationOid(Relation sourceShardRelation,
-					  HeapTuple tuple,
-					  char *currentSlotName)
-{
-	Oid targetRelationOid = InvalidOid;
-	Oid sourceShardRelationOid = sourceShardRelation->rd_id;
-
-	bool shouldHandleUpdate = false;
-	int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple,
-												 &shouldHandleUpdate);
-	if (shouldHandleUpdate == false)
-	{
-		return InvalidOid;
-	}
-
-	for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
-		 i++)
-	{
-		ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
-
-		/*
-		 * Each commit message is processed by all the configured replication slots.
-		 * A replication slot is responsible for shard placements belonging to unique
-		 * table owner and nodeId combination. We check if the current slot which is
-		 * processing the commit should emit a target relation Oid.
-		 */
-		if (shardSplitInfo->sourceShardOid == sourceShardRelationOid &&
-			shardSplitInfo->shardMinValue <= hashValue &&
-			shardSplitInfo->shardMaxValue >= hashValue)
-		{
-			targetRelationOid = shardSplitInfo->splitChildShardOid;
-			break;
-		}
-	}
-
-	return targetRelationOid;
-}
-
-
-/*
- * IsCommitRecursive returns true when commit is recursive. When the source shard
- * recives a commit(1), the WAL sender processes this commit message. This
- * commit is applied to a child shard which is placed on the same node as a
- * part of replication. This in turn creates one more commit(2) which is recursive in nature.
- * Commit 2 should be skipped as the source shard and destination for commit 2
- * are same and the commit has already been applied.
- */
-bool
-IsCommitRecursive(Relation sourceShardRelation)
-{
-	Oid sourceShardOid = sourceShardRelation->rd_id;
-	for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
-		 i++)
-	{
-		/* skip the commit when destination is equal to the source */
-		ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
-		if (sourceShardOid == shardSplitInfo->splitChildShardOid)
-		{
-			return true;
-		}
-	}
-
-	return false;
-}
-
-
 /*
  * split_change function emits the incoming tuple change
  * to the appropriate destination shard.
@@ -226,19 +81,13 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 
 	/*
-	 * Get ShardSplitInfoForSlot if not already initialized.
+	 * Initialize SourceToDestinationShardMap if not already initialized.
 	 * This gets initialized during the replication of first message.
 	 */
-	if (ShardSplitInfoForSlot == NULL)
+	if (SourceToDestinationShardMap == NULL)
 	{
-		ShardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot(
+		SourceToDestinationShardMap = PopulateSourceToDestinationShardMapForSlot(
 			ctx->slot->data.name.data);
-		ShardSplitInfo_SMHeader = ShardSplitInfoForSlot->shardSplitInfoHeader;
-	}
-
-	if (IsCommitRecursive(relation))
-	{
-		return;
 	}
 
 	Oid targetRelationOid = InvalidOid;
@@ -288,3 +137,95 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	pgoutputChangeCB(ctx, txn, targetRelation, change);
 	RelationClose(targetRelation);
 }
+
+
+/*
+ * FindTargetRelationOid returns the destination relation Oid for the incoming
+ * tuple.
+ * sourceShardRelation - Relation on which a commit has happened.
+ * tuple - changed tuple.
+ * currentSlotName - Name of replication slot that is processing this update.
+ */
+static Oid
+FindTargetRelationOid(Relation sourceShardRelation,
+					  HeapTuple tuple,
+					  char *currentSlotName)
+{
+	Oid targetRelationOid = InvalidOid;
+	Oid sourceShardRelationOid = sourceShardRelation->rd_id;
+
+	/* Get child shard list for source (parent) shard from the hash map */
+	bool found = false;
+	SourceToDestinationShardMapEntry *entry =
+		(SourceToDestinationShardMapEntry *) hash_search(
+			SourceToDestinationShardMap, &sourceShardRelationOid, HASH_FIND, &found);
+
+	/*
+	 * The source shard Oid might not exist in the hash map. This can happen
+	 * in the following cases:
+	 * 1) The commit can belong to some other table that is not undergoing a split.
+	 * 2) The commit can be recursive in nature. When the source shard
+	 * receives a commit (a), the WAL sender processes this commit message. This
+	 * commit is applied to a child shard which is placed on the same node as a
+	 * part of replication. This in turn creates one more commit (b), which is recursive in nature.
+	 * Commit (b) should be skipped as the source shard and the destination for commit (b)
+	 * are the same and the commit has already been applied.
+	 */
+	if (!found)
+	{
+		return InvalidOid;
+	}
+
+	ShardSplitInfo *shardSplitInfo = (ShardSplitInfo *) lfirst(list_head(
+																   entry->
+																   shardSplitInfoList));
+	int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple,
+												 shardSplitInfo->partitionColumnIndex,
+												 shardSplitInfo->distributedTableOid);
+
+	shardSplitInfo = NULL;
+	foreach_ptr(shardSplitInfo, entry->shardSplitInfoList)
+	{
+		if (shardSplitInfo->shardMinValue <= hashValue &&
+			shardSplitInfo->shardMaxValue >= hashValue)
+		{
+			targetRelationOid = shardSplitInfo->splitChildShardOid;
+			break;
+		}
+	}
+
+	return targetRelationOid;
+}
+
+
+/*
+ * GetHashValueForIncomingTuple returns the hash value of the partition
+ * column for the incoming tuple.
+ */
+static int32_t
+GetHashValueForIncomingTuple(Relation sourceShardRelation,
+							 HeapTuple tuple,
+							 int partitionColumnIndex,
+							 Oid distributedTableOid)
+{
+	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid);
+	if (cacheEntry == NULL)
+	{
+		ereport(ERROR, errmsg(
+					"Expected valid Citus Cache entry to be present, but found NULL"));
+	}
+
+	TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation);
+	bool isNull = false;
+	Datum partitionColumnValue = heap_getattr(tuple,
+											  partitionColumnIndex + 1,
+											  relationTupleDes,
+											  &isNull);
+
+	FmgrInfo *hashFunction = cacheEntry->hashFunction;
+
+	/* get hashed value of the distribution value */
+	Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
+
+	return DatumGetInt32(hashedValueDatum);
+}
diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
index e2d7f1460..b96ead387 100644
--- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
+++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
@@ -17,6 +17,7 @@
 #include "distributed/citus_safe_lib.h"
 #include "storage/ipc.h"
 #include "utils/memutils.h"
+#include "common/hashfn.h"
 
 const char *SharedMemoryNameForHandleManagement =
 	"Shared memory handle for shard split";
@@ -82,15 +83,8 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
  * GetShardSplitInfoSMHeader returns pointer to the header of shared memory segment.
  */
 ShardSplitInfoSMHeader *
-GetShardSplitInfoSMHeader(char *slotName)
+GetShardSplitInfoSMHeader()
 {
-	if (slotName == NULL)
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
-				 errmsg("Expected slot name but found NULL")));
-	}
-
 	dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle();
 
 	ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
@@ -168,6 +162,32 @@ CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHand
 }
 
 
+/*
+ * ReleaseSharedMemoryOfShardSplitInfo releases (unpins) the dynamic shared memory segment
+ * allocated by 'worker_split_shard_replication_setup'. This shared memory was pinned
+ * to the Postmaster process and is valid until the Postmaster shuts down or
+ * it is explicitly unpinned by calling 'dsm_unpin_segment'.
+ */
+void
+ReleaseSharedMemoryOfShardSplitInfo()
+{
+	/* Get handle of the dynamic shared memory segment */
+	dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle();
+
+	/*
+	 * Unpin the dynamic shared memory segment. 'dsm_pin_segment' was
+	 * called previously by 'AllocateSharedMemoryForShardSplitInfo'.
+	 */
+	dsm_unpin_segment(dsmHandle);
+
+	/*
+	 * As the dynamic shared memory is unpinned, store an invalid handle in the static
+	 * shared memory used for handle management.
+	 */
+	StoreShardSplitSharedMemoryHandle(DSM_HANDLE_INVALID);
+}
+
+
 /*
  * encode_replication_slot returns an encoded replication slot name
  * in the following format.
@@ -244,8 +264,8 @@ ShardSplitShmemInit(void)
 /*
  * StoreShardSplitSharedMemoryHandle stores a handle of shared memory
  * allocated and populated by 'worker_split_shard_replication_setup' UDF.
- * This handle is stored in a different shared memory segment with name
- * 'Shared memory handle for shard split'.
+ * This handle is stored in a different statically allocated shared memory
+ * segment with name 'Shared memory handle for shard split'.
 */
 void
 StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
@@ -277,8 +297,8 @@ StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
 	{
 		ereport(WARNING,
 				errmsg(
-					"As a part of split shard workflow,unexpectedly found a valid"
-					" shared memory handle while storing a new one."));
+					"Previous split shard workflow was not successful and could not complete the cleanup phase."
+ " Continuing with the current split shard workflow.")); } /* Store the incoming handle */ @@ -289,9 +309,9 @@ StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle) /* - * GetShardSplitSharedMemoryHandle returns the shared memory handle stored - * by 'worker_split_shard_replication_setup' UDF. This handle - * is requested by wal sender processes during logical replication phase. + * GetShardSplitSharedMemoryHandle returns the handle of dynamic shared memory segment stored + * by 'worker_split_shard_replication_setup' UDF. This handle is requested by WAL sender processes + * during logical replication phase or during cleanup. */ dsm_handle GetShardSplitSharedMemoryHandle(void) @@ -316,64 +336,52 @@ GetShardSplitSharedMemoryHandle(void) /* - * PopulateShardSplitInfoForReplicationSlot function traverses 'ShardSplitInfo' array - * stored within shared memory segment. It returns the starting and ending index position - * of a given slot within this array. When the given replication slot processes a commit, - * traversal is only limited within this bound thus enhancing performance. + * PopulateSourceToDestinationShardMapForSlot populates 'SourceToDestinationShard' hash map for a given slot. + * Key of the map is Oid of source shard which is undergoing a split and value is a list of corresponding child shards. + * To populate the map, the function traverses 'ShardSplitInfo' array stored within shared memory segment. */ -ShardSplitInfoForReplicationSlot * -PopulateShardSplitInfoForReplicationSlot(char *slotName) +HTAB * +PopulateSourceToDestinationShardMapForSlot(char *slotName) { - ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader(slotName); + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(Oid); + info.entrysize = sizeof(SourceToDestinationShardMapEntry); + info.hash = uint32_hash; + info.hcxt = TopMemoryContext; + + int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); + HTAB *sourceShardToDesShardMap = hash_create("SourceToDestinationShardMap", 128, + &info, hashFlags); MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); - ShardSplitInfoForReplicationSlot *infoForReplicationSlot = - (ShardSplitInfoForReplicationSlot *) palloc0( - sizeof(ShardSplitInfoForReplicationSlot)); - infoForReplicationSlot->shardSplitInfoHeader = smHeader; - infoForReplicationSlot->startIndex = -1; - infoForReplicationSlot->endIndex = -1; - - int splitInfoIndex = 0; - while (splitInfoIndex < smHeader->count) + ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader(); + for (int index = 0; index < smHeader->count; index++) { - if (strcmp(smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0) + if (strcmp(smHeader->splitInfoArray[index].slotName, slotName) == 0) { - /* Found the starting index from where current slot information begins */ - infoForReplicationSlot->startIndex = splitInfoIndex; + Oid sourceShardOid = smHeader->splitInfoArray[index].sourceShardOid; + bool found = false; + SourceToDestinationShardMapEntry *entry = + (SourceToDestinationShardMapEntry *) hash_search( + sourceShardToDesShardMap, &sourceShardOid, HASH_ENTER, &found); - /* Slide forward to get the ending index */ - splitInfoIndex++; - while (splitInfoIndex < smHeader->count && strcmp( - smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0) + if (!found) { - splitInfoIndex++; + entry->shardSplitInfoList = NIL; + entry->sourceShardKey = sourceShardOid; } - /* Found ending index */ - infoForReplicationSlot->endIndex = splitInfoIndex - 1; + ShardSplitInfo 
+				sizeof(ShardSplitInfo));
+			*shardSplitInfoForSlot = smHeader->splitInfoArray[index];
 
-			/*
-			 * 'ShardSplitInfo' with same slot name are stored contiguously in shared memory segment.
-			 * After the current 'index' position, we should not encounter any 'ShardSplitInfo' with incoming slot name.
-			 * If this happens, there is shared memory corruption. Its worth to go ahead and assert for this assumption.
-			 */
-			break;
+			entry->shardSplitInfoList = lappend(entry->shardSplitInfoList,
+												(ShardSplitInfo *) shardSplitInfoForSlot);
 		}
-
-		splitInfoIndex++;
-	}
-
-	if (infoForReplicationSlot->startIndex == -1)
-	{
-		ereport(ERROR,
-				(errmsg("Unexpectedly could not find information "
-						"corresponding to replication slot name:%s in shared memory.",
-						slotName)));
 	}
 
 	MemoryContextSwitchTo(oldContext);
-
-	return infoForReplicationSlot;
+	return sourceShardToDesShardMap;
 }
diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h
index 31e89f99a..a553686c8 100644
--- a/src/include/distributed/shardsplit_shared_memory.h
+++ b/src/include/distributed/shardsplit_shared_memory.h
@@ -29,17 +29,15 @@ typedef struct ShardSplitInfoSMHeader
  * Shard split information is populated and stored in shared memory in the form of one dimensional
  * array by 'worker_split_shard_replication_setup'. Information belonging to same replication
  * slot is grouped together and stored contiguously within this array.
- * 'ShardSplitInfoForReplicationSlot' stores the starting and ending indices for a particular
- * replication slot within shared memory segment.
- * When a slot processes a commit, traversing only within this boundary of shared memory segment
- * improves performance.
+ * 'SourceToDestinationShardMap' maps a parent (source) shard to the list of child (destination)
+ * shards that should be processed by a replication slot. When a parent shard receives a change,
+ * the decoder can use this map to traverse only the child shards corresponding to the given parent.
 */
-typedef struct ShardSplitInfoForReplicationSlot
-{
-	ShardSplitInfoSMHeader *shardSplitInfoHeader; /* shared memory segment header */
-	int startIndex;                               /* starting index for a given slot */
-	int endIndex;                                 /* ending index for a given slot */
-} ShardSplitInfoForReplicationSlot;
+typedef struct SourceToDestinationShardMapEntry
+{
+	Oid sourceShardKey;
+	List *shardSplitInfoList;
+} SourceToDestinationShardMapEntry;
 
 typedef struct ShardSplitShmemData
 {
@@ -60,11 +58,11 @@ dsm_handle GetShardSplitSharedMemoryHandle(void);
 extern ShardSplitInfoSMHeader * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount,
 																	dsm_handle *dsmHandle);
-extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(char *slotName);
+extern void ReleaseSharedMemoryOfShardSplitInfo(void);
 
-extern ShardSplitInfoForReplicationSlot * PopulateShardSplitInfoForReplicationSlot(
-	char *slotName);
+extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void);
+extern HTAB * PopulateSourceToDestinationShardMapForSlot(char *slotName);
 
 char * encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId);
 
 #endif /* SHARDSPLIT_SHARED_MEMORY_H */
diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out
index cb513c023..c05dc81b0 100644
--- a/src/test/regress/expected/split_shard_replication_colocated_setup.out
+++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out
@@ -70,7 +70,7 @@ SELECT worker_split_shard_replication_setup(ARRAY[
 	ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
 	ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
 	]);
-WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one.
+WARNING: Previous split shard workflow was not successful and could not complete the cleanup phase. Continuing with the current split shard workflow.
 worker_split_shard_replication_setup
---------------------------------------------------------------------
diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out
index 6106bf2cb..09c5fddec 100644
--- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out
+++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out
@@ -12,7 +12,7 @@ SELECT worker_split_shard_replication_setup(ARRAY[
 	ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
 	ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
 	]);
-WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one.
+WARNING: Previous split shard workflow was not successful and could not complete the cleanup phase. Continuing with the current split shard workflow.
 worker_split_shard_replication_setup
---------------------------------------------------------------------
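
Note: the central data-structure change in this patch is the replacement of the per-slot start/end index pair with a dynahash table keyed by the source shard Oid, where each entry carries a List of ShardSplitInfo pointers for the child shards. The standalone sketch below shows the same hash_create/hash_search/lappend pattern in isolation. It is illustrative only and not part of the patch; OidToListEntry, CreateOidToListMap and AddValueForOid are made-up names standing in for SourceToDestinationShardMapEntry and the population loop of PopulateSourceToDestinationShardMapForSlot.

/* Illustrative sketch (not part of the patch): dynahash map from an Oid key to a List of values. */
#include "postgres.h"

#include "common/hashfn.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"

typedef struct OidToListEntry
{
	Oid key;          /* hash key; must be the first field of the entry */
	List *values;     /* accumulated values for this key */
} OidToListEntry;

static HTAB *
CreateOidToListMap(void)
{
	HASHCTL info;
	memset(&info, 0, sizeof(info));
	info.keysize = sizeof(Oid);
	info.entrysize = sizeof(OidToListEntry);
	info.hash = uint32_hash;
	info.hcxt = TopMemoryContext;

	/* same flag combination the patch uses for SourceToDestinationShardMap */
	return hash_create("OidToListMap", 128, &info,
					   HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
}

static void
AddValueForOid(HTAB *map, Oid key, void *value)
{
	bool found = false;
	OidToListEntry *entry =
		(OidToListEntry *) hash_search(map, &key, HASH_ENTER, &found);

	if (!found)
	{
		/* first value for this key: initialize the new entry */
		entry->key = key;
		entry->values = NIL;
	}

	/* allocate list cells in a long-lived context so later lookups stay valid */
	MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
	entry->values = lappend(entry->values, value);
	MemoryContextSwitchTo(oldContext);
}

The lookup side then only needs hash_search(map, &oid, HASH_FIND, &found) followed by a walk of entry->values, which is the shape FindTargetRelationOid takes above.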