diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 09114c715..d51525fd6 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -16,6 +16,8 @@ SELECT 1 FROM citus_add_node('localhost', :worker_2_port); 1 (1 row) +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -- Create distributed table (non co-located) CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('table_to_split','id'); @@ -26,56 +28,67 @@ SELECT create_distributed_table('table_to_split','id'); -- slotName_table is used to persist replication slot name. -- It is only used for testing as the worker2 needs to create subscription over the same replication slot. -CREATE TABLE slotName_table (name text, id int primary key); +CREATE TABLE slotName_table (name text, nodeId int, id int primary key); SELECT create_distributed_table('slotName_table','id'); create_distributed_table --------------------------------------------------------------------- (1 row) --- Shard with id '1' of table table_to_split is undergoing a split into two new shards --- with id '2' and '3' respectively. table_to_split_1 is placed on worker1(NodeId 16) and --- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18). --- TODO(saawasek): make it parameterized -CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$ +-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ DECLARE memoryId bigint := 0; memoryIdText text; begin - SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]); + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); SELECT FORMAT('%s', memoryId) into memoryIdText; return memoryIdText; end $$ LANGUAGE plpgsql; --- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId --- TODO(saawasek): make it parameterized -CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$ +-- Create replication slots for targetNode1 and targetNode2 +CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ DECLARE - replicationSlotName text; - createdSlotName text; + targetOneSlotName text; + targetTwoSlotName text; sharedMemoryId text; derivedSlotName text; begin - SELECT * into sharedMemoryId from SplitShardReplicationSetup(); - -- '18' is nodeId of worker2 - SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName; - SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); - INSERT INTO slotName_table values(replicationSlotName, 1); - return replicationSlotName; + + SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; end $$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$ +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ DECLARE replicationSlotName text; nodeportLocal int; subname text; begin - SELECT name into replicationSlotName from slotName_table; - EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName); + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); return 'a'; end $$ LANGUAGE plpgsql; --- Test scenario starts from here +CREATE OR REPLACE FUNCTION DropSubscription(subscriptionName text) RETURNS text AS $$ +DECLARE +begin + EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName); + return subscriptionName; +end +$$ LANGUAGE plpgsql; +-- Test scenario one starts from here -- 1. table_to_split is a citus distributed table -- 2. Shard table_to_split_1 is located on worker1. -- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. @@ -112,7 +125,7 @@ BEGIN; COMMIT; -- Create replication slot and setup shard split information at worker1 BEGIN; -select 1 from CreateReplicationSlot(); +select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node); ?column? --------------------------------------------------------------------- 1 @@ -123,7 +136,7 @@ COMMIT; SET search_path TO citus_split_shard_by_split_points; -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(); +SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1'); ?column? --------------------------------------------------------------------- 1 @@ -166,6 +179,16 @@ SELECT * from table_to_split_1; 500 | a (3 rows) +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + select pg_sleep(10); pg_sleep --------------------------------------------------------------------- @@ -227,3 +250,169 @@ SELECT * FROM table_to_split_3; --------------------------------------------------------------------- (0 rows) + -- drop publication from worker1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +drop PUBLICATION PUB1; +DELETE FROM slotName_table; +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +SET client_min_messages TO WARNING; +DROP SUBSCRIPTION SUB1; +DELETE FROM slotName_table; +-- Test scenario two starts from here +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is +-- located on worker1. +-- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; +COMMIT; +-- Create replication slot and setup shard split information at worker1 +-- table_to_split2 is located on Worker1 and table_to_split_3 is located on worker2 +BEGIN; +select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +SELECT pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1'); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2'); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +-- No data is present at this moment in all the below tables at worker2 +SELECT * from table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(400, 'a'); +INSERT into table_to_split_1 values(500, 'a'); +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- expect data to present in table_to_split_2 on worker1 +SELECT * from table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Expect data to be present in table_to_split3 on worker2 +\c - - - :worker_2_port +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SET search_path TO citus_split_shard_by_split_points; +SELECT * from table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +-- delete all from table_to_split_1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +DELETE FROM table_to_split_1; +SELECT pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- rows from table_to_split_2 should be deleted +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- rows from table_to_split_3 should be deleted +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index df104fc11..ab3c1b494 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -8,61 +8,76 @@ SET citus.next_shard_id TO 1; SELECT 1 FROM citus_add_node('localhost', :worker_1_port); SELECT 1 FROM citus_add_node('localhost', :worker_2_port); +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + -- Create distributed table (non co-located) CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('table_to_split','id'); -- slotName_table is used to persist replication slot name. -- It is only used for testing as the worker2 needs to create subscription over the same replication slot. -CREATE TABLE slotName_table (name text, id int primary key); +CREATE TABLE slotName_table (name text, nodeId int, id int primary key); SELECT create_distributed_table('slotName_table','id'); --- Shard with id '1' of table table_to_split is undergoing a split into two new shards --- with id '2' and '3' respectively. table_to_split_1 is placed on worker1(NodeId 16) and --- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18). --- TODO(saawasek): make it parameterized -CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$ +-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ DECLARE memoryId bigint := 0; memoryIdText text; begin - SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]); + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); SELECT FORMAT('%s', memoryId) into memoryIdText; return memoryIdText; end $$ LANGUAGE plpgsql; --- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId --- TODO(saawasek): make it parameterized -CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$ +-- Create replication slots for targetNode1 and targetNode2 +CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ DECLARE - replicationSlotName text; - createdSlotName text; + targetOneSlotName text; + targetTwoSlotName text; sharedMemoryId text; derivedSlotName text; begin - SELECT * into sharedMemoryId from SplitShardReplicationSetup(); - -- '18' is nodeId of worker2 - SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName; - SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); - INSERT INTO slotName_table values(replicationSlotName, 1); - return replicationSlotName; + + SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; end $$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$ +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ DECLARE replicationSlotName text; nodeportLocal int; subname text; begin - SELECT name into replicationSlotName from slotName_table; - EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName); + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); return 'a'; end $$ LANGUAGE plpgsql; --- Test scenario starts from here +CREATE OR REPLACE FUNCTION DropSubscription(subscriptionName text) RETURNS text AS $$ +DECLARE +begin + EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName); + return subscriptionName; +end +$$ LANGUAGE plpgsql; + +-- Test scenario one starts from here -- 1. table_to_split is a citus distributed table -- 2. Shard table_to_split_1 is located on worker1. -- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. @@ -103,7 +118,7 @@ COMMIT; -- Create replication slot and setup shard split information at worker1 BEGIN; -select 1 from CreateReplicationSlot(); +select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node); COMMIT; \c - - - :worker_2_port @@ -111,7 +126,7 @@ SET search_path TO citus_split_shard_by_split_points; -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(); +SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1'); COMMIT; -- No data is present at this moment in all the below tables at worker2 @@ -127,6 +142,8 @@ INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); SELECT * from table_to_split_1; +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; select pg_sleep(10); -- Expect data to be present in shard 2 and shard 3 based on the hash value. @@ -149,3 +166,90 @@ SET search_path TO citus_split_shard_by_split_points; SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_2; SELECT * FROM table_to_split_3; + + -- drop publication from worker1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +drop PUBLICATION PUB1; +DELETE FROM slotName_table; + +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +SET client_min_messages TO WARNING; +DROP SUBSCRIPTION SUB1; +DELETE FROM slotName_table; + +-- Test scenario two starts from here +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is +-- located on worker1. +-- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 + +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; + +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; +COMMIT; + +-- Create replication slot and setup shard split information at worker1 +-- table_to_split2 is located on Worker1 and table_to_split_3 is located on worker2 +BEGIN; +select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node); +COMMIT; +SELECT pg_sleep(10); + +-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1'); +COMMIT; + +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; + +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2'); +COMMIT; + +-- No data is present at this moment in all the below tables at worker2 +SELECT * from table_to_split_1; +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; +select pg_sleep(10); + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(400, 'a'); +INSERT into table_to_split_1 values(500, 'a'); +select pg_sleep(10); + +-- expect data to present in table_to_split_2 on worker1 +SELECT * from table_to_split_1; +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; +select pg_sleep(10); + +-- Expect data to be present in table_to_split3 on worker2 +\c - - - :worker_2_port +select pg_sleep(10); +SET search_path TO citus_split_shard_by_split_points; +SELECT * from table_to_split_1; +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; + +-- delete all from table_to_split_1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +DELETE FROM table_to_split_1; +SELECT pg_sleep(5); + +-- rows from table_to_split_2 should be deleted +SELECT * from table_to_split_2; + +-- rows from table_to_split_3 should be deleted +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +SELECT * from table_to_split_3; \ No newline at end of file