From 60ee33cfcc0c324de358c25c3ccc4e2bfd5b8740 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Fri, 27 May 2022 18:14:03 +0530 Subject: [PATCH] Add testcase and handling of UPDATE for non-partition column. - Change nodeId to uint32 - some test enhancements - fix comments --- .../split_shard_replication_setup.c | 12 ++--- src/backend/distributed/shardsplit/pgoutput.c | 13 ++++- .../shardsplit/shardsplit_shared_memory.c | 31 +++++++++--- src/include/distributed/shard_split.h | 2 +- .../distributed/shardsplit_shared_memory.h | 10 ++-- ...plit_shard_replication_colocated_setup.out | 13 ----- .../split_shard_replication_setup.out | 50 +++++++++++++------ ...plit_shard_replication_colocated_setup.sql | 4 -- .../sql/split_shard_replication_setup.sql | 19 +++++-- 9 files changed, 95 insertions(+), 59 deletions(-) diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c index ddec5a4c8..477b796a9 100644 --- a/src/backend/distributed/operations/split_shard_replication_setup.c +++ b/src/backend/distributed/operations/split_shard_replication_setup.c @@ -25,7 +25,7 @@ static HTAB *ShardInfoHashMap = NULL; /* Entry for hash map */ typedef struct NodeShardMappingEntry { - uint64_t keyNodeId; + uint32_t keyNodeId; List *shardSplitInfoList; } NodeShardMappingEntry; @@ -76,8 +76,8 @@ static void SetupHashMapForShardInfo(); * * Usage Semantics: * This UDF returns a shared memory handle where the information is stored. This shared memory - * handle is used by caller to encode replication slot name as "NodeId_MemoryHandle" for every - * distinct target node. The same encoded slot name is stored in one of the fields of the + * handle is used by the caller to encode replication slot name as "NodeId_SharedMemoryHandle" for every + * distinct target node. The same encoded slot name is stored in one of the fields of the + * in-memory data structure(ShardSplitInfo).
* * There is a 1-1 mapping between a target node and a replication slot as one replication @@ -151,7 +151,7 @@ SetupHashMapForShardInfo() { HASHCTL info; memset(&info, 0, sizeof(info)); - info.keysize = sizeof(uint64_t); + info.keysize = sizeof(uint32_t); info.entrysize = sizeof(NodeShardMappingEntry); info.hash = uint32_hash; info.hcxt = CurrentMemoryContext; @@ -388,7 +388,7 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) { - uint64_t keyNodeId = shardSplitInfo->nodeId; + uint32_t keyNodeId = shardSplitInfo->nodeId; bool found = false; NodeShardMappingEntry *nodeMappingEntry = (NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &keyNodeId, HASH_ENTER, @@ -433,7 +433,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, int index = 0; while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) { - uint64_t nodeId = entry->keyNodeId; + uint32_t nodeId = entry->keyNodeId; char *derivedSlotName = encode_replication_slot(nodeId, dsmHandle); diff --git a/src/backend/distributed/shardsplit/pgoutput.c b/src/backend/distributed/shardsplit/pgoutput.c index 42422962d..ac23cf05a 100644 --- a/src/backend/distributed/shardsplit/pgoutput.c +++ b/src/backend/distributed/shardsplit/pgoutput.c @@ -241,6 +241,15 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; } + /* updating non-partition column value */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple newTuple = &(change->data.tp.newtuple->tuple); + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + case REORDER_BUFFER_CHANGE_DELETE: { HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple); @@ -250,10 +259,10 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; } - /* Only INSERT/DELETE actions are visible in the replication path of split shard */ + /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of a split shard */ default: ereport(ERROR, errmsg( - "Unexpected Action :%d. Expected action is INSERT or DELETE", + "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", change->action)); } diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index 8dceb20a2..4e40ce52f 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -51,7 +51,11 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle) "corresponding to handle:%u", dsmHandle))); } - /* Remain attached until end of backend or DetachSession(). */ + /* + * By default, mappings are owned by the current resource owner, which typically + * means they stick around for the duration of the current query. + * Keep the dynamic shared memory mapping until the end of the session to avoid warnings and leaks.
+ */ dsm_pin_mapping(dsmSegment); ShardSplitInfoSMHeader *header = (ShardSplitInfoSMHeader *) dsm_segment_address( @@ -77,7 +81,7 @@ GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount) } dsm_handle dsmHandle; - uint64_t nodeId = 0; + uint32_t nodeId = 0; decode_replication_slot(slotName, &nodeId, &dsmHandle); ShardSplitInfoSMHeader *shardSplitInfoSMHeader = @@ -182,11 +186,11 @@ ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) * Slot Name = NodeId_SharedMemoryHandle */ char * -encode_replication_slot(uint64_t nodeId, +encode_replication_slot(uint32_t nodeId, dsm_handle dsmHandle) { StringInfo slotName = makeStringInfo(); - appendStringInfo(slotName, "%ld_%u", nodeId, dsmHandle); + appendStringInfo(slotName, "%u_%u", nodeId, dsmHandle); return slotName->data; } @@ -197,7 +201,7 @@ encode_replication_slot(uint64_t nodeId, */ void decode_replication_slot(char *slotName, - uint64_t *nodeId, + uint32_t *nodeId, dsm_handle *dsmHandle) { if (slotName == NULL || @@ -206,7 +210,7 @@ decode_replication_slot(char *slotName, { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("Invalid null replication slot name."))); + errmsg("Invalid Out parameters"))); } int index = 0; @@ -215,10 +219,13 @@ decode_replication_slot(char *slotName, char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition); while (slotNameString != NULL) { + /* first part of the slot name is the NodeId */ if (index == 0) { - *nodeId = SafeStringToUint64(slotNameString); + *nodeId = strtoul(slotNameString, NULL, 10); } + + /* second part of the slot name is the shared memory handle */ else if (index == 1) { *dsmHandle = strtoul(slotNameString, NULL, 10); @@ -226,4 +233,14 @@ decode_replication_slot(char *slotName, slotNameString = strtok_r(NULL, "_", &strtokPosition); index++; } + + /* + * The replication slot name is encoded as NodeId_SharedMemoryHandle. Hence the number of tokens + * must be exactly two, considering "_" as the delimiter. + */ + if (index != 2) + { + ereport(ERROR, + (errmsg("Invalid Replication Slot name encoding: %s", slotName))); + } } diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index 46c32ff45..521a9b2b2 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -39,7 +39,7 @@ typedef struct ShardSplitInfo Oid splitChildShardOid; /* child shard Oid */ int32 shardMinValue; /* min hash value */ int32 shardMaxValue; /* max hash value */ - uint64 nodeId; /* node where child shard is to be placed */ + uint32_t nodeId; /* node where child shard is to be placed */ char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */ } ShardSplitInfo; diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h index ee1e389bd..7ecc47348 100644 --- a/src/include/distributed/shardsplit_shared_memory.h +++ b/src/include/distributed/shardsplit_shared_memory.h @@ -2,9 +2,9 @@ * * shardsplit_shared_memory.h * API's for creating and accessing shared memory segments to store - * shard split information. 'setup_shard_replication' UDF creates the - * shared memory, populates the contents and WAL sender processes are - * the consumers. + * shard split information. 'split_shard_replication_setup' UDF creates the + * shared memory and populates the contents. WAL sender processes are consumers + * of the split information for appropriate tuple routing. * * Copyright (c) Citus Data, Inc.
* @@ -36,10 +36,10 @@ extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, /* Functions related to encoding-decoding for replication slot name */ -char * encode_replication_slot(uint64_t nodeId, +char * encode_replication_slot(uint32_t nodeId, dsm_handle dsmHandle); void decode_replication_slot(char *slotName, - uint64_t *nodeId, + uint32_t *nodeId, dsm_handle *dsmHandle); #endif /* SHARDSPLIT_SHARED_MEMORY_H */ diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index fe9aa21df..6f8e71e45 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -2,19 +2,6 @@ SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 4; --- Add two additional nodes to cluster. -SELECT 1 FROM citus_add_node('localhost', :worker_1_port); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -SELECT 1 FROM citus_add_node('localhost', :worker_2_port); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- Create distributed table (co-located) diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 948493071..67c0e26a1 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -1,19 +1,6 @@ SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 1; --- Add two additional nodes to cluster. -SELECT 1 FROM citus_add_node('localhost', :worker_1_port); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -SELECT 1 FROM citus_add_node('localhost', :worker_2_port); - ?column?
---------------------------------------------------------------------- - 1 -(1 row) - SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- Create distributed table (non co-located) @@ -162,7 +149,37 @@ SELECT * from table_to_split_3; 500 | a (2 rows) --- Delete data from table_to_split_1 from worker1 +-- UPDATE data of table_to_split_1 from worker1 +\c - - - :worker_1_port +UPDATE table_to_split_1 SET value='b' where id = 100; +UPDATE table_to_split_1 SET value='b' where id = 400; +UPDATE table_to_split_1 SET value='b' where id = 500; +SELECT pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Values should be updated in table_to_split_2 and table_to_split_3 +\c - - - :worker_2_port +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | b +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | b + 500 | b +(2 rows) + \c - - - :worker_1_port DELETE FROM table_to_split_1; SELECT pg_sleep(10); @@ -267,6 +284,7 @@ select pg_sleep(10); INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); +UPDATE table_to_split_1 SET value='b' where id = 400; select pg_sleep(10); pg_sleep --------------------------------------------------------------------- @@ -278,14 +296,14 @@ SELECT * from table_to_split_1; id | value --------------------------------------------------------------------- 100 | a - 400 | a 500 | a + 400 | b (3 rows) SELECT * from table_to_split_2; id | value --------------------------------------------------------------------- - 400 | a + 400 | b (1 row) SELECT * from table_to_split_3; diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 5c9017e3c..6caf2b114 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -3,10 +3,6 @@ SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 4; --- Add two additional nodes to cluster. -SELECT 1 FROM citus_add_node('localhost', :worker_1_port); -SELECT 1 FROM citus_add_node('localhost', :worker_2_port); - SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index c8908d0ea..82f29d561 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -2,10 +2,6 @@ SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 1; --- Add two additional nodes to cluster.
-SELECT 1 FROM citus_add_node('localhost', :worker_1_port); -SELECT 1 FROM citus_add_node('localhost', :worker_2_port); - SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset @@ -89,7 +85,19 @@ SELECT * from table_to_split_1; -- should alwasy have zero rows SELECT * from table_to_split_2; SELECT * from table_to_split_3; --- Delete data from table_to_split_1 from worker1 +-- UPDATE data of table_to_split_1 from worker1 +\c - - - :worker_1_port +UPDATE table_to_split_1 SET value='b' where id = 100; +UPDATE table_to_split_1 SET value='b' where id = 400; +UPDATE table_to_split_1 SET value='b' where id = 500; +SELECT pg_sleep(10); + +-- Values should be updated in table_to_split_2 and table_to_split_3 +\c - - - :worker_2_port +SELECT * FROM table_to_split_1; +SELECT * FROM table_to_split_2; +SELECT * FROM table_to_split_3; + \c - - - :worker_1_port DELETE FROM table_to_split_1; SELECT pg_sleep(10); @@ -150,6 +158,7 @@ select pg_sleep(10); INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); +UPDATE table_to_split_1 SET value='b' where id = 400; select pg_sleep(10); -- expect data to present in table_to_split_2 on worker1 as its destination for value '400'
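Note (supplementary, not part of the patch): below is a minimal standalone sketch, in plain C, of the "NodeId_SharedMemoryHandle" slot name convention that encode_replication_slot()/decode_replication_slot() implement above, including the strict two-token check this patch adds to decode_replication_slot(). The *_sketch function names, the fixed 64-byte buffer, and the example values 18 and 82904 are illustrative assumptions only; the real functions use Postgres facilities (StringInfo, ereport, dsm_handle) rather than the plain libc calls used here.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

/* Encode nodeId and shared memory handle as "<nodeId>_<handle>", mirroring the "%u_%u" format above. */
static char *
encode_replication_slot_sketch(uint32_t nodeId, uint32_t dsmHandle)
{
	char *slotName = malloc(64);
	snprintf(slotName, 64, "%u_%u", (unsigned int) nodeId, (unsigned int) dsmHandle);
	return slotName;
}

/* Decode a slot name back into nodeId and handle; returns 0 on success, -1 on bad input. */
static int
decode_replication_slot_sketch(const char *slotName, uint32_t *nodeId, uint32_t *dsmHandle)
{
	if (slotName == NULL || nodeId == NULL || dsmHandle == NULL)
	{
		return -1;
	}

	char *dupSlotName = strdup(slotName);
	char *strtokPosition = NULL;
	char *token = strtok_r(dupSlotName, "_", &strtokPosition);
	int index = 0;

	while (token != NULL)
	{
		/* first token is the node id, second token is the shared memory handle */
		if (index == 0)
		{
			*nodeId = (uint32_t) strtoul(token, NULL, 10);
		}
		else if (index == 1)
		{
			*dsmHandle = (uint32_t) strtoul(token, NULL, 10);
		}

		token = strtok_r(NULL, "_", &strtokPosition);
		index++;
	}

	free(dupSlotName);

	/* Exactly two tokens are expected, mirroring the "index != 2" check added by the patch. */
	return (index == 2) ? 0 : -1;
}

int
main(void)
{
	uint32_t nodeId = 0;
	uint32_t dsmHandle = 0;
	char *slotName = encode_replication_slot_sketch(18, 82904);

	if (decode_replication_slot_sketch(slotName, &nodeId, &dsmHandle) == 0)
	{
		printf("decoded nodeId=%u dsmHandle=%u from slot \"%s\"\n",
			   (unsigned int) nodeId, (unsigned int) dsmHandle, slotName);
	}

	free(slotName);
	return 0;
}

With the two-token check, a malformed name such as "5_10_15" is rejected (the sketch returns -1, the patched function raises an ERROR) instead of silently decoding the first two fields.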