From 75c6484e02f4a182789953057004ee2c6f979500 Mon Sep 17 00:00:00 2001
From: Sameer Awasekar
Date: Tue, 24 May 2022 20:02:00 +0530
Subject: [PATCH] Address review comments (partially done)

- Parameterize testcase methods
- Remove code for handling 'UPDATE'
- Change replication slot encoding/decoding logic

---
 .../operations/shard_split_replicatoin.c      | 43 +++++-----
 src/backend/distributed/shardsplit/Makefile   |  1 -
 src/backend/distributed/shardsplit/pgoutput.c | 91 +++---------------
 .../shardsplit/shardsplit_shared_memory.c     | 15 +---
 .../distributed/shardsplit_shared_memory.h    | 22 -----
 5 files changed, 36 insertions(+), 136 deletions(-)

diff --git a/src/backend/distributed/operations/shard_split_replicatoin.c b/src/backend/distributed/operations/shard_split_replicatoin.c
index 504ac1874..932125753 100644
--- a/src/backend/distributed/operations/shard_split_replicatoin.c
+++ b/src/backend/distributed/operations/shard_split_replicatoin.c
@@ -48,27 +48,28 @@ static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
 								int32 *minValue,
 								int32 *maxValue,
 								int32 *nodeId);
-static ShardSplitInfo * GetShardSplitInfo(uint64 sourceShardIdToSplit,
+static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
 										  uint64 desSplitChildShardId,
 										  int32 minValue,
 										  int32 maxValue,
 										  int32 nodeId);
-static void AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo);
-static void * CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
+static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
+static void * PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
 									 HTAB *shardInfoHashMap,
 									 dsm_handle dsmHandle,
 									 int shardSplitInfoCount);
-static HTAB * SetupHashMapForShardInfo();
+static void SetupHashMapForShardInfo();
 
 /*
  * split_shard_replication_setup UDF creates in-memory data structures
- * to store the meta information about the new shards and their placements
- * required during the catch up phase of logical replication.
+ * to store the meta information about the shard undergoing split and new split
+ * children along with their placements required during the catch up phase
+ * of logical replication.
  * This meta information is stored in a shared memory segment and accessed
  * by logical decoding plugin.
  *
  * Split information is given by user as an Array in the below format
- * [{sourceShardId, childShardID, minValue, maxValue, Destination NodeId}]
+ * [{sourceShardId, childShardId, minValue, maxValue, Destination NodeId}]
  *
  * sourceShardId - id of the shard that is undergoing a split
  * childShardId  - id of shard that stores a specific range of values
@@ -83,7 +84,7 @@ static HTAB * SetupHashMapForShardInfo();
  * Multiple shards can be placed on the same destination node. Source and
  * destination nodes can be the same too.
  *
- * There is a 1-1 mapping between a node and a replication slot as one replication
+ * There is a 1-1 mapping between a target node and a replication slot as one replication
  * slot takes care of replicating changes for one node.
 * The 'logical_decoding_plugin' consumes this information and routes the tuple
 * from the source shard to the appropriate destination shard that falls in the
@@ -94,7 +95,6 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
 {
 	ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0);
 	int shardInfoArrayLength = ARR_DIMS(shardInfoArrayObject)[0];
-	int insideCount = ARR_DIMS(shardInfoArrayObject)[1];
 
 	/* SetupMap */
 	SetupHashMapForShardInfo();
@@ -117,14 +117,14 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
 							&maxValue,
 							&nodeId);
 
-		ShardSplitInfo *shardSplitInfo = GetShardSplitInfo(
+		ShardSplitInfo *shardSplitInfo = CreateShardSplitInfo(
 			sourceShardId,
 			desShardId,
 			minValue,
 			maxValue,
 			nodeId);
 
-		AddShardSplitInfoEntryForNode(shardSplitInfo);
+		AddShardSplitInfoEntryForNodeInMap(shardSplitInfo);
 		shardSplitInfoCount++;
 	}
 
@@ -132,7 +132,7 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
 	ShardSplitInfo *splitShardInfoSMArray =
 		GetSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
 
-	CopyShardSplitInfoToSM(splitShardInfoSMArray,
+	PopulateShardSplitInfoInSM(splitShardInfoSMArray,
 						   ShardInfoHashMap,
 						   dsmHandle,
 						   shardSplitInfoCount);
@@ -147,7 +147,7 @@ split_shard_replication_setup(PG_FUNCTION_ARGS)
  * is 'nodeId' and value is a list of ShardSplitInfo that are placed on
  * this particular node.
  */
-static HTAB *
+static void
 SetupHashMapForShardInfo()
 {
 	HASHCTL info;
@@ -159,7 +159,6 @@ SetupHashMapForShardInfo()
 	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
 
 	ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
-	return ShardInfoHashMap;
 }
 
 
@@ -304,7 +303,7 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
 
 
 /*
- * GetShardSplitInfo function constructs ShardSplitInfo data structure
+ * CreateShardSplitInfo function constructs ShardSplitInfo data structure
  * with appropriate OIDs for source and destination relation.
 *
 * sourceShardIdToSplit - Existing shardId which has a valid entry in cache and catalogue
@@ -315,7 +314,7 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
 * However we can use shard ID and construct qualified shardName.
 */
 ShardSplitInfo *
-GetShardSplitInfo(uint64 sourceShardIdToSplit,
+CreateShardSplitInfo(uint64 sourceShardIdToSplit,
 				  uint64 desSplitChildShardId,
 				  int32 minValue,
 				  int32 maxValue,
@@ -386,11 +385,11 @@ GetShardSplitInfo(uint64 sourceShardIdToSplit,
 
 
 /*
- * AddShardSplitInfoEntryForNode function add's ShardSplitInfo entry
+ * AddShardSplitInfoEntryForNodeInMap function adds ShardSplitInfo entry
  * to the hash map. The key is nodeId on which the new shard is to be placed.
 */
 void
-AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
+AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
 {
 	uint64_t keyNodeId = shardSplitInfo->nodeId;
 	bool found = false;
@@ -412,7 +411,7 @@ AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
 
 
 /*
- * CopyShardSplitInfoToSM function copies information from the hash map
+ * PopulateShardSplitInfoInSM function copies information from the hash map
 * into shared memory segment. This information is consumed by the WAL sender
 * process during logical replication.
 *
@@ -425,7 +424,7 @@ AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
 * dsmHandle - Shared memory segment handle
 */
 void *
-CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
+PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
 					   HTAB *shardInfoHashMap,
 					   dsm_handle dsmHandle,
 					   int shardSplitInfoCount)
@@ -439,9 +438,7 @@ CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
 	{
 		uint64_t nodeId = entry->key;
 		char *derivedSlotName =
-			encode_replication_slot(nodeId,
-									SLOT_HANDLING_INSERT_AND_DELETE,
-									dsmHandle);
+			encode_replication_slot(nodeId, dsmHandle);
 
 		List *shardSplitInfoList = entry->shardSplitInfoList;
 		ListCell *listCell = NULL;
diff --git a/src/backend/distributed/shardsplit/Makefile b/src/backend/distributed/shardsplit/Makefile
index 942125ca4..f0181816d 100644
--- a/src/backend/distributed/shardsplit/Makefile
+++ b/src/backend/distributed/shardsplit/Makefile
@@ -7,7 +7,6 @@
 # src/backend/replication/pgoutput
 #
 #-------------------------------------------------------------------------
-citus_subdir = src/backend/distributed/shardsplit
 citus_top_builddir = ../../../..
 safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib
 SUBDIRS = . safeclib
diff --git a/src/backend/distributed/shardsplit/pgoutput.c b/src/backend/distributed/shardsplit/pgoutput.c
index 6c1b05b0c..1917b7e09 100644
--- a/src/backend/distributed/shardsplit/pgoutput.c
+++ b/src/backend/distributed/shardsplit/pgoutput.c
@@ -43,13 +43,12 @@ int shardSplitInfoArraySize = 0;
 
 
 /* Plugin callback */
-static void split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
						 Relation relation, ReorderBufferChange *change);
 
 /* Helper methods */
-static bool ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change);
 static bool ShouldCommitBeApplied(Relation sourceShardRelation);
-static int GetHashValueForIncomingTuple(Relation sourceShardRelation,
+static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
										HeapTuple tuple,
										bool *shouldHandleUpdate);
 
@@ -73,7 +72,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 
	pgoutputChangeCB = cb->change_cb;
 
-	cb->change_cb = split_change;
+	cb->change_cb = split_change_cb;
 }
 
 
@@ -83,7 +82,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 * handled as the incoming committed change would belong to a relation
 * that is not undergoing split.
 */
-static int
+static int32_t
 GetHashValueForIncomingTuple(Relation sourceShardRelation,
							 HeapTuple tuple,
							 bool *shouldHandleChange)
@@ -139,7 +138,7 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
 
	/* get hashed value of the distribution value */
	Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
-	int hashedValue = DatumGetInt32(hashedValueDatum);
+	int32_t hashedValue = DatumGetInt32(hashedValueDatum);
 
	*shouldHandleChange = true;
 
@@ -226,42 +225,12 @@ ShouldCommitBeApplied(Relation sourceShardRelation)
 }
 
 
-bool
-ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change)
-{
-	if (slotName == NULL)
-	{
-		ereport(ERROR, errmsg("Invalid null replication slot name."));
-	}
-
-	uint64_t nodeId = 0;
-	uint32_t slotType = 0;
-	dsm_handle dsmHandle;
-
-	/* change this to enum */
-	decode_replication_slot(slotName, &nodeId, &slotType, &dsmHandle);
-	if (slotType != SLOT_HANDLING_INSERT_AND_DELETE &&
-		slotType != SLOT_HANDLING_DELETE_OF_UPDATE)
-	{
-		ereport(ERROR, errmsg("Invalid replication slot type."));
-	}
-
-	if (slotType == SLOT_HANDLING_DELETE_OF_UPDATE &&
-		change->action != REORDER_BUFFER_CHANGE_UPDATE)
-	{
-		return false;
-	}
-
-	return true;
-}
-
-
 /*
- * split_change function emits the incoming tuple change
+ * split_change_cb function emits the incoming tuple change
 * to the appropriate destination shard.
 */
 static void
-split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
			 Relation relation, ReorderBufferChange *change)
 {
	/*
@@ -279,21 +248,11 @@ split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
	}
 
	char *replicationSlotName = ctx->slot->data.name.data;
-	bool shouldHandleChanges = false;
-	if (!ShouldSlotHandleChange(replicationSlotName, change))
-	{
-		return;
-	}
-
	if (!ShouldCommitBeApplied(relation))
	{
		return;
	}
 
-	uint64_t nodeId = 0;
-	uint32 slotType = 0;
-	dsm_handle dsmHandle = 0;
-	decode_replication_slot(replicationSlotName, &nodeId, &slotType, &dsmHandle);
	Oid targetRelationOid = InvalidOid;
	switch (change->action)
	{
@@ -305,36 +264,6 @@ split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
			break;
		}
 
-		case REORDER_BUFFER_CHANGE_UPDATE:
-		{
-			switch (slotType)
-			{
-				case SLOT_HANDLING_INSERT_AND_DELETE:
-				{
-					HeapTuple newTuple = &(change->data.tp.newtuple->tuple);
-					Oid destinationForInsert = FindTargetRelationOid(relation, newTuple,
-																	 replicationSlotName);
-					targetRelationOid = destinationForInsert;
-					change->action = REORDER_BUFFER_CHANGE_INSERT;
-					break;
-				}
-
-				case SLOT_HANDLING_DELETE_OF_UPDATE:
-				{
-					char *modifiedSlotName = encode_replication_slot(nodeId, 0,
-																	 dsmHandle);
-					HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple);
-					Oid destinationForDelete = FindTargetRelationOid(relation, oldTuple,
-																	 modifiedSlotName);
-					targetRelationOid = destinationForDelete;
-					change->action = REORDER_BUFFER_CHANGE_DELETE;
-					break;
-				}
-			}
-
-			break;
-		}
-
		case REORDER_BUFFER_CHANGE_DELETE:
		{
			HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple);
@@ -343,8 +272,12 @@ split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
			break;
		}
-	}
 
+		/* Only INSERT/DELETE are visible in the replication path of a split shard */
+		default:
+			Assert(false);
+	}
+
	if (targetRelationOid != InvalidOid)
	{
		Relation targetRelation = RelationIdGetRelation(targetRelationOid);
diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
index 973c007d1..a70999fec 100644
--- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
+++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
@@ -97,9 +97,8 @@ GetSMHandleFromSlotName(char *slotName)
	}
 
	uint64_t nodeId = 0;
-	uint32_t type = 0;
	dsm_handle handle = 0;
-	decode_replication_slot(slotName, &nodeId, &type, &handle);
+	decode_replication_slot(slotName, &nodeId, &handle);
 
	return handle;
 }
@@ -192,27 +191,25 @@ ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
 /*
 * encode_replication_slot returns an encoded replication slot name
 * in the following format.
- * Slot Name = NodeId_ReplicationSlotType_SharedMemoryHandle
+ * Slot Name = NodeId_SharedMemoryHandle
 */
 char *
 encode_replication_slot(uint64_t nodeId,
-						uint32 slotType,
						dsm_handle dsmHandle)
 {
	StringInfo slotName = makeStringInfo();
-	appendStringInfo(slotName, "%ld_%u_%u", nodeId, slotType, dsmHandle);
+	appendStringInfo(slotName, UINT64_FORMAT "_%u", nodeId, dsmHandle);
	return slotName->data;
 }
 
 
 /*
 * decode_replication_slot decodes the replication slot name
- * into node id, slotType, shared memory handle.
+ * into node id and shared memory handle.
 */
 void
 decode_replication_slot(char *slotName,
						uint64_t *nodeId,
-						uint32_t *slotType,
						dsm_handle *dsmHandle)
 {
	if (slotName == NULL
@@ -233,10 +230,6 @@ decode_replication_slot(char *slotName,
			*nodeId = SafeStringToUint64(slotNameString);
		}
		else if (index == 1)
-		{
-			*slotType = strtoul(slotNameString, NULL, 10);
-		}
-		else if (index == 2)
		{
			*dsmHandle = strtoul(slotNameString, NULL, 10);
		}
diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h
index 3ca41ffe7..4eb2b867c 100644
--- a/src/include/distributed/shardsplit_shared_memory.h
+++ b/src/include/distributed/shardsplit_shared_memory.h
@@ -52,33 +52,11 @@ extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handl
														dsm_segment ** attachedSegment);
 
 
-/*
- * An UPADATE request for a partition key, is realized as 'DELETE' on
- * old shard and 'INSERT' on new shard. So a commit of UPDATE has to be
- * seggrated in two replication messages. WAL sender belonging to a
- * replication slot can send only one message and hence to handle UPDATE we
- * have to create one extra replication slot per node that handles the deletion
- * part of an UPDATE.
- *
- * SLOT_HANDING_INSERT_AND_DELETE - Responsible for handling INSERT and DELETE
- *                                  operations.
- * SLOT_HANDLING_DELETE_OF_UPDATE - Responsible for only handling DELETE on old shard
- *                                  for an UPDATE. Its a no-op for INSERT and DELETE
- *                                  operations.
- */
-enum ReplicationSlotType
-{
-	SLOT_HANDLING_INSERT_AND_DELETE,
-	SLOT_HANDLING_DELETE_OF_UPDATE
-};
-
 /* Functions related to encoding-decoding for replication slot name */
 char * encode_replication_slot(uint64_t nodeId,
-							   uint32 slotType,
							   dsm_handle dsmHandle);
 void decode_replication_slot(char *slotName,
							 uint64_t *nodeId,
-							 uint32_t *slotType,
							 dsm_handle *dsmHandle);
 
 #endif /* SHARDSPLIT_SHARED_MEMORY_H */
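
For reference, with the slot-type field gone, a slot name is now just the two fields NodeId_SharedMemoryHandle joined by '_'. Below is a minimal standalone sketch of that encode/decode round trip; plain snprintf/strtok stand in for the StringInfo and SafeStringToUint64 helpers the patch relies on, and the node id and handle values are made up:

/*
 * Standalone sketch (not the Citus implementation) of the new slot-name
 * round trip: encode "NodeId_SharedMemoryHandle", then parse the two
 * fields back out.
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int
main(void)
{
	uint64_t nodeId = 18;      /* hypothetical target node id */
	uint32_t dsmHandle = 3456; /* hypothetical shared memory handle */

	/* encode: NodeId_SharedMemoryHandle */
	char slotName[64];
	snprintf(slotName, sizeof(slotName), "%" PRIu64 "_%" PRIu32,
			 nodeId, dsmHandle);

	/* decode: split on '_' and parse each field back */
	char copy[64];
	strncpy(copy, slotName, sizeof(copy) - 1);
	copy[sizeof(copy) - 1] = '\0';

	char *nodeIdPart = strtok(copy, "_");
	char *handlePart = strtok(NULL, "_");

	uint64_t decodedNodeId = strtoull(nodeIdPart, NULL, 10);
	uint32_t decodedHandle = (uint32_t) strtoul(handlePart, NULL, 10);

	printf("slot=%s nodeId=%" PRIu64 " handle=%" PRIu32 "\n",
		   slotName, decodedNodeId, decodedHandle);
	return 0;
}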
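Similarly, here is a hedged sketch of the routing decision that split_change_cb delegates to FindTargetRelationOid: the hash of the incoming tuple's distribution column is matched against the [minValue, maxValue] range of each split child registered for the slot's node, yielding the destination relation. For instance, splitting shard 1 into shards 2 and 3 on nodes 18 and 19 would be described to the UDF as [{1, 2, -2147483648, -1, 18}, {1, 3, 0, 2147483647, 19}] (values illustrative). The struct and helper below are illustrative only; the real ShardSplitInfo layout lives in the Citus headers and is not shown in this patch:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-in for ShardSplitInfo: one hash range per split child. */
typedef struct SplitChildRange
{
	int32_t minValue;          /* inclusive lower bound of the child's hash range */
	int32_t maxValue;          /* inclusive upper bound of the child's hash range */
	uint32_t targetRelationId; /* stand-in for the destination shard's Oid */
} SplitChildRange;

/*
 * Return the destination for a hashed distribution value, or 0 when the
 * value falls in no child range (the change would then be skipped).
 */
static uint32_t
FindTargetForHash(const SplitChildRange *ranges, int count, int32_t hashedValue)
{
	for (int i = 0; i < count; i++)
	{
		if (hashedValue >= ranges[i].minValue &&
			hashedValue <= ranges[i].maxValue)
		{
			return ranges[i].targetRelationId;
		}
	}
	return 0;
}

int
main(void)
{
	/* Two children splitting the full int32 hash space in half. */
	SplitChildRange ranges[] = {
		{ INT32_MIN, -1, 1001 },
		{ 0, INT32_MAX, 1002 },
	};

	printf("hash(-42) -> %" PRIu32 "\n", FindTargetForHash(ranges, 2, -42)); /* 1001 */
	printf("hash(7)   -> %" PRIu32 "\n", FindTargetForHash(ranges, 2, 7));   /* 1002 */
	return 0;
}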