diff --git a/src/backend/distributed/operations/shard_split_replicatoin.c b/src/backend/distributed/operations/shard_split_replicatoin.c index c15577274..a290874cb 100644 --- a/src/backend/distributed/operations/shard_split_replicatoin.c +++ b/src/backend/distributed/operations/shard_split_replicatoin.c @@ -25,7 +25,7 @@ static HTAB *ShardInfoHashMap = NULL; /* Entry for hash map */ typedef struct NodeShardMappingEntry { - uint64_t key; + uint64_t keyNodeId; /* map key: node id; later passed to encode_replication_slot() to derive that node's slot name */ List *shardSplitInfoList; } NodeShardMappingEntry; @@ -397,7 +397,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) if (!found) { nodeMappingEntry->shardSplitInfoList = NULL; - nodeMappingEntry->key = keyNodeId; + nodeMappingEntry->keyNodeId = keyNodeId; } nodeMappingEntry->shardSplitInfoList = @@ -433,7 +433,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, int index = 0; while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) { - uint64_t nodeId = entry->key; + uint64_t nodeId = entry->keyNodeId; char *derivedSlotName = encode_replication_slot(nodeId, dsmHandle); diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 5c40c886d..5ae18f26b 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -124,62 +124,3 @@ BEGIN END LOOP; END; $$ LANGUAGE plpgsql; -\c - - - :worker_1_port --- Create replication slots for targetNode1 and targetNode2 -CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - targetOneSlotName text; - targetTwoSlotName text; - sharedMemoryId text; - derivedSlotName text; -begin - - SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2); - SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetOneSlotName from 
pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - - -- if new child shards are placed on different nodes, create one more replication slot - if (targetNode1 != targetNode2) then - SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); - end if; - - INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); - return targetOneSlotName; -end -$$ LANGUAGE plpgsql; --- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively -CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - memoryId bigint := 0; - memoryIdText text; -begin - SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); - SELECT FORMAT('%s', memoryId) into memoryIdText; - return memoryIdText; -end -$$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return 'a'; -end -$$ LANGUAGE plpgsql; -\c - - - :worker_2_port -CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from 
slotName_table where nodeId = targetNodeId; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return 'a'; -end -$$ LANGUAGE plpgsql; diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out new file mode 100644 index 000000000..fe9aa21df --- /dev/null +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -0,0 +1,202 @@ +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 4; +-- Add two additional nodes to cluster. +SELECT 1 FROM citus_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- Create distributed table (co-located) +CREATE TABLE table_first (id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_first','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET citus.next_shard_id TO 7; +SET citus.shard_count TO 1; +SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); +\c - - - :worker_2_port +CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); +-- Test scenario one starts from here +-- 1. table_first and table_second are colocated tables. +-- 2. Shard table_first_4 and table_second_7 are colocated on worker1 +-- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2 +-- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2 +-- 5. 
Create Replication slot with 'decoding_plugin_for_shard_split' +-- 6. Setup Pub/Sub +-- 7. Insert into table_first_4 and table_second_7 at source worker1 +-- 8. Expect the results in child shards on worker2 +\c - - - :worker_1_port +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9; +COMMIT; +BEGIN; +select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +SELECT pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1'); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +-- No data is present at this moment in all the below tables at worker2 +SELECT * from table_first_4; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_first_5; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_first_6; + id | value +--------------------------------------------------------------------- +(0 rows) + +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +INSERT into table_first_4 values(100, 'a'); +INSERT into table_first_4 values(400, 'a'); +INSERT into table_first_4 values(500, 'a'); +SELECT * from table_first_4; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +SELECT * from table_first_5; 
+ id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_first_6; + id | value +--------------------------------------------------------------------- +(0 rows) + +INSERT into table_second_7 values(100, 'a'); +INSERT into table_second_7 values(400, 'a'); +INSERT into table_second_7 values(500, 'a'); +SELECT * FROM table_second_7; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +SELECT * FROM table_second_8; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_second_9; + id | value +--------------------------------------------------------------------- +(0 rows) + +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Expect data to be present in shard xxxxx,6 and 8,9 based on the hash value. +\c - - - :worker_2_port +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT * from table_first_4; -- should alwasy have zero rows + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_first_5; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT * from table_first_6; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +SELECT * FROM table_second_7; --should alwasy have zero rows + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_second_8; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT * FROM table_second_9; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + diff 
--git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index e0011f60f..948493071 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -356,14 +356,17 @@ DELETE FROM slotName_table; SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB2; DELETE FROM slotName_table; --- Test Scenario 3 +-- Test scenario three starts from here (parent shard and child shards are located on same machine) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 and table_to_split_3 are located on worker1 \c - - - :worker_1_port +SET client_min_messages TO WARNING; -- Create publication at worker1 BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; COMMIT; --- Create replication slots for two target nodes worker1 and worker2. --- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 +-- Worker1 is target for table_to_split_2 and table_to_split_3 BEGIN; select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node); ?column? 
@@ -393,7 +396,7 @@ SELECT pg_sleep(10); (1 row) - INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); select pg_sleep(10); @@ -402,7 +405,7 @@ select pg_sleep(10); (1 row) --- expect data to present in table_to_split_2 on worker1 +-- expect data to present in table_to_split_2/3 on worker1 SELECT * from table_to_split_1; id | value --------------------------------------------------------------------- @@ -452,3 +455,7 @@ SELECT * from table_to_split_3; --------------------------------------------------------------------- (0 rows) +-- clean up +DROP PUBLICATION PUB1; +DELETE FROM slotName_table; +DROP SUBSCRIPTION SUB1; diff --git a/src/test/regress/expected/split_shard_test_helpers.out b/src/test/regress/expected/split_shard_test_helpers.out new file mode 100644 index 000000000..27f4cd24c --- /dev/null +++ b/src/test/regress/expected/split_shard_test_helpers.out @@ -0,0 +1,92 @@ +-- File to create functions and helpers needed for split shard tests +-- Populates shared memory mapping for parent shard with id 1. 
+-- targetNode1, targetNode2 are the locations where child shard xxxxx and 3 are placed respectively +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); + SELECT FORMAT('%s', memoryId) into memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; +-- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards +CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + targetOneSlotName text; + targetTwoSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + + SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; +end +$$ LANGUAGE plpgsql; +-- Populates shared memory mapping for colocated parent shards 4 and 7. +-- shard xxxxx has child shards 5 and 6. Shard 7 has child shards 8 and 9. 
+CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup( + ARRAY[ + ARRAY[4, 5, -2147483648,-1, targetNode1], + ARRAY[4, 6, 0 ,2147483647, targetNode2], + ARRAY[7, 8, -2147483648,-1, targetNode1], + ARRAY[7, 9, 0, 2147483647 , targetNode2] + ]); + + SELECT FORMAT('%s', memoryId) into memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; +-- Create replication slots for targetNode1 and targetNode2 incase of colocated shards +CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + targetOneSlotName text; + targetTwoSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + + SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; +end +$$ LANGUAGE plpgsql; +-- create subscription on target node with given 'subscriptionName' +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal int; + 
subname text; +begin + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); + return replicationSlotName; +end +$$ LANGUAGE plpgsql; diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 4e3eeeb51..9e70cb971 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -135,66 +135,3 @@ BEGIN END; $$ LANGUAGE plpgsql; -\c - - - :worker_1_port --- Create replication slots for targetNode1 and targetNode2 -CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - targetOneSlotName text; - targetTwoSlotName text; - sharedMemoryId text; - derivedSlotName text; -begin - - SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2); - SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - - -- if new child shards are placed on different nodes, create one more replication slot - if (targetNode1 != targetNode2) then - SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); - end if; - - INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); - return targetOneSlotName; -end -$$ LANGUAGE plpgsql; - --- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively -CREATE OR 
REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - memoryId bigint := 0; - memoryIdText text; -begin - SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); - SELECT FORMAT('%s', memoryId) into memoryIdText; - return memoryIdText; -end -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return 'a'; -end -$$ LANGUAGE plpgsql; - -\c - - - :worker_2_port -CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return 'a'; -end -$$ LANGUAGE plpgsql; - diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql new file mode 100644 index 000000000..5c9017e3c --- /dev/null +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -0,0 +1,100 @@ +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id 
TO 4; + +-- Add two additional nodes to cluster. +SELECT 1 FROM citus_add_node('localhost', :worker_1_port); +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +-- Create distributed table (co-located) +CREATE TABLE table_first (id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); + +SELECT create_distributed_table('table_first','id'); + +SET citus.next_shard_id TO 7; +SET citus.shard_count TO 1; +SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first'); + +\c - - - :worker_1_port +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); + +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); + +\c - - - :worker_2_port +CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); + +CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); + +-- Test scenario one starts from here +-- 1. table_first and table_second are colocated tables. +-- 2. Shard table_first_4 and table_second_7 are colocated on worker1 +-- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2 +-- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2 +-- 5. Create Replication slot with 'decoding_plugin_for_shard_split' +-- 6. Setup Pub/Sub +-- 7. Insert into table_first_4 and table_second_7 at source worker1 +-- 8. 
Expect the results in child shards on worker2 + +\c - - - :worker_1_port +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9; +COMMIT; + +BEGIN; +select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node); +COMMIT; +SELECT pg_sleep(10); + +\c - - - :worker_2_port +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1'); +COMMIT; + +-- No data is present at this moment in all the below tables at worker2 +SELECT * from table_first_4; +SELECT * from table_first_5; +SELECT * from table_first_6; +select pg_sleep(10); + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +INSERT into table_first_4 values(100, 'a'); +INSERT into table_first_4 values(400, 'a'); +INSERT into table_first_4 values(500, 'a'); + +SELECT * from table_first_4; +SELECT * from table_first_5; +SELECT * from table_first_6; + +INSERT into table_second_7 values(100, 'a'); +INSERT into table_second_7 values(400, 'a'); +INSERT into table_second_7 values(500, 'a'); + +SELECT * FROM table_second_7; +SELECT * FROM table_second_8; +SELECT * FROM table_second_9; +select pg_sleep(10); + +-- Expect data to be present in shard 5,6 and 8,9 based on the hash value. 
+\c - - - :worker_2_port +select pg_sleep(10); +SELECT * from table_first_4; -- should alwasy have zero rows +SELECT * from table_first_5; +SELECT * from table_first_6; + +SELECT * FROM table_second_7; --should alwasy have zero rows +SELECT * FROM table_second_8; +SELECT * FROM table_second_9; diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 75e6588de..c8908d0ea 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -185,20 +185,25 @@ DROP SUBSCRIPTION SUB1; DELETE FROM slotName_table; \c - - - :worker_2_port + SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB2; DELETE FROM slotName_table; --- Test Scenario 3 +-- Test scenario three starts from here (parent shard and child shards are located on same machine) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 and table_to_split_3 are located on worker1 + \c - - - :worker_1_port +SET client_min_messages TO WARNING; -- Create publication at worker1 BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; COMMIT; --- Create replication slots for two target nodes worker1 and worker2. 
--- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 +-- Worker1 is target for table_to_split_2 and table_to_split_3 BEGIN; select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node); COMMIT; @@ -210,12 +215,12 @@ SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1'); COMMIT; SELECT pg_sleep(10); - INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); select pg_sleep(10); --- expect data to present in table_to_split_2 on worker1 +-- expect data to present in table_to_split_2/3 on worker1 SELECT * from table_to_split_1; SELECT * from table_to_split_2; SELECT * from table_to_split_3; @@ -225,4 +230,9 @@ DELETE FROM table_to_split_1; SELECT pg_sleep(10); SELECT * from table_to_split_1; SELECT * from table_to_split_2; -SELECT * from table_to_split_3; \ No newline at end of file +SELECT * from table_to_split_3; + +-- clean up +DROP PUBLICATION PUB1; +DELETE FROM slotName_table; +DROP SUBSCRIPTION SUB1; diff --git a/src/test/regress/sql/split_shard_test_helpers.sql b/src/test/regress/sql/split_shard_test_helpers.sql new file mode 100644 index 000000000..2c88d8c3f --- /dev/null +++ b/src/test/regress/sql/split_shard_test_helpers.sql @@ -0,0 +1,97 @@ +-- File to create functions and helpers needed for split shard tests + +-- Populates shared memory mapping for parent shard with id 1. 
+-- targetNode1, targetNode2 are the locations where child shard 2 and 3 are placed respectively +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); + SELECT FORMAT('%s', memoryId) into memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; + +-- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards +CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + targetOneSlotName text; + targetTwoSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + + SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; +end +$$ LANGUAGE plpgsql; + +-- Populates shared memory mapping for colocated parent shards 4 and 7. +-- shard 4 has child shards 5 and 6. Shard 7 has child shards 8 and 9. 
+CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup( + ARRAY[ + ARRAY[4, 5, -2147483648,-1, targetNode1], + ARRAY[4, 6, 0 ,2147483647, targetNode2], + ARRAY[7, 8, -2147483648,-1, targetNode1], + ARRAY[7, 9, 0, 2147483647 , targetNode2] + ]); + + SELECT FORMAT('%s', memoryId) into memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; + +-- Create replication slots for targetNode1 and targetNode2 incase of colocated shards +CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + targetOneSlotName text; + targetTwoSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + + SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; +end +$$ LANGUAGE plpgsql; + +-- create subscription on target node with given 'subscriptionName' +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal 
int; + subname text; +begin + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); + return replicationSlotName; +end +$$ LANGUAGE plpgsql;