Add test case and handling of UPDATE for non-partition columns.

- Change nodeId to uint32
- Some test enhancements
- Fix comments
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-05-27 18:14:03 +05:30
parent 3864cd3187
commit 60ee33cfcc
9 changed files with 95 additions and 59 deletions

View File

@@ -25,7 +25,7 @@ static HTAB *ShardInfoHashMap = NULL;
 
 /* Entry for hash map */
 typedef struct NodeShardMappingEntry
 {
-	uint64_t keyNodeId;
+	uint32_t keyNodeId;
 	List *shardSplitInfoList;
 } NodeShardMappingEntry;
@@ -76,8 +76,8 @@ static void SetupHashMapForShardInfo();
  *
  * Usage Semantics:
  * This UDF returns a shared memory handle where the information is stored. This shared memory
- * handle is used by caller to encode replication slot name as "NodeId_MemoryHandle" for every
+ * handle is used by caller to encode replication slot name as "NodeId_SharedMemoryHandle" for every
  * distinct target node. The same encoded slot name is stored in one of the fields of the
  * in-memory data structure(ShardSplitInfo).
  *
  * There is a 1-1 mapping between a target node and a replication slot as one replication
@@ -151,7 +151,7 @@ SetupHashMapForShardInfo()
 {
 	HASHCTL info;
 	memset(&info, 0, sizeof(info));
-	info.keysize = sizeof(uint64_t);
+	info.keysize = sizeof(uint32_t);
 	info.entrysize = sizeof(NodeShardMappingEntry);
 	info.hash = uint32_hash;
 	info.hcxt = CurrentMemoryContext;
@@ -388,7 +388,7 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
 void
 AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
 {
-	uint64_t keyNodeId = shardSplitInfo->nodeId;
+	uint32_t keyNodeId = shardSplitInfo->nodeId;
 	bool found = false;
 	NodeShardMappingEntry *nodeMappingEntry =
 		(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &keyNodeId, HASH_ENTER,
@@ -433,7 +433,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
 	int index = 0;
 	while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
 	{
-		uint64_t nodeId = entry->keyNodeId;
+		uint32_t nodeId = entry->keyNodeId;
 		char *derivedSlotName =
 			encode_replication_slot(nodeId, dsmHandle);

View File

@@ -241,6 +241,15 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			break;
 		}
 
+		/* updating non-partition column value */
+		case REORDER_BUFFER_CHANGE_UPDATE:
+		{
+			HeapTuple newTuple = &(change->data.tp.newtuple->tuple);
+			targetRelationOid = FindTargetRelationOid(relation, newTuple,
+													  replicationSlotName);
+			break;
+		}
+
 		case REORDER_BUFFER_CHANGE_DELETE:
 		{
 			HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple);
@@ -250,10 +259,10 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			break;
 		}
 
-		/* Only INSERT/DELETE actions are visible in the replication path of split shard */
+		/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
 		default:
 			ereport(ERROR, errmsg(
-						"Unexpected Action :%d. Expected action is INSERT or DELETE",
+						"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
 						change->action));
 	}
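For illustration only (not from the commit): a standalone sketch of the routing rule the new case follows. Because this change only handles UPDATEs of non-partition columns, the partition value in the new tuple matches the old one, so routing on the new tuple (as for INSERT) reaches the same child shard, while DELETE can only be routed on its old tuple. The enum and function names below are illustrative stand-ins, not Citus or PostgreSQL APIs.

#include <stdio.h>

/* Stand-ins for the subset of decoded change types handled above. */
typedef enum ChangeAction
{
	CHANGE_INSERT,
	CHANGE_UPDATE,   /* update of a non-partition column only */
	CHANGE_DELETE
} ChangeAction;

typedef enum RoutingTuple
{
	ROUTE_ON_NEW_TUPLE,
	ROUTE_ON_OLD_TUPLE
} RoutingTuple;

/*
 * Which decoded tuple identifies the target child shard. The real callback
 * above raises an error for any action other than INSERT/UPDATE/DELETE.
 */
static RoutingTuple
TupleUsedForRouting(ChangeAction action)
{
	switch (action)
	{
		case CHANGE_INSERT:
		case CHANGE_UPDATE:
			/* the new tuple still carries the unchanged partition value */
			return ROUTE_ON_NEW_TUPLE;

		case CHANGE_DELETE:
		default:
			/* the deleted row is only available as the old tuple */
			return ROUTE_ON_OLD_TUPLE;
	}
}

int
main(void)
{
	printf("%d\n", TupleUsedForRouting(CHANGE_UPDATE));   /* 0 == ROUTE_ON_NEW_TUPLE */
	return 0;
}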

View File

@@ -51,7 +51,11 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
 						"corresponding to handle:%u", dsmHandle)));
 	}
 
-	/* Remain attached until end of backend or DetachSession(). */
+	/*
+	 * By default, mappings are owned by the current resource owner, which typically
+	 * means they stick around for the duration of the current query.
+	 * Keep the dynamic shared memory mapping until end of session to avoid warnings and leaks.
+	 */
 	dsm_pin_mapping(dsmSegment);
 
 	ShardSplitInfoSMHeader *header = (ShardSplitInfoSMHeader *) dsm_segment_address(
@@ -77,7 +81,7 @@ GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount)
 	}
 
 	dsm_handle dsmHandle;
-	uint64_t nodeId = 0;
+	uint32_t nodeId = 0;
 	decode_replication_slot(slotName, &nodeId, &dsmHandle);
 
 	ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
@@ -182,11 +186,11 @@ ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
  * Slot Name = NodeId_SharedMemoryHandle
  */
 char *
-encode_replication_slot(uint64_t nodeId,
+encode_replication_slot(uint32_t nodeId,
 						dsm_handle dsmHandle)
 {
 	StringInfo slotName = makeStringInfo();
-	appendStringInfo(slotName, "%ld_%u", nodeId, dsmHandle);
+	appendStringInfo(slotName, "%u_%u", nodeId, dsmHandle);
 	return slotName->data;
 }
@@ -197,7 +201,7 @@ encode_replication_slot(uint64_t nodeId,
  */
 void
 decode_replication_slot(char *slotName,
-						uint64_t *nodeId,
+						uint32_t *nodeId,
 						dsm_handle *dsmHandle)
 {
 	if (slotName == NULL ||
@@ -206,7 +210,7 @@ decode_replication_slot(char *slotName,
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("Invalid null replication slot name.")));
+				 errmsg("Invalid Out parameters")));
 	}
 
 	int index = 0;
@@ -215,10 +219,13 @@ decode_replication_slot(char *slotName,
 	char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition);
 	while (slotNameString != NULL)
 	{
+		/* first part of the slot name is NodeId */
 		if (index == 0)
 		{
-			*nodeId = SafeStringToUint64(slotNameString);
+			*nodeId = strtoul(slotNameString, NULL, 10);
 		}
+		/* second part of the name is memory handle */
 		else if (index == 1)
 		{
 			*dsmHandle = strtoul(slotNameString, NULL, 10);
@@ -226,4 +233,14 @@ decode_replication_slot(char *slotName,
 		slotNameString = strtok_r(NULL, "_", &strtokPosition);
 		index++;
 	}
+
+	/*
+	 * Replication slot name is encoded as NodeId_SharedMemoryHandle. Hence the number of tokens
+	 * would be strictly two considering "_" as delimiter.
+	 */
+	if (index != 2)
+	{
+		ereport(ERROR,
+				(errmsg("Invalid Replication Slot name encoding: %s", slotName)));
+	}
 }
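For illustration only (not from the commit): a minimal standalone round trip of the NodeId_SharedMemoryHandle slot-name encoding shown above, e.g. node id 2 and handle 123456 become "2_123456". The helper names, fixed-size buffers and the uint32_t handle are assumptions for this sketch; the real functions build on StringInfo, ereport and PostgreSQL's dsm_handle type.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Encode as "NodeId_SharedMemoryHandle", mirroring the "%u_%u" format above. */
static char *
encode_slot_name(uint32_t nodeId, uint32_t dsmHandle)
{
	char *name = malloc(64);
	snprintf(name, 64, "%u_%u", nodeId, dsmHandle);
	return name;
}

/* Decode both tokens; return 0 on success, -1 if the name is not exactly two tokens. */
static int
decode_slot_name(const char *slotName, uint32_t *nodeId, uint32_t *dsmHandle)
{
	char copy[64];
	char *savePtr = NULL;

	strncpy(copy, slotName, sizeof(copy) - 1);
	copy[sizeof(copy) - 1] = '\0';

	/* first token is the node id, second is the shared memory handle */
	char *token = strtok_r(copy, "_", &savePtr);
	int index = 0;
	while (token != NULL)
	{
		if (index == 0)
		{
			*nodeId = (uint32_t) strtoul(token, NULL, 10);
		}
		else if (index == 1)
		{
			*dsmHandle = (uint32_t) strtoul(token, NULL, 10);
		}
		token = strtok_r(NULL, "_", &savePtr);
		index++;
	}

	/* exactly two tokens expected, mirroring the index != 2 check above */
	return (index == 2) ? 0 : -1;
}

int
main(void)
{
	uint32_t nodeId = 0;
	uint32_t dsmHandle = 0;
	char *slotName = encode_slot_name(2, 123456);

	if (decode_slot_name(slotName, &nodeId, &dsmHandle) == 0)
	{
		printf("node %u, handle %u\n", nodeId, dsmHandle);   /* node 2, handle 123456 */
	}
	free(slotName);
	return 0;
}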

View File

@@ -39,7 +39,7 @@ typedef struct ShardSplitInfo
 	Oid splitChildShardOid;     /* child shard Oid */
 	int32 shardMinValue;        /* min hash value */
 	int32 shardMaxValue;        /* max hash value */
-	uint64 nodeId;              /* node where child shard is to be placed */
+	uint32_t nodeId;            /* node where child shard is to be placed */
 	char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */
 } ShardSplitInfo;
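For illustration only (not from the commit): a hypothetical sketch of how a tuple's hashed partition value could be matched against the [shardMinValue, shardMaxValue] range above to pick the child shard OID, which is presumably how the decoder uses these fields. The struct, function name and sample values below are illustrative stand-ins rather than the actual ShardSplitInfo lookup code.

#include <stdint.h>
#include <stdio.h>

typedef uint32_t Oid;            /* stand-in for PostgreSQL's Oid */

typedef struct ChildShardRange
{
	Oid splitChildShardOid;      /* child shard to route to */
	int32_t shardMinValue;       /* min hash value (inclusive) */
	int32_t shardMaxValue;       /* max hash value (inclusive) */
} ChildShardRange;

/* Return the child shard whose hash range covers hashValue, or 0 if none matches. */
static Oid
FindChildShardForHash(const ChildShardRange *ranges, int rangeCount, int32_t hashValue)
{
	for (int i = 0; i < rangeCount; i++)
	{
		if (hashValue >= ranges[i].shardMinValue &&
			hashValue <= ranges[i].shardMaxValue)
		{
			return ranges[i].splitChildShardOid;
		}
	}
	return 0;
}

int
main(void)
{
	/* illustrative shard OIDs and hash ranges covering the full int32 space */
	ChildShardRange ranges[] = {
		{ 100001, INT32_MIN, -1 },
		{ 100002, 0, INT32_MAX },
	};
	printf("%u\n", (unsigned) FindChildShardForHash(ranges, 2, 42));   /* prints 100002 */
	return 0;
}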

View File

@@ -2,9 +2,9 @@
  *
  * shardsplit_shared_memory.h
  * API's for creating and accessing shared memory segments to store
- * shard split information. 'setup_shard_replication' UDF creates the
- * shared memory, populates the contents and WAL sender processes are
- * the consumers.
+ * shard split information. 'split_shard_replication_setup' UDF creates the
+ * shared memory and populates the contents. WAL sender processes are consumers
+ * of split information for appropriate tuple routing.
  *
  * Copyright (c) Citus Data, Inc.
  *
@@ -36,10 +36,10 @@ extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName,
 /* Functions related to encoding-decoding for replication slot name */
-char * encode_replication_slot(uint64_t nodeId,
+char * encode_replication_slot(uint32_t nodeId,
 							   dsm_handle dsmHandle);
 void decode_replication_slot(char *slotName,
-							 uint64_t *nodeId,
+							 uint32_t *nodeId,
 							 dsm_handle *dsmHandle);
 #endif /* SHARDSPLIT_SHARED_MEMORY_H */

View File

@@ -2,19 +2,6 @@
 SET citus.shard_replication_factor TO 1;
 SET citus.shard_count TO 1;
 SET citus.next_shard_id TO 4;
--- Add two additional nodes to cluster.
-SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
- ?column?
----------------------------------------------------------------------
-        1
-(1 row)
-
-SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
- ?column?
----------------------------------------------------------------------
-        1
-(1 row)
-
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
 -- Create distributed table (co-located)

View File

@@ -1,19 +1,6 @@
 SET citus.shard_replication_factor TO 1;
 SET citus.shard_count TO 1;
 SET citus.next_shard_id TO 1;
--- Add two additional nodes to cluster.
-SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
- ?column?
----------------------------------------------------------------------
-        1
-(1 row)
-
-SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
- ?column?
----------------------------------------------------------------------
-        1
-(1 row)
-
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
 -- Create distributed table (non co-located)
@@ -162,7 +149,37 @@ SELECT * from table_to_split_3;
  500 | a
 (2 rows)
 
--- Delete data from table_to_split_1 from worker1
+-- UPDATE data of table_to_split_1 from worker1
+\c - - - :worker_1_port
+UPDATE table_to_split_1 SET value='b' where id = 100;
+UPDATE table_to_split_1 SET value='b' where id = 400;
+UPDATE table_to_split_1 SET value='b' where id = 500;
+SELECT pg_sleep(10);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+-- Value should be updated in table_to_split_2;
+\c - - - :worker_2_port
+SELECT * FROM table_to_split_1;
+ id | value
+---------------------------------------------------------------------
+(0 rows)
+
+SELECT * FROM table_to_split_2;
+ id | value
+---------------------------------------------------------------------
+ 400 | b
+(1 row)
+
+SELECT * FROM table_to_split_3;
+ id | value
+---------------------------------------------------------------------
+ 100 | b
+ 500 | b
+(2 rows)
+
 \c - - - :worker_1_port
 DELETE FROM table_to_split_1;
 SELECT pg_sleep(10);
@@ -267,6 +284,7 @@ select pg_sleep(10);
 INSERT into table_to_split_1 values(100, 'a');
 INSERT into table_to_split_1 values(400, 'a');
 INSERT into table_to_split_1 values(500, 'a');
+UPDATE table_to_split_1 SET value='b' where id = 400;
 select pg_sleep(10);
  pg_sleep
 ---------------------------------------------------------------------
@@ -278,14 +296,14 @@ SELECT * from table_to_split_1;
  id | value
 ---------------------------------------------------------------------
  100 | a
- 400 | a
  500 | a
+ 400 | b
 (3 rows)
 
 SELECT * from table_to_split_2;
  id | value
 ---------------------------------------------------------------------
- 400 | a
+ 400 | b
 (1 row)
 
 SELECT * from table_to_split_3;

View File

@@ -3,10 +3,6 @@ SET citus.shard_replication_factor TO 1;
 SET citus.shard_count TO 1;
 SET citus.next_shard_id TO 4;
--- Add two additional nodes to cluster.
-SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
-SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

View File

@@ -2,10 +2,6 @@ SET citus.shard_replication_factor TO 1;
 SET citus.shard_count TO 1;
 SET citus.next_shard_id TO 1;
--- Add two additional nodes to cluster.
-SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
-SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
@@ -89,7 +85,19 @@ SELECT * from table_to_split_1; -- should alwasy have zero rows
 SELECT * from table_to_split_2;
 SELECT * from table_to_split_3;
 
--- Delete data from table_to_split_1 from worker1
+-- UPDATE data of table_to_split_1 from worker1
+\c - - - :worker_1_port
+UPDATE table_to_split_1 SET value='b' where id = 100;
+UPDATE table_to_split_1 SET value='b' where id = 400;
+UPDATE table_to_split_1 SET value='b' where id = 500;
+SELECT pg_sleep(10);
+
+-- Value should be updated in table_to_split_2;
+\c - - - :worker_2_port
+SELECT * FROM table_to_split_1;
+SELECT * FROM table_to_split_2;
+SELECT * FROM table_to_split_3;
+
 \c - - - :worker_1_port
 DELETE FROM table_to_split_1;
 SELECT pg_sleep(10);
@@ -150,6 +158,7 @@ select pg_sleep(10);
 INSERT into table_to_split_1 values(100, 'a');
 INSERT into table_to_split_1 values(400, 'a');
 INSERT into table_to_split_1 values(500, 'a');
+UPDATE table_to_split_1 SET value='b' where id = 400;
 select pg_sleep(10);
 
 -- expect data to present in table_to_split_2 on worker1 as its destination for value '400'