CREATE SCHEMA split_shard_replication_setup_schema;
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1;

SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
DECLARE
    actualCount integer;
BEGIN
    EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
    WHILE expectedCount != actualCount LOOP
        EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
    END LOOP;
END$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
DECLARE
    actualCount integer;
BEGIN
    EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
    WHILE expectedCount != actualCount LOOP
        EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
    END LOOP;
END$$ LANGUAGE plpgsql;

-- Create distributed table (non co-located)
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split', 'id');

-- Test scenario one starts from here
-- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1.
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
--    table_to_split_2/3 are located on worker2
-- 4. execute UDF split_shard_replication_setup on worker1 with below
--    params:
--    worker_split_shard_replication_setup
--    (
--        ARRAY[
--            ROW(1 /* source shardId */, 2 /* new shardId */, -2147483648 /* minHashValue */, -1 /* maxHashValue */, 18 /* nodeId where new shard is placed */),
--            ROW(1, 3, 0, 2147483647, 18)
--        ]
--    );
-- 5. Create replication slot with the 'citus' output plugin
-- 6. Setup Pub/Sub
-- 7. Insert into table_to_split_1 at source worker1
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
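-- Aside: in the parameters above, each row's destination child shard is the one
-- whose [minHashValue, maxHashValue] range contains the hash of the row's
-- distribution column. As an illustrative sketch (assuming Citus's worker_hash()
-- helper; kept commented out so it does not change the expected test output):
--
--   SELECT id, worker_hash(id) AS hash_value FROM table_to_split_1;
--
-- A negative hash lands in table_to_split_2 ([-2147483648, -1]); a non-negative
-- hash lands in table_to_split_3 ([0, 2147483647]).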
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);

-- Create dummy shard tables (table_to_split_2/3) at worker1
-- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);

-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;

SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
    ], 0);

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset

-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE SUBSCRIPTION sub1
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_name,
                   copy_data=false);

-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;

-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_to_split_1 VALUES (100, 'a');
INSERT INTO table_to_split_1 VALUES (400, 'a');
INSERT INTO table_to_split_1 VALUES (500, 'a');
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;

-- Expect data to be present in shard 2 and shard 3 based on the hash value.
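-- Aside: logical replication applies changes asynchronously, which is why the
-- wait_for_* helpers defined earlier poll row counts rather than asserting them
-- immediately. If the counts below never converge, the subscription can be
-- inspected with something like the following (illustrative, not executed here):
--
--   SELECT subname, received_lsn, latest_end_lsn FROM pg_stat_subscription;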
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1; -- should always have zero rows

SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

-- UPDATE data of table_to_split_1 from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
UPDATE table_to_split_1 SET value='b' WHERE id = 100;
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
UPDATE table_to_split_1 SET value='b' WHERE id = 500;

\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;

-- Value should be updated in table_to_split_2
SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1);
SELECT * FROM table_to_split_2;

-- Value should be updated in table_to_split_3
SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;

-- Child shard rows should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
SELECT * FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
SELECT * FROM table_to_split_3;

-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;

\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;

\c - - - :master_port
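-- Aside: DROP SUBSCRIPTION also tries to drop the replication slot on the
-- publisher and reports this with a NOTICE; client_min_messages is raised to
-- ERROR above, presumably to keep that message out of the regression output.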
-- Test scenario (Parent and one child on same node. Other child on different node.)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2

SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;

-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;

SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
    ], 0);

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset

-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_worker1,
                   copy_data=false);

\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2'
CREATE SUBSCRIPTION sub_worker2
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_worker2,
                   copy_data=false);

-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;

-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_to_split_1 VALUES (100, 'a');
INSERT INTO table_to_split_1 VALUES (400, 'a');
INSERT INTO table_to_split_1 VALUES (500, 'a');
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
SELECT * FROM table_to_split_1;

-- expect data to be present in table_to_split_2 on worker1, as it is the destination for value '400'
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
SELECT id FROM table_to_split_2;
SELECT * FROM table_to_split_3;

-- Expect data to be present only in table_to_split_3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;

-- rows from table_to_split_2 should be deleted
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
SELECT * FROM table_to_split_2;

-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
SELECT * FROM table_to_split_3;
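-- Aside: note that the setup above creates one replication slot and one
-- subscription per target node, even for the child shard that stays on the
-- same node as its parent. The slots can be listed (illustrative, not part of
-- the expected output) with:
--
--   SELECT slot_name, plugin FROM pg_replication_slots;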
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker2;

-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker1;
DROP PUBLICATION pub1;

\c - - - :master_port

-- Test scenario (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1

SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;

-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;

-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset

-- Create subscription at worker1 with copy_data to 'false' and 'local_slot'
BEGIN;
CREATE SUBSCRIPTION local_subscription
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:local_slot,
                   copy_data=false);
COMMIT;

INSERT INTO table_to_split_1 VALUES (100, 'a');
INSERT INTO table_to_split_1 VALUES (400, 'a');
INSERT INTO table_to_split_1 VALUES (500, 'a');

-- expect data to be present in table_to_split_2/3 on worker1
SELECT * FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

DELETE FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
SELECT * FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
SELECT * FROM table_to_split_3;

-- clean up
DROP SUBSCRIPTION local_subscription;
DROP PUBLICATION pub1;

\c - - - :master_port
CREATE USER myuser;
CREATE USER admin_user;

GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public TO myuser;
GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public TO admin_user;

SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;

SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
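-- Aside: citus.next_shard_id pins the shard ids that create_distributed_table
-- assigns, so with shard_count 1 the shards below are predictable: table_first
-- gets shard id 4 and table_second gets shard id 7, matching the hand-created
-- shard tables table_first_4..6 and table_second_7..9.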
\c - myuser - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;
CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_first', 'id');

\c - admin_user - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.next_shard_id TO 7;
SET citus.shard_count TO 1;
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');

\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);

\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);

\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);

\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);

--- Test scenario one starts from here
--- 1. table_first and table_second are colocated tables.
--- 2. myuser is the owner of table_first and admin_user is the owner of table_second.
--- 3. Shards table_first_4 and table_second_7 are colocated on worker1
--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
--- 6. Create two publishers and two subscribers for respective table owners.
--- 7. Insert into table_first_4 and table_second_7 at source worker1
--- 8. Expect the results in child shards on worker2
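-- Aside: because table_first and table_second have different owners, the split
-- setup uses a separate publication/subscription pair and a separate replication
-- slot per table owner (the slot names derived below embed the owner's OID),
-- presumably so each owner's changes replicate independently; steps 6-8 above
-- exercise exactly that.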
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
RESET citus.enable_ddl_propagation;

SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
    ], 0);

SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset

-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
CREATE SUBSCRIPTION sub1
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_first_owner,
                   copy_data=false);

\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_first_4 VALUES (100, 'a');
INSERT INTO table_first_4 VALUES (400, 'a');
INSERT INTO table_first_4 VALUES (500, 'a');

SELECT wait_for_expected_rowcount_at_table('table_first_4', 3);
SELECT * FROM table_first_4;

\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_second_7 VALUES (100, 'a');
INSERT INTO table_second_7 VALUES (400, 'a');

SELECT wait_for_expected_rowcount_at_table('table_second_7', 2);
SELECT * FROM table_second_7;

-- expect data in table_first_5/6
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_first_4;

SELECT wait_for_expected_rowcount_at_table('table_first_5', 1);
SELECT * FROM table_first_5;

SELECT wait_for_expected_rowcount_at_table('table_first_6', 2);
SELECT * FROM table_first_6;

-- should have zero rows in all the below tables as the subscription is not yet created for admin_user
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_second_7;
SELECT * FROM table_second_8;
SELECT * FROM table_second_9;

\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
CREATE SUBSCRIPTION sub2
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub2
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_second_owner,
                   copy_data=false);
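-- Aside: the slot names used in this file appear to follow the pattern
-- citus_shard_split_slot_<targetNodeId>_<tableOwnerOid>_<operationId>; in the
-- earlier scenarios the middle component was 10, the bootstrap superuser's OID.
-- When a name mismatch is suspected, the live slots can be compared
-- (illustrative, not executed here) with:
--
--   SELECT slot_name FROM pg_replication_slots;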
-- expect data
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_second_7;

SELECT wait_for_expected_rowcount_at_table('table_second_8', 1);
SELECT * FROM table_second_8;

SELECT wait_for_expected_rowcount_at_table('table_second_9', 1);
SELECT * FROM table_second_9;

\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;
DROP PUBLICATION pub2;

\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DROP SUBSCRIPTION sub2;

\c - - - :master_port

-- Test scenario
-- 1) Set up a shared memory segment by calling worker_split_shard_replication_setup.
-- 2) Redo step 1. Since the earlier memory segment has not been released, this triggers a warning.
-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment.
-- 4) Redo step 1 and expect no warning, as the earlier memory has been cleaned up.

SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);

SET client_min_messages TO WARNING;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);

SELECT pg_catalog.worker_split_shard_release_dsm();
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);
SELECT pg_catalog.worker_split_shard_release_dsm();

-- cleanup, we are done with these manually created test tables
DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3;

\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3;

\c - - - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SET client_min_messages TO ERROR;
DROP SCHEMA split_shard_replication_setup_schema CASCADE;
DROP OWNED BY myuser;
DROP USER myuser;
DROP OWNED BY admin_user;
DROP USER admin_user;