diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c
index 5441da7ee..c14d5331e 100644
--- a/src/backend/distributed/operations/split_shard_replication_setup.c
+++ b/src/backend/distributed/operations/split_shard_replication_setup.c
@@ -22,10 +22,17 @@ PG_FUNCTION_INFO_V1(split_shard_replication_setup);
 
 static HTAB *ShardInfoHashMap = NULL;
 
+/* key for NodeShardMappingEntry */
+typedef struct NodeShardMappingKey
+{
+	uint32_t nodeId;
+	Oid tableOwnerId;
+} NodeShardMappingKey;
+
 /* Entry for hash map */
 typedef struct NodeShardMappingEntry
 {
-	uint32_t keyNodeId;
+	NodeShardMappingKey key;
 	List *shardSplitInfoList;
 } NodeShardMappingEntry;
 
@@ -47,7 +54,11 @@ static void PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
 									   HTAB *shardInfoHashMap,
 									   dsm_handle dsmHandle,
 									   int shardSplitInfoCount);
+
 static void SetupHashMapForShardInfo(void);
+static uint32 NodeShardMappingHash(const void *key, Size keysize);
+static int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
+
 
 /*
  * split_shard_replication_setup UDF creates in-memory data structures
@@ -80,8 +91,8 @@ static void SetupHashMapForShardInfo(void);
  * distinct target node. The same encoded slot name is stored in one of the fields of the
  * in-memory data structure(ShardSplitInfo).
  *
- * There is a 1-1 mapping between a target node and a replication slot as one replication
- * slot takes care of replicating changes for one node.
+ * There is a 1-1 mapping between a (target node, table owner) pair and a replication slot.
+ * One replication slot takes care of replicating changes for all shards belonging to one owner on one node.
  *
  * During the replication phase, 'decoding_plugin_for_shard_split' called for a change on a particular
  * replication slot, will decode the shared memory handle from its slot name and will attach to the
@@ -151,11 +162,13 @@ SetupHashMapForShardInfo()
 {
 	HASHCTL info;
 	memset(&info, 0, sizeof(info));
-	info.keysize = sizeof(uint32_t);
+	info.keysize = sizeof(NodeShardMappingKey);
 	info.entrysize = sizeof(NodeShardMappingEntry);
-	info.hash = uint32_hash;
+	info.hash = NodeShardMappingHash;
+	info.match = NodeShardMappingHashCompare;
 	info.hcxt = CurrentMemoryContext;
-	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
+
+	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE);
 
 	ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
 }
@@ -386,16 +399,17 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
 static void
 AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
 {
-	uint32_t keyNodeId = shardSplitInfo->nodeId;
+	NodeShardMappingKey key;
+	key.nodeId = shardSplitInfo->nodeId;
+	key.tableOwnerId = TableOwnerOid(shardSplitInfo->distributedTableOid);
+
 	bool found = false;
 	NodeShardMappingEntry *nodeMappingEntry =
-		(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &keyNodeId, HASH_ENTER,
+		(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &key, HASH_ENTER,
 											  &found);
-
 	if (!found)
 	{
 		nodeMappingEntry->shardSplitInfoList = NULL;
-		nodeMappingEntry->keyNodeId = keyNodeId;
 	}
 
 	nodeMappingEntry->shardSplitInfoList =
@@ -429,9 +443,10 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
 	int index = 0;
 	while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
 	{
-		uint32_t nodeId = entry->keyNodeId;
+		uint32_t nodeId = entry->key.nodeId;
+		uint32_t tableOwnerId = entry->key.tableOwnerId;
 		char *derivedSlotName =
-			encode_replication_slot(nodeId, dsmHandle);
+			encode_replication_slot(nodeId, dsmHandle, tableOwnerId);
 
 		List *shardSplitInfoList = entry->shardSplitInfoList;
 		ListCell *listCell = NULL;
@@ -452,3 +467,38 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
 		}
 	}
 }
+
+
+/*
+ * NodeShardMappingHash returns a hash value computed by combining the hashes
+ * of the node id and the table owner id.
+ */
+static uint32
+NodeShardMappingHash(const void *key, Size keysize)
+{
+	NodeShardMappingKey *entry = (NodeShardMappingKey *) key;
+	uint32 hash = hash_uint32(entry->nodeId);
+	hash = hash_combine(hash, hash_uint32(entry->tableOwnerId));
+	return hash;
+}
+
+
+/*
+ * Comparator for hash keys: returns 0 if the keys match and 1 otherwise,
+ * following memcmp conventions.
+ */
+static int
+NodeShardMappingHashCompare(const void *left, const void *right, Size keysize)
+{
+	NodeShardMappingKey *leftKey = (NodeShardMappingKey *) left;
+	NodeShardMappingKey *rightKey = (NodeShardMappingKey *) right;
+
+	if (leftKey->nodeId != rightKey->nodeId ||
+		leftKey->tableOwnerId != rightKey->tableOwnerId)
+	{
+		return 1;
+	}
+	else
+	{
+		return 0;
+	}
+}
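A quick illustration of the dynahash change above: once the key is a struct rather than a bare uint32, the table needs both a HASH_FUNCTION and a HASH_COMPARE callback, and the match callback follows memcmp conventions (0 means equal). Below is a minimal standalone sketch of the combining scheme. The hash_uint32() and hash_combine() bodies here are illustrative stand-ins for the PostgreSQL versions in common/hashfn.h, written so the snippet compiles outside the server; only the key layout and the combine order are taken from the patch, and the dynahash (const void *, Size) calling convention is dropped for brevity.

```c
#include <stdint.h>
#include <stdio.h>

typedef struct NodeShardMappingKey
{
	uint32_t nodeId;
	uint32_t tableOwnerId;
} NodeShardMappingKey;

/* stand-in for PostgreSQL's hash_uint32(): a 32-bit integer finalizer */
static uint32_t
hash_uint32(uint32_t k)
{
	k ^= k >> 16;
	k *= 0x85ebca6b;
	k ^= k >> 13;
	k *= 0xc2b2ae35;
	k ^= k >> 16;
	return k;
}

/* stand-in for PostgreSQL's hash_combine(): boost-style combining step */
static uint32_t
hash_combine(uint32_t a, uint32_t b)
{
	a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2);
	return a;
}

/* same combining scheme as NodeShardMappingHash in the patch */
static uint32_t
NodeShardMappingHash(const NodeShardMappingKey *key)
{
	uint32_t hash = hash_uint32(key->nodeId);
	hash = hash_combine(hash, hash_uint32(key->tableOwnerId));
	return hash;
}

int
main(void)
{
	NodeShardMappingKey a = { .nodeId = 18, .tableOwnerId = 10 };
	NodeShardMappingKey b = { .nodeId = 18, .tableOwnerId = 16385 };

	/* same node, different owners: the keys almost always hash differently */
	printf("%u\n%u\n", NodeShardMappingHash(&a), NodeShardMappingHash(&b));
	return 0;
}
```

This is why HASH_COMPARE is added to hashFlags: with a struct key, equality must compare both fields, not just the leading bytes dynahash would otherwise memcmp.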
diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
index 4e40ce52f..cbad1e1c7 100644
--- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
+++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
@@ -183,14 +183,15 @@ ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
 
 /*
  * encode_replication_slot returns an encoded replication slot name
  * in the following format.
- * Slot Name = NodeId_SharedMemoryHandle
+ * Slot Name = NodeId_SharedMemoryHandle_TableOwnerOid
  */
 char *
 encode_replication_slot(uint32_t nodeId,
-						dsm_handle dsmHandle)
+						dsm_handle dsmHandle,
+						uint32_t tableOwnerId)
 {
 	StringInfo slotName = makeStringInfo();
-	appendStringInfo(slotName, "%u_%u", nodeId, dsmHandle);
+	appendStringInfo(slotName, "%u_%u_%u", nodeId, dsmHandle, tableOwnerId);
 	return slotName->data;
 }
 
@@ -230,15 +231,18 @@ decode_replication_slot(char *slotName,
 		{
 			*dsmHandle = strtoul(slotNameString, NULL, 10);
 		}
+
 		slotNameString = strtok_r(NULL, "_", &strtokPosition);
 		index++;
+
+		/* the third token, TableOwnerOid, is parsed but ignored here */
 	}
 
 	/*
-	 * Replication slot name is encoded as NodeId_SharedMemoryHandle. Hence the number of tokens
-	 * would be strictly two considering "_" as delimiter.
+	 * Replication slot name is encoded as NodeId_SharedMemoryHandle_TableOwnerOid.
+	 * Hence the number of tokens would be strictly three considering "_" as delimiter.
 	 */
-	if (index != 2)
+	if (index != 3)
 	{
 		ereport(ERROR,
 				(errmsg("Invalid Replication Slot name encoding: %s", slotName)));
diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h
index 7ecc47348..9daf013a5 100644
--- a/src/include/distributed/shardsplit_shared_memory.h
+++ b/src/include/distributed/shardsplit_shared_memory.h
@@ -37,7 +37,8 @@ extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName,
 
 /* Functions related to encoding-decoding for replication slot name */
 char * encode_replication_slot(uint32_t nodeId,
-							   dsm_handle dsmHandle);
+							   dsm_handle dsmHandle,
+							   uint32_t tableOwnerId);
 void decode_replication_slot(char *slotName,
 							 uint32_t *nodeId,
 							 dsm_handle *dsmHandle);
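For reference, here is the slot name round trip introduced above as a standalone sketch. The values are made up, and snprintf()/strtok_r() stand in for the backend's StringInfo and decoder plumbing; what it mirrors from the patch is the NodeId_SharedMemoryHandle_TableOwnerOid layout, the parse-and-ignore handling of the owner token, and the strict three-token check.

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int
main(void)
{
	/* hypothetical node id, shared memory handle and owner oid */
	uint32_t nodeId = 18;
	uint32_t dsmHandle = 1139087;
	uint32_t tableOwnerId = 16385;

	/* encode: NodeId_SharedMemoryHandle_TableOwnerOid */
	char slotName[64];
	snprintf(slotName, sizeof(slotName), "%u_%u_%u",
			 nodeId, dsmHandle, tableOwnerId);

	/* decode: walk the tokens; exactly three are expected */
	char copy[64];
	strncpy(copy, slotName, sizeof(copy) - 1);
	copy[sizeof(copy) - 1] = '\0';

	uint32_t decodedNodeId = 0;
	uint32_t decodedHandle = 0;
	int index = 0;
	char *strtokPosition = NULL;
	char *token = strtok_r(copy, "_", &strtokPosition);
	while (token != NULL)
	{
		if (index == 0)
		{
			decodedNodeId = (uint32_t) strtoul(token, NULL, 10);
		}
		else if (index == 1)
		{
			decodedHandle = (uint32_t) strtoul(token, NULL, 10);
		}
		/* index == 2 is the TableOwnerOid token; the decoder ignores it */
		index++;
		token = strtok_r(NULL, "_", &strtokPosition);
	}

	if (index != 3)
	{
		fprintf(stderr, "invalid replication slot name encoding: %s\n", slotName);
		return 1;
	}

	printf("node %u, handle %u\n", decodedNodeId, decodedHandle);
	return 0;
}
```

The owner oid rides along in the name purely to make slot names unique per (node, owner) pair; the decoder only ever needs the node id and the shared memory handle back.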
diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out
index 0aede87ae..a79f6ff24 100644
--- a/src/test/regress/expected/split_shard_replication_colocated_setup.out
+++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out
@@ -2,53 +2,64 @@ SET citus.shard_replication_factor TO 1;
 SET citus.shard_count TO 1;
 SET citus.next_shard_id TO 4;
+CREATE USER myuser;
+CREATE USER admin_user;
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
--- Create distributed table (co-located)
+\c - myuser - -
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 1;
+SET citus.next_shard_id TO 4;
 CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
 SELECT create_distributed_table('table_first','id');
  create_distributed_table
 ---------------------------------------------------------------------
 
 (1 row)
 
+\c - admin_user - -
 SET citus.next_shard_id TO 7;
 SET citus.shard_count TO 1;
+CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
 SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');
  create_distributed_table
 ---------------------------------------------------------------------
 
 (1 row)
 
-\c - - - :worker_1_port
+\c - myuser - :worker_1_port
 CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-\c - - - :worker_2_port
+\c - myuser - :worker_2_port
 CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
+\c - admin_user - :worker_1_port
+CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
+CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
+\c - admin_user - :worker_2_port
 CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
--- Test scenario one starts from here
--- 1. table_first and table_second are colocated tables.
--- 2. Shard table_first_4 and table_second_7 are colocated on worker1
--- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
--- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
--- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
--- 6. Setup Pub/Sub
--- 7. Insert into table_first_4 and table_second_7 at source worker1
--- 8. Expect the results in child shards on worker2
-\c - - - :worker_1_port
+--- Test scenario one starts from here
+--- 1. table_first and table_second are colocated tables.
+--- 2. myuser is the owner of table_first and admin_user is the owner of table_second.
+--- 3. Shard table_first_4 and table_second_7 are colocated on worker1
+--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
+--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
+--- 6. Create two publications and two subscriptions for the respective table owners.
+--- 7. Insert into table_first_4 and table_second_7 at source worker1
+--- 8. Expect the results in child shards on worker2
+\c - postgres - :worker_1_port
 -- Create publication at worker1
 BEGIN;
-  CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9;
+  CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
 COMMIT;
 BEGIN;
-select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node);
+  CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9;
+COMMIT;
+BEGIN;
+select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node);
  ?column?
 ---------------------------------------------------------------------
         1
@@ -61,84 +72,44 @@ SELECT pg_sleep(5);
 
 (1 row)
 
-\c - - - :worker_2_port
 -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
+\c - postgres - :worker_2_port
+SET client_min_messages TO WARNING;
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
+SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
  ?column?
 ---------------------------------------------------------------------
         1
 (1 row)
 
 COMMIT;
--- No data is present at this moment in all the below tables at worker2
-SELECT * from table_first_4;
- id | value
----------------------------------------------------------------------
-(0 rows)
-
-SELECT * from table_first_5;
- id | value
----------------------------------------------------------------------
-(0 rows)
-
-SELECT * from table_first_6;
- id | value
----------------------------------------------------------------------
-(0 rows)
-
--- Insert data in table_to_split_1 at worker1
-\c - - - :worker_1_port
+\c - myuser - :worker_1_port
 INSERT into table_first_4 values(100, 'a');
 INSERT into table_first_4 values(400, 'a');
 INSERT into table_first_4 values(500, 'a');
-SELECT * from table_first_4;
- id | value
----------------------------------------------------------------------
- 100 | a
- 400 | a
- 500 | a
-(3 rows)
-
-SELECT * from table_first_5;
- id | value
----------------------------------------------------------------------
-(0 rows)
-
-SELECT * from table_first_6;
- id | value
----------------------------------------------------------------------
-(0 rows)
-
-INSERT into table_second_7 values(100, 'a');
-INSERT into table_second_7 values(400, 'a');
-INSERT into table_second_7 values(500, 'a');
-SELECT * FROM table_second_7;
- id | value
----------------------------------------------------------------------
- 100 | a
- 400 | a
- 500 | a
-(3 rows)
-
-SELECT * FROM table_second_8;
- id | value
----------------------------------------------------------------------
-(0 rows)
-
-SELECT * FROM table_second_9;
- id | value
----------------------------------------------------------------------
-(0 rows)
-
-select pg_sleep(5);
+select pg_sleep(2);
  pg_sleep
 ---------------------------------------------------------------------
 
 (1 row)
 
--- Expect data to be present in shard xxxxx,6 and 8,9 based on the hash value.
-\c - - - :worker_2_port
+\c - admin_user - :worker_1_port
+INSERT INTO table_second_7 values(100, 'a');
+INSERT INTO table_second_7 values(400, 'a');
+SELECT * from table_second_7;
+ id | value
+---------------------------------------------------------------------
+ 100 | a
+ 400 | a
+(2 rows)
+
+select pg_sleep(2);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - myuser - :worker_2_port
 SELECT * from table_first_4; -- should alwasy have zero rows
  id | value
 ---------------------------------------------------------------------
 (0 rows)
@@ -157,21 +128,53 @@ SELECT * from table_first_6;
  500 | a
 (2 rows)
 
-SELECT * FROM table_second_7; --should alwasy have zero rows
+\c - admin_user - :worker_2_port
+SELECT * from table_second_7;
  id | value
 ---------------------------------------------------------------------
 (0 rows)
 
-SELECT * FROM table_second_8;
+SELECT * from table_second_8;
+ id | value
+---------------------------------------------------------------------
+(0 rows)
+
+SELECT * from table_second_9;
+ id | value
+---------------------------------------------------------------------
+(0 rows)
+
+\c - postgres - :worker_2_port
+SET client_min_messages TO WARNING;
+BEGIN;
+SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+COMMIT;
+select pg_sleep(5);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - admin_user - :worker_2_port
+SELECT * from table_second_7;
+ id | value
+---------------------------------------------------------------------
+(0 rows)
+
+SELECT * from table_second_8;
  id | value
 ---------------------------------------------------------------------
  400 | a
 (1 row)
 
-SELECT * FROM table_second_9;
+SELECT * from table_second_9;
  id | value
 ---------------------------------------------------------------------
  100 | a
- 500 | a
-(2 rows)
+(1 row)
diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out
index 151b4e316..6c182dd13 100644
--- a/src/test/regress/expected/split_shard_replication_setup.out
+++ b/src/test/regress/expected/split_shard_replication_setup.out
@@ -55,7 +55,7 @@ BEGIN;
 COMMIT;
 -- Create replication slot for target node worker2
 BEGIN;
-select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node);
+select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
  ?column?
 ---------------------------------------------------------------------
         1
@@ -65,7 +65,7 @@ COMMIT;
 \c - - - :worker_2_port
 -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
+SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
  ?column?
 ---------------------------------------------------------------------
         1
@@ -219,7 +219,7 @@ COMMIT;
 -- Create replication slots for two target nodes worker1 and worker2.
 -- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
 BEGIN;
-select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
+select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node);
  ?column?
 ---------------------------------------------------------------------
         1
@@ -228,7 +228,7 @@ select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
 COMMIT;
 -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
+SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
  ?column?
 ---------------------------------------------------------------------
         1
@@ -244,7 +244,7 @@ select pg_sleep(5);
 \c - - - :worker_2_port
 -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2');
+SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
  ?column?
 ---------------------------------------------------------------------
         1
@@ -368,7 +368,7 @@ BEGIN;
 COMMIT;
 -- Worker1 is target for table_to_split_2 and table_to_split_3
 BEGIN;
-select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
+select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node);
  ?column?
 ---------------------------------------------------------------------
         1
@@ -377,7 +377,7 @@ select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
 COMMIT;
 -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
+SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
  ?column?
 ---------------------------------------------------------------------
         1
diff --git a/src/test/regress/expected/split_shard_test_helpers.out b/src/test/regress/expected/split_shard_test_helpers.out
index 27f4cd24c..f1a198b53 100644
--- a/src/test/regress/expected/split_shard_test_helpers.out
+++ b/src/test/regress/expected/split_shard_test_helpers.out
@@ -1,7 +1,7 @@
 -- File to create functions and helpers needed for split shard tests
 -- Populates shared memory mapping for parent shard with id 1.
 -- targetNode1, targetNode2 are the locations where child shard xxxxx and 3 are placed respectively
-CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION split_shard_replication_setup_helper(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     memoryId bigint := 0;
     memoryIdText text;
@@ -12,7 +12,7 @@ begin
 end
 $$ LANGUAGE plpgsql;
 -- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards
-CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION create_replication_slot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     targetOneSlotName text;
     targetTwoSlotName text;
@@ -20,13 +20,13 @@ DECLARE
     derivedSlotName text;
 begin
-    SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
-    SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
+    SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
+    SELECT FORMAT('%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName;
     SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
     -- if new child shards are placed on different nodes, create one more replication slot
     if (targetNode1 != targetNode2) then
-        SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
+        SELECT FORMAT('%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName;
         SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
         INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
     end if;
@@ -37,7 +37,7 @@ end
 $$ LANGUAGE plpgsql;
 -- Populates shared memory mapping for colocated parent shards 4 and 7.
 -- shard xxxxx has child shards 5 and 6. Shard 7 has child shards 8 and 9.
-CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION split_shard_replication_setup_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     memoryId bigint := 0;
     memoryIdText text;
@@ -55,31 +55,36 @@ begin
 end
 $$ LANGUAGE plpgsql;
 -- Create replication slots for targetNode1 and targetNode2 incase of colocated shards
-CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION create_replication_slot_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     targetOneSlotName text;
     targetTwoSlotName text;
     sharedMemoryId text;
-    derivedSlotName text;
+    derivedSlotNameOne text;
+    derivedSlotNameTwo text;
+    tableOwnerOne bigint;
+    tableOwnerTwo bigint;
 begin
+    -- setup shared memory information
+    SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
-    SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2);
-    SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
-    SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
+    SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
+    SELECT FORMAT('%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne;
+    SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
-    -- if new child shards are placed on different nodes, create one more replication slot
-    if (targetNode1 != targetNode2) then
-        SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
-        SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-        INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
-    end if;
+    SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
+    SELECT FORMAT('%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo;
+    SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');
+
+
+    INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1);
+    INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2);
-    INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
     return targetOneSlotName;
 end
 $$ LANGUAGE plpgsql;
 -- create subscription on target node with given 'subscriptionName'
-CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION create_subscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
 DECLARE
     replicationSlotName text;
     nodeportLocal int;
@@ -90,3 +95,29 @@ begin
     return replicationSlotName;
 end
 $$ LANGUAGE plpgsql;
+-- create subscription on target node with given 'subscriptionName'
+CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId integer, subscriptionName text) RETURNS text AS $$
+DECLARE
+    replicationSlotName text;
+    nodeportLocal int;
+    subname text;
+begin
+    SELECT name into replicationSlotName from slotName_table where id = 1;
+    EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
+    RAISE NOTICE 'derived slot name: %', replicationSlotName;
+    return replicationSlotName;
+end
+$$ LANGUAGE plpgsql;
+-- create subscription on target node with given 'subscriptionName'
+CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId integer, subscriptionName text) RETURNS text AS $$
+DECLARE
+    replicationSlotName text;
+    nodeportLocal int;
+    subname text;
+begin
+    SELECT name into replicationSlotName from slotName_table where id = 2;
+    EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
+    RAISE NOTICE 'derived slot name: %', replicationSlotName;
+    return replicationSlotName;
+end
+$$ LANGUAGE plpgsql;
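Worth noting about the helpers above and their .sql counterparts below: the non-colocated helper hardcodes the owner token as a literal `_10`, while the colocated helper looks the owner up from pg_class.relowner per table. The `10` presumably works because the tables in the non-colocated test are created as the bootstrap superuser, whose OID is 10 in PostgreSQL. A minimal sketch of the derivation the plpgsql FORMAT('%s_%s_%s', ...) calls perform (the values below are hypothetical):

```c
#include <stdint.h>
#include <stdio.h>

/* mirrors FORMAT('%s_%s_%s', nodeId, memoryId, relowner) in the helpers */
static void
build_slot_name(char *out, size_t outlen, uint32_t nodeId,
				uint32_t dsmHandle, uint32_t ownerOid)
{
	snprintf(out, outlen, "%u_%u_%u", nodeId, dsmHandle, ownerOid);
}

int
main(void)
{
	char slotName[64];

	/* hypothetical node id and handle; owner oid 10 = bootstrap superuser */
	build_slot_name(slotName, sizeof(slotName), 18, 1139087, 10);
	printf("%s\n", slotName);	/* prints 18_1139087_10 */
	return 0;
}
```

Because the slot name must agree with what decode_replication_slot() on the source expects, any change to the encoding in shardsplit_shared_memory.c has to be mirrored in these test helpers.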
diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql
index 94da83e0e..c90bca8e0 100644
--- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql
+++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql
@@ -3,93 +3,106 @@ SET citus.shard_replication_factor TO 1;
 SET citus.shard_count TO 1;
 SET citus.next_shard_id TO 4;
 
+CREATE USER myuser;
+CREATE USER admin_user;
+
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
 
--- Create distributed table (co-located)
+\c - myuser - -
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 1;
+SET citus.next_shard_id TO 4;
 CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
-
 SELECT create_distributed_table('table_first','id');
 
+\c - admin_user - -
 SET citus.next_shard_id TO 7;
 SET citus.shard_count TO 1;
+CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
 SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');
 
-\c - - - :worker_1_port
+\c - myuser - :worker_1_port
 CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-
-\c - - - :worker_2_port
+\c - myuser - :worker_2_port
 CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
 
+\c - admin_user - :worker_1_port
+CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
+CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
+
+\c - admin_user - :worker_2_port
 CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
 
--- Test scenario one starts from here
--- 1. table_first and table_second are colocated tables.
--- 2. Shard table_first_4 and table_second_7 are colocated on worker1
--- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
--- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
--- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
--- 6. Setup Pub/Sub
--- 7. Insert into table_first_4 and table_second_7 at source worker1
--- 8. Expect the results in child shards on worker2
+--- Test scenario one starts from here
+--- 1. table_first and table_second are colocated tables.
+--- 2. myuser is the owner of table_first and admin_user is the owner of table_second.
+--- 3. Shard table_first_4 and table_second_7 are colocated on worker1
+--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
+--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
+--- 6. Create two publications and two subscriptions for the respective table owners.
+--- 7. Insert into table_first_4 and table_second_7 at source worker1
+--- 8. Expect the results in child shards on worker2
 
-\c - - - :worker_1_port
+\c - postgres - :worker_1_port
 -- Create publication at worker1
 BEGIN;
-  CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9;
+  CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
 COMMIT;
 
 BEGIN;
-select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node);
+  CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9;
+COMMIT;
+
+BEGIN;
+select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node);
 COMMIT;
 
 SELECT pg_sleep(5);
 
-\c - - - :worker_2_port
+
 -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
+\c - postgres - :worker_2_port
+SET client_min_messages TO WARNING;
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
+SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
 COMMIT;
 
--- No data is present at this moment in all the below tables at worker2
-SELECT * from table_first_4;
-SELECT * from table_first_5;
-SELECT * from table_first_6;
-
-
--- Insert data in table_to_split_1 at worker1
-\c - - - :worker_1_port
+\c - myuser - :worker_1_port
 INSERT into table_first_4 values(100, 'a');
 INSERT into table_first_4 values(400, 'a');
 INSERT into table_first_4 values(500, 'a');
+select pg_sleep(2);
 
-SELECT * from table_first_4;
-SELECT * from table_first_5;
-SELECT * from table_first_6;
+\c - admin_user - :worker_1_port
+INSERT INTO table_second_7 values(100, 'a');
+INSERT INTO table_second_7 values(400, 'a');
+SELECT * from table_second_7;
+select pg_sleep(2);
 
-INSERT into table_second_7 values(100, 'a');
-INSERT into table_second_7 values(400, 'a');
-INSERT into table_second_7 values(500, 'a');
-
-SELECT * FROM table_second_7;
-SELECT * FROM table_second_8;
-SELECT * FROM table_second_9;
-select pg_sleep(5);
-
--- Expect data to be present in shard 5,6 and 8,9 based on the hash value.
-\c - - - :worker_2_port
+\c - myuser - :worker_2_port
 SELECT * from table_first_4; -- should alwasy have zero rows
 SELECT * from table_first_5;
 SELECT * from table_first_6;
 
-SELECT * FROM table_second_7; --should alwasy have zero rows
-SELECT * FROM table_second_8;
-SELECT * FROM table_second_9;
\ No newline at end of file
+\c - admin_user - :worker_2_port
+SELECT * from table_second_7;
+SELECT * from table_second_8;
+SELECT * from table_second_9;
+
+\c - postgres - :worker_2_port
+SET client_min_messages TO WARNING;
+BEGIN;
+SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
+COMMIT;
+select pg_sleep(5);
+
+\c - admin_user - :worker_2_port
+SELECT * from table_second_7;
+SELECT * from table_second_8;
+SELECT * from table_second_9;
\ No newline at end of file
diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql
index 546c82506..e95fe866a 100644
--- a/src/test/regress/sql/split_shard_replication_setup.sql
+++ b/src/test/regress/sql/split_shard_replication_setup.sql
@@ -53,13 +53,13 @@ COMMIT;
 
 -- Create replication slot for target node worker2
 BEGIN;
-select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node);
+select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
 COMMIT;
 
 \c - - - :worker_2_port
 -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
+SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
 COMMIT;
 
 select pg_sleep(5);
@@ -132,19 +132,19 @@ COMMIT;
 -- Create replication slots for two target nodes worker1 and worker2.
 -- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
 BEGIN;
-select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
+select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node);
 COMMIT;
 
 -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
+SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
 COMMIT;
 
 select pg_sleep(5);
 
 \c - - - :worker_2_port
 -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2');
+SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
 COMMIT;
 
 select pg_sleep(5);
@@ -212,12 +212,12 @@ COMMIT;
 
 -- Worker1 is target for table_to_split_2 and table_to_split_3
 BEGIN;
-select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
+select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node);
 COMMIT;
 
 -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
 BEGIN;
-SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
+SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
 COMMIT;
 
 SELECT pg_sleep(5);
diff --git a/src/test/regress/sql/split_shard_test_helpers.sql b/src/test/regress/sql/split_shard_test_helpers.sql
index 2c88d8c3f..a06f08193 100644
--- a/src/test/regress/sql/split_shard_test_helpers.sql
+++ b/src/test/regress/sql/split_shard_test_helpers.sql
@@ -2,7 +2,7 @@
 -- Populates shared memory mapping for parent shard with id 1.
 -- targetNode1, targetNode2 are the locations where child shard 2 and 3 are placed respectively
-CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION split_shard_replication_setup_helper(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     memoryId bigint := 0;
     memoryIdText text;
@@ -14,7 +14,7 @@ end
 $$ LANGUAGE plpgsql;
 
 -- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards
-CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION create_replication_slot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     targetOneSlotName text;
     targetTwoSlotName text;
@@ -22,13 +22,13 @@ DECLARE
     derivedSlotName text;
 begin
-    SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
-    SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
+    SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
+    SELECT FORMAT('%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName;
     SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
     -- if new child shards are placed on different nodes, create one more replication slot
     if (targetNode1 != targetNode2) then
-        SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
+        SELECT FORMAT('%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName;
         SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
         INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
     end if;
@@ -40,7 +40,7 @@ $$ LANGUAGE plpgsql;
 
 -- Populates shared memory mapping for colocated parent shards 4 and 7.
 -- shard 4 has child shards 5 and 6. Shard 7 has child shards 8 and 9.
-CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION split_shard_replication_setup_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     memoryId bigint := 0;
     memoryIdText text;
@@ -59,32 +59,37 @@ end
 $$ LANGUAGE plpgsql;
 
 -- Create replication slots for targetNode1 and targetNode2 incase of colocated shards
-CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION create_replication_slot_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
 DECLARE
     targetOneSlotName text;
     targetTwoSlotName text;
     sharedMemoryId text;
-    derivedSlotName text;
+    derivedSlotNameOne text;
+    derivedSlotNameTwo text;
+    tableOwnerOne bigint;
+    tableOwnerTwo bigint;
 begin
+    -- setup shared memory information
+    SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
-    SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2);
-    SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
-    SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
+    SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
+    SELECT FORMAT('%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne;
+    SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
-    -- if new child shards are placed on different nodes, create one more replication slot
-    if (targetNode1 != targetNode2) then
-        SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
-        SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-        INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
-    end if;
+    SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
+    SELECT FORMAT('%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo;
+    SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');
 
-    INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
+    INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1);
+    INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2);
+
     return targetOneSlotName;
 end
 $$ LANGUAGE plpgsql;
 
 -- create subscription on target node with given 'subscriptionName'
-CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
+CREATE OR REPLACE FUNCTION create_subscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
 DECLARE
     replicationSlotName text;
     nodeportLocal int;
@@ -94,4 +99,32 @@ begin
     EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
     return replicationSlotName;
 end
+$$ LANGUAGE plpgsql;
+
+-- create subscription on target node with given 'subscriptionName'
+CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId integer, subscriptionName text) RETURNS text AS $$
+DECLARE
+    replicationSlotName text;
+    nodeportLocal int;
+    subname text;
+begin
+    SELECT name into replicationSlotName from slotName_table where id = 1;
+    EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
+    RAISE NOTICE 'derived slot name: %', replicationSlotName;
+    return replicationSlotName;
+end
+$$ LANGUAGE plpgsql;
+
+-- create subscription on target node with given 'subscriptionName'
+CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId integer, subscriptionName text) RETURNS text AS $$
+DECLARE
+    replicationSlotName text;
+    nodeportLocal int;
+    subname text;
+begin
+    SELECT name into replicationSlotName from slotName_table where id = 2;
+    EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
+    RAISE NOTICE 'derived slot name: %', replicationSlotName;
+    return replicationSlotName;
+end
 $$ LANGUAGE plpgsql;
\ No newline at end of file