diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 9048f595e..6bdecac41 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -73,6 +73,8 @@ elif "isolation" in test_schedule: test_schedule = 'base_isolation_schedule' elif "failure" in test_schedule: test_schedule = 'failure_base_schedule' +elif "split" in test_schedule: + test_schedule = 'minimal_schedule' elif "mx" in test_schedule: if use_base_schedule: test_schedule = 'mx_base_schedule' diff --git a/src/test/regress/enterprise_split_schedule b/src/test/regress/enterprise_split_schedule deleted file mode 100644 index fd9788a42..000000000 --- a/src/test/regress/enterprise_split_schedule +++ /dev/null @@ -1,10 +0,0 @@ -# Split Shard tests. -# Include tests from 'minimal_schedule' for setup. -test: multi_test_helpers multi_test_helpers_superuser columnar_test_helpers -test: multi_cluster_management -test: multi_test_catalog_views -test: tablespace -# Split tests go here. -test: citus_split_shard_by_split_points_negative -test: citus_split_shard_by_split_points -test: citus_split_shard_by_split_points_deferred_drop diff --git a/src/test/regress/expected/citus_non_blocking_shard_split_cleanup.out b/src/test/regress/expected/citus_non_blocking_shard_split_cleanup.out deleted file mode 100644 index 5fe6dc0d0..000000000 --- a/src/test/regress/expected/citus_non_blocking_shard_split_cleanup.out +++ /dev/null @@ -1,123 +0,0 @@ -/* -Citus Shard Split Test.The test is model similar to 'shard_move_constraints'. -Here is a high level overview of test plan: - 1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table. - 2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors. - 3. Create Foreign key constraints between the two co-located distributed tables. - 4. Load data into the three tables. - 5. Move one of the shards for 'sensors' to test ShardMove -> Split. - 6. Trigger Split on both shards of 'sensors'. This will also split co-located tables. - 7. Move one of the split shard to test Split -> ShardMove. - 8. Split an already split shard second time on a different schema. -*/ -CREATE SCHEMA "citus_split_test_schema"; -CREATE ROLE test_split_role WITH LOGIN; -GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; -SET ROLE test_split_role; -SET search_path TO "citus_split_test_schema"; -SET citus.next_shard_id TO 8981000; -SET citus.next_placement_id TO 8610000; -SET citus.shard_count TO 2; -SET citus.shard_replication_factor TO 1; --- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc. -CREATE TABLE sensors( - measureid integer, - eventdatetime date, - measure_data jsonb, - meaure_quantity decimal(15, 2), - measure_status char(1), - measure_comment varchar(44), - PRIMARY KEY (measureid, eventdatetime, measure_data)); -SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; --- END: Create table to split, along with other co-located tables. Add indexes, statistics etc. --- BEGIN : Move one shard before we split it. -\c - postgres - :master_port -SET ROLE test_split_role; -SET search_path TO "citus_split_test_schema"; -SET citus.next_shard_id TO 8981007; -SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical'); - citus_move_shard_placement ---------------------------------------------------------------------- - -(1 row) - --- END : Move one shard before we split it. --- BEGIN : Set node id variables -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset --- END : Set node id variables --- BEGIN : Split two shards : One with move and One without move. --- Perform 2 way split -SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); - table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size ---------------------------------------------------------------------- - sensors | 8981000 | citus_split_test_schema.sensors_8981000 | distributed | 1390009 | localhost | 57638 | 40960 - sensors | 8981001 | citus_split_test_schema.sensors_8981001 | distributed | 1390009 | localhost | 57638 | 40960 -(2 rows) - -SELECT pg_catalog.citus_split_shard_by_split_points( - 8981000, - ARRAY['-1073741824'], - ARRAY[:worker_2_node, :worker_2_node], - 'force_logical'); -WARNING: replication slot "citus_shard_split_template_slot_8981000" does not exist -CONTEXT: while executing command on localhost:xxxxx - citus_split_shard_by_split_points ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); - table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size ---------------------------------------------------------------------- - sensors | 8981001 | citus_split_test_schema.sensors_8981001 | distributed | 1390009 | localhost | 57638 | 40960 - sensors | 8981007 | citus_split_test_schema.sensors_8981007 | distributed | 1390009 | localhost | 57638 | 24576 - sensors | 8981008 | citus_split_test_schema.sensors_8981008 | distributed | 1390009 | localhost | 57638 | 24576 -(3 rows) - -\c - - - :worker_2_port -SELECT slot_name FROM pg_replication_slots; - slot_name ---------------------------------------------------------------------- - citus_shard_split_template_slot_8981000 - citus_shard_split_18_20648 -(2 rows) - -\c - - - :master_port -SELECT pg_catalog.citus_split_shard_by_split_points( - 8981001, - ARRAY['536870911', '1610612735'], - ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], - 'force_logical'); -WARNING: replication slot "citus_shard_split_template_slot_8981001" does not exist -CONTEXT: while executing command on localhost:xxxxx - citus_split_shard_by_split_points ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); - table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size ---------------------------------------------------------------------- - citus_split_test_schema.sensors | 102042 | citus_split_test_schema.sensors_102042 | distributed | 1390009 | localhost | 57637 | 8192 - citus_split_test_schema.sensors | 102043 | citus_split_test_schema.sensors_102043 | distributed | 1390009 | localhost | 57637 | 16384 - citus_split_test_schema.sensors | 102044 | citus_split_test_schema.sensors_102044 | distributed | 1390009 | localhost | 57638 | 16384 - citus_split_test_schema.sensors | 8981007 | citus_split_test_schema.sensors_8981007 | distributed | 1390009 | localhost | 57638 | 24576 - citus_split_test_schema.sensors | 8981008 | citus_split_test_schema.sensors_8981008 | distributed | 1390009 | localhost | 57638 | 24576 -(5 rows) - -\c - - - :worker_2_port -SELECT slot_name FROM pg_replication_slots; - slot_name ---------------------------------------------------------------------- - citus_shard_split_template_slot_8981001 - citus_shard_split_16_20648 - citus_shard_split_18_20648 -(3 rows) - diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 7570267ba..6fb6270c7 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -498,4 +498,5 @@ DETAIL: drop cascades to table citus_split_test_schema.sensors drop cascades to table citus_split_test_schema.reference_table drop cascades to table citus_split_test_schema.colocated_dist_table drop cascades to table citus_split_test_schema.table_with_index_rep_identity +DROP USER test_split_role; --END : Cleanup diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_deferred_drop.out b/src/test/regress/expected/citus_split_shard_by_split_points_deferred_drop.out index f0cd227b9..fc66118ca 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points_deferred_drop.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points_deferred_drop.out @@ -13,9 +13,9 @@ SELECT * from pg_dist_cleanup; --------------------------------------------------------------------- (0 rows) --- Set a very long(10mins) time interval to stop auto cleanup in case of deferred drop. +-- Disable Deferred drop auto cleanup to avoid flaky tests. \c - postgres - :master_port -ALTER SYSTEM SET citus.defer_shard_delete_interval TO 600000; +ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- @@ -28,8 +28,9 @@ SET citus.next_placement_id TO 8610000; SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 1; SET citus.next_operation_id TO 777; -SET citus.next_cleanup_record_id TO 11; +SET citus.next_cleanup_record_id TO 511; SET ROLE test_split_role; +SET search_path TO "citus_split_shard_by_split_points_deferred_schema"; CREATE TABLE table_to_split(id int PRIMARY KEY, int_data int, data text); SELECT create_distributed_table('table_to_split', 'id'); create_distributed_table @@ -61,26 +62,25 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) --- The original shards are marked for deferred drop with policy_type = 2. +-- The original shard is marked for deferred drop with policy_type = 2. +-- The previous shard should be dropped at the beginning of the second split call SELECT * from pg_dist_cleanup; - record_id | operation_id | object_type | object_name | node_group_id | policy_type + record_id | operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 11 | 777 | 1 | public.table_to_split_8981000 | 14 | 2 - 12 | 778 | 1 | public.table_to_split_8981001 | 16 | 2 -(2 rows) + 512 | 778 | 1 | citus_split_shard_by_split_points_deferred_schema.table_to_split_8981001 | 16 | 2 +(1 row) --- The physical shards should not be deleted. +-- One of the physical shards should not be deleted, the other one should. \c - - - :worker_1_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; relname --------------------------------------------------------------------- - table_to_split_8981000 table_to_split_9999000 table_to_split_9999002 -(3 rows) +(2 rows) \c - - - :worker_2_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; relname --------------------------------------------------------------------- table_to_split_8981001 @@ -88,30 +88,19 @@ SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind table_to_split_9999003 (3 rows) --- Set a very short(1ms) time interval to force deferred drop cleanup. +-- Perform deferred drop cleanup. \c - postgres - :master_port -ALTER SYSTEM SET citus.defer_shard_delete_interval TO 1; -SELECT pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - --- Give enough time for the deferred drop cleanup to run. -SELECT pg_sleep(2); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 1 orphaned resources -- Clenaup has been done. SELECT * from pg_dist_cleanup; record_id | operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) +-- Verify that the shard to be dropped is dropped \c - - - :worker_1_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; relname --------------------------------------------------------------------- table_to_split_9999000 @@ -119,7 +108,7 @@ SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind (2 rows) \c - - - :worker_2_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; relname --------------------------------------------------------------------- table_to_split_9999001 @@ -128,5 +117,6 @@ SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind -- Test Cleanup \c - postgres - :master_port +SET client_min_messages TO WARNING; DROP SCHEMA "citus_split_shard_by_split_points_deferred_schema" CASCADE; -NOTICE: drop cascades to table citus_split_shard_by_split_points_deferred_schema.temp_table +DROP USER test_split_role; diff --git a/src/test/regress/expected/split_shard.out b/src/test/regress/expected/split_shard.out new file mode 100644 index 000000000..069ff306f --- /dev/null +++ b/src/test/regress/expected/split_shard.out @@ -0,0 +1,841 @@ +CREATE SCHEMA split_shard_replication_setup_schema; +SET search_path TO split_shard_replication_setup_schema; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 1; +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; +-- Create distributed table (non co-located) +CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_to_split','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Test scenario one starts from here +-- 1. table_to_split is a citus distributed table +-- 2. Shard table_to_split_1 is located on worker1. +-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- table_to_split_2/3 are located on worker2 +-- 4. execute UDF split_shard_replication_setup on worker1 with below +-- params: +-- worker_split_shard_replication_setup +-- ( +-- ARRAY[ +-- ROW(1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ), +-- ROW(1, 3 , 0 , 2147483647, 18 ) +-- ] +-- ); +-- 5. Create Replication slot with 'citus' +-- 6. Setup Pub/Sub +-- 7. Insert into table_to_split_1 at source worker1 +-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); +-- Create dummy shard tables(table_to_split_2/3b) at worker1 +-- This is needed for Pub/Sub framework to work. +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); +-- Create publication at worker1 +CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info + ], 0); + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_name, + copy_data=false); +-- No data is present at this moment in all the below tables at worker2 +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_to_split_1 values(100, 'a'); +INSERT INTO table_to_split_1 values(400, 'a'); +INSERT INTO table_to_split_1 values(500, 'a'); +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value. +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_1; -- should alwasy have zero rows + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +-- UPDATE data of table_to_split_1 from worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +UPDATE table_to_split_1 SET value='b' WHERE id = 100; +UPDATE table_to_split_1 SET value='b' WHERE id = 400; +UPDATE table_to_split_1 SET value='b' WHERE id = 500; +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- Value should be updated in table_to_split_2; +SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1); + wait_for_updated_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | b +(1 row) + +-- Value should be updated in table_to_split_3; +SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2); + wait_for_updated_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | b + 500 | b +(2 rows) + +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DELETE FROM table_to_split_1; +-- Child shard rows should be deleted +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + + -- drop publication from worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DROP PUBLICATION pub1; +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub1; +\c - - - :master_port +-- Test scenario (Parent and one child on same node. Other child on different node) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +-- Create publication at worker1 +CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info + ], 0); +WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset +-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' +CREATE SUBSCRIPTION sub_worker1 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_worker1, + copy_data=false); +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2' +CREATE SUBSCRIPTION sub_worker2 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_worker2, + copy_data=false); +-- No data is present at this moment in all the below tables at worker2 +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_to_split_1 VALUES(100, 'a'); +INSERT INTO table_to_split_1 VALUES(400, 'a'); +INSERT INTO table_to_split_1 VALUES(500, 'a'); +UPDATE table_to_split_1 SET value='b' WHERE id = 400; +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a + 400 | b +(3 rows) + +-- expect data to present in table_to_split_2 on worker1 as its destination for value '400' +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT id FROM table_to_split_2; + id +--------------------------------------------------------------------- + 400 +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- Expect data to be present only in table_to_split3 on worker2 +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +-- delete all from table_to_split_1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DELETE FROM table_to_split_1; +-- rows from table_to_split_2 should be deleted +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- rows from table_to_split_3 should be deleted +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub_worker2; + -- drop publication from worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub_worker1; +DROP PUBLICATION pub1; +\c - - - :master_port +-- Test scenario (parent shard and child shards are located on same machine) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 and table_to_split_3 are located on worker1 +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +-- Create publication at worker1 +CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; +-- Worker1 is target for table_to_split_2 and table_to_split_3 +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset +-- Create subscription at worker1 with copy_data to 'false' a +BEGIN; +CREATE SUBSCRIPTION local_subscription + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:local_slot, + copy_data=false); +COMMIT; +INSERT INTO table_to_split_1 VALUES(100, 'a'); +INSERT INTO table_to_split_1 VALUES(400, 'a'); +INSERT INTO table_to_split_1 VALUES(500, 'a'); +-- expect data to present in table_to_split_2/3 on worker1 +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +DELETE FROM table_to_split_1; +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- clean up +DROP SUBSCRIPTION local_subscription; +DROP PUBLICATION pub1; +\c - - - :master_port +CREATE USER myuser; +CREATE USER admin_user; +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser; +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user; +SET search_path TO split_shard_replication_setup_schema; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 4; +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +\c - myuser - - +SET search_path TO split_shard_replication_setup_schema; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 4; +CREATE TABLE table_first (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_first','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +\c - admin_user - - +SET search_path TO split_shard_replication_setup_schema; +SET citus.next_shard_id TO 7; +SET citus.shard_count TO 1; +CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +\c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); +\c - myuser - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); +\c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); +\c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); +--- Test scenario one starts from here +--- 1. table_first and table_second are colocated tables. +--- 2. myuser is the owner table_first and admin_user is the owner of table_second. +--- 3. Shard table_first_4 and table_second_7 are colocated on worker1 +--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2 +--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2 +--- 6. Create two publishers and two subscribers for respective table owners. +--- 7. Insert into table_first_4 and table_second_7 at source worker1 +--- 8. Expect the results in child shards on worker2 +-- Create publication at worker1 +\c - postgres - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; +CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info + ], 0); +WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset +SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset +-- Create subscription at worker2 with copy_data to 'false' +\c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO WARNING; +CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_first_owner, + copy_data=false); +\c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_first_4 VALUES(100, 'a'); +INSERT INTO table_first_4 VALUES(400, 'a'); +INSERT INTO table_first_4 VALUES(500, 'a'); +SELECT wait_for_expected_rowcount_at_table('table_first_4', 3); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_first_4; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +\c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_second_7 VALUES(100, 'a'); +INSERT INTO table_second_7 VALUES(400, 'a'); +SELECT wait_for_expected_rowcount_at_table('table_second_7', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_second_7; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a +(2 rows) + +-- expect data in table_first_5/6 +\c - myuser - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_first_4; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_first_5', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_first_5; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT wait_for_expected_rowcount_at_table('table_first_6', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_first_6; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +-- should have zero rows in all the below tables as the subscription is not yet created for admin_user +\c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_second_7; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_second_8; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_second_9; + id | value +--------------------------------------------------------------------- +(0 rows) + +\c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO WARNING; +CREATE SUBSCRIPTION sub2 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub2 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_second_owner, + copy_data=false); +-- expect data +\c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_second_7; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT wait_for_expected_rowcount_at_table('table_second_8', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_second_8; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT wait_for_expected_rowcount_at_table('table_second_9', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_second_9; + id | value +--------------------------------------------------------------------- + 100 | a +(1 row) + +\c - postgres - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DROP PUBLICATION pub1; +DROP PUBLICATION pub2; +\c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub1; +DROP SUBSCRIPTION sub2; +\c - - - :master_port +-- Test Secneario +-- 1) Setup shared memory segment by calling worker_split_shard_replication_setup. +-- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released. +-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment +-- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup. +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); + count +--------------------------------------------------------------------- + 1 +(1 row) + +SET client_min_messages TO WARNING; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); +WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT pg_catalog.worker_split_shard_release_dsm(); + worker_split_shard_release_dsm +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT pg_catalog.worker_split_shard_release_dsm(); + worker_split_shard_release_dsm +--------------------------------------------------------------------- + +(1 row) + +-- cleanup, we are done with these manually created test tables +DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3; +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3; +\c - - - :master_port +CALL pg_catalog.citus_cleanup_orphaned_resources(); +SET client_min_messages TO ERROR; +DROP SCHEMA split_shard_replication_setup_schema CASCADE; +DROP OWNED BY myuser; +DROP USER myuser; +DROP OWNED BY admin_user; +DROP USER admin_user; diff --git a/src/test/regress/expected/split_shard_release_dsm.out b/src/test/regress/expected/split_shard_release_dsm.out deleted file mode 100644 index f35406b5a..000000000 --- a/src/test/regress/expected/split_shard_release_dsm.out +++ /dev/null @@ -1,45 +0,0 @@ --- Test Secneario --- 1) Setup shared memory segment by calling worker_split_shard_replication_setup. --- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released. --- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment --- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup. -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); - count ---------------------------------------------------------------------- - 1 -(1 row) - -SET client_min_messages TO WARNING; -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); -WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. - count ---------------------------------------------------------------------- - 1 -(1 row) - -SELECT pg_catalog.worker_split_shard_release_dsm(); - worker_split_shard_release_dsm ---------------------------------------------------------------------- - -(1 row) - -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); - count ---------------------------------------------------------------------- - 1 -(1 row) - diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out deleted file mode 100644 index ab159bc1f..000000000 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ /dev/null @@ -1,241 +0,0 @@ -\c - - - :master_port -CREATE USER myuser; -CREATE USER admin_user; -GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser; -GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user; -SET search_path TO split_shard_replication_setup_schema; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 1; -SET citus.next_shard_id TO 4; -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -\c - myuser - - -SET search_path TO split_shard_replication_setup_schema; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 1; -SET citus.next_shard_id TO 4; -CREATE TABLE table_first (id bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('table_first','id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -\c - admin_user - - -SET search_path TO split_shard_replication_setup_schema; -SET citus.next_shard_id TO 7; -SET citus.shard_count TO 1; -CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -\c - myuser - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); -\c - myuser - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); -\c - admin_user - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); -\c - admin_user - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); ---- Test scenario one starts from here ---- 1. table_first and table_second are colocated tables. ---- 2. myuser is the owner table_first and admin_user is the owner of table_second. ---- 3. Shard table_first_4 and table_second_7 are colocated on worker1 ---- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2 ---- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2 ---- 6. Create two publishers and two subscribers for respective table owners. ---- 7. Insert into table_first_4 and table_second_7 at source worker1 ---- 8. Expect the results in child shards on worker2 --- Create publication at worker1 -\c - postgres - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; -CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, - ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, - ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, - ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ], 0); -WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. - count ---------------------------------------------------------------------- - 2 -(1 row) - -SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset -SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - citus.next_operation_id ---------------------------------------------------------------------- - 0 -(1 row) - -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset --- Create subscription at worker2 with copy_data to 'false' -\c - postgres - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -CREATE SUBSCRIPTION sub1 - CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_first_owner, - copy_data=false); -\c - myuser - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_first_4 VALUES(100, 'a'); -INSERT INTO table_first_4 VALUES(400, 'a'); -INSERT INTO table_first_4 VALUES(500, 'a'); -SELECT wait_for_expected_rowcount_at_table('table_first_4', 3); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_first_4; - id | value ---------------------------------------------------------------------- - 100 | a - 400 | a - 500 | a -(3 rows) - -\c - admin_user - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_second_7 VALUES(100, 'a'); -INSERT INTO table_second_7 VALUES(400, 'a'); -SELECT wait_for_expected_rowcount_at_table('table_second_7', 2); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_second_7; - id | value ---------------------------------------------------------------------- - 100 | a - 400 | a -(2 rows) - --- expect data in table_first_5/6 -\c - myuser - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_first_4; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_first_5', 1); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_first_5; - id | value ---------------------------------------------------------------------- - 400 | a -(1 row) - -SELECT wait_for_expected_rowcount_at_table('table_first_6', 2); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_first_6; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a -(2 rows) - --- should have zero rows in all the below tables as the subscription is not yet created for admin_user -\c - admin_user - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_second_7; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_second_8; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_second_9; - id | value ---------------------------------------------------------------------- -(0 rows) - -\c - postgres - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -CREATE SUBSCRIPTION sub2 - CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' - PUBLICATION pub2 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_second_owner, - copy_data=false); --- expect data -\c - admin_user - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_second_7; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_second_8', 1); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_second_8; - id | value ---------------------------------------------------------------------- - 400 | a -(1 row) - -SELECT wait_for_expected_rowcount_at_table('table_second_9', 1); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_second_9; - id | value ---------------------------------------------------------------------- - 100 | a -(1 row) - -\c - postgres - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DROP PUBLICATION pub1; -DROP PUBLICATION pub2; -\c - postgres - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub1; -DROP SUBSCRIPTION sub2; diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out deleted file mode 100644 index 84370c983..000000000 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ /dev/null @@ -1,253 +0,0 @@ -CREATE SCHEMA split_shard_replication_setup_schema; -SET search_path TO split_shard_replication_setup_schema; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 1; -SET citus.next_shard_id TO 1; -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ -DECLARE -actualCount integer; -BEGIN - EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; - WHILE expectedCount != actualCount LOOP - EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; - END LOOP; -END$$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ -DECLARE -actualCount integer; -BEGIN - EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; - WHILE expectedCount != actualCount LOOP - EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; - END LOOP; -END$$ LANGUAGE plpgsql; --- Create distributed table (non co-located) -CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('table_to_split','id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - --- Test scenario one starts from here --- 1. table_to_split is a citus distributed table --- 2. Shard table_to_split_1 is located on worker1. --- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- table_to_split_2/3 are located on worker2 --- 4. execute UDF split_shard_replication_setup on worker1 with below --- params: --- worker_split_shard_replication_setup --- ( --- ARRAY[ --- ROW(1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ), --- ROW(1, 3 , 0 , 2147483647, 18 ) --- ] --- ); --- 5. Create Replication slot with 'citus' --- 6. Setup Pub/Sub --- 7. Insert into table_to_split_1 at source worker1 --- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); --- Create dummy shard tables(table_to_split_2/3b) at worker1 --- This is needed for Pub/Sub framework to work. -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); --- Create publication at worker1 -CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ], 0); - count ---------------------------------------------------------------------- - 1 -(1 row) - --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - citus.next_operation_id ---------------------------------------------------------------------- - 0 -(1 row) - -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset --- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -CREATE SUBSCRIPTION sub1 - CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_name, - copy_data=false); --- No data is present at this moment in all the below tables at worker2 -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- Insert data in table_to_split_1 at worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_to_split_1 values(100, 'a'); -INSERT INTO table_to_split_1 values(400, 'a'); -INSERT INTO table_to_split_1 values(500, 'a'); -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- - 100 | a - 400 | a - 500 | a -(3 rows) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value. -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_to_split_1; -- should alwasy have zero rows - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- - 400 | a -(1 row) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a -(2 rows) - --- UPDATE data of table_to_split_1 from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -UPDATE table_to_split_1 SET value='b' WHERE id = 100; -UPDATE table_to_split_1 SET value='b' WHERE id = 400; -UPDATE table_to_split_1 SET value='b' WHERE id = 500; -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - --- Value should be updated in table_to_split_2; -SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1); - wait_for_updated_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- - 400 | b -(1 row) - --- Value should be updated in table_to_split_3; -SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2); - wait_for_updated_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- - 100 | b - 500 | b -(2 rows) - -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DELETE FROM table_to_split_1; --- Child shard rows should be deleted -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - - -- drop publication from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DROP PUBLICATION pub1; -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub1; diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out deleted file mode 100644 index 05431d06e..000000000 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ /dev/null @@ -1,115 +0,0 @@ --- Test scenario (parent shard and child shards are located on same machine) --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. table_to_split_2 and table_to_split_3 are located on worker1 -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; --- Create publication at worker1 -CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; --- Worker1 is target for table_to_split_2 and table_to_split_3 -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); - count ---------------------------------------------------------------------- - 1 -(1 row) - --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - citus.next_operation_id ---------------------------------------------------------------------- - 0 -(1 row) - -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset --- Create subscription at worker1 with copy_data to 'false' a -BEGIN; -CREATE SUBSCRIPTION local_subscription - CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:local_slot, - copy_data=false); -COMMIT; -INSERT INTO table_to_split_1 VALUES(100, 'a'); -INSERT INTO table_to_split_1 VALUES(400, 'a'); -INSERT INTO table_to_split_1 VALUES(500, 'a'); --- expect data to present in table_to_split_2/3 on worker1 -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- - 100 | a - 400 | a - 500 | a -(3 rows) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- - 400 | a -(1 row) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a -(2 rows) - -DELETE FROM table_to_split_1; -SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- clean up -DROP SUBSCRIPTION local_subscription; -DROP PUBLICATION pub1; diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out deleted file mode 100644 index e6e6fda1d..000000000 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ /dev/null @@ -1,165 +0,0 @@ --- Test scenario (Parent and one child on same node. Other child on different node) --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; --- Create publication at worker1 -CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ], 0); -WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. - count ---------------------------------------------------------------------- - 2 -(1 row) - --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - citus.next_operation_id ---------------------------------------------------------------------- - 0 -(1 row) - -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset --- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' -CREATE SUBSCRIPTION sub_worker1 - CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_worker1, - copy_data=false); -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; --- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2' -CREATE SUBSCRIPTION sub_worker2 - CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_worker2, - copy_data=false); --- No data is present at this moment in all the below tables at worker2 -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- Insert data in table_to_split_1 at worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_to_split_1 VALUES(100, 'a'); -INSERT INTO table_to_split_1 VALUES(400, 'a'); -INSERT INTO table_to_split_1 VALUES(500, 'a'); -UPDATE table_to_split_1 SET value='b' WHERE id = 400; -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a - 400 | b -(3 rows) - --- expect data to present in table_to_split_2 on worker1 as its destination for value '400' -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- - 400 | b -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- Expect data to be present only in table_to_split3 on worker2 -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a -(2 rows) - --- delete all from table_to_split_1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DELETE FROM table_to_split_1; --- rows from table_to_split_2 should be deleted -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - --- rows from table_to_split_3 should be deleted -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); - wait_for_expected_rowcount_at_table ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub_worker2; - -- drop publication from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub_worker1; -DROP PUBLICATION pub1; diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index 62ba469bf..2510260fb 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -7,16 +7,13 @@ test: tablespace # Helpers for foreign key catalogs. test: foreign_key_to_reference_table # Split tests go here. -test: split_shard_replication_setup -test: split_shard_replication_setup_remote_local -test: split_shard_replication_setup_local -test: split_shard_replication_colocated_setup -test: split_shard_release_dsm +test: split_shard test: worker_split_copy_test test: worker_split_binary_copy_test test: worker_split_text_copy_test test: citus_split_shard_by_split_points_negative test: citus_split_shard_by_split_points +test: citus_split_shard_by_split_points_deferred_drop test: citus_split_shard_by_split_points_failure # Name citus_split_shard_by_split_points_columnar_partitioned was too long and being truncated. # use citus_split_shard_columnar_partitioned instead. diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql index c48dc22a4..f50130276 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql @@ -256,4 +256,5 @@ SELECT COUNT(*) FROM colocated_dist_table; ALTER SYSTEM RESET citus.defer_shard_delete_interval; SELECT pg_reload_conf(); DROP SCHEMA "citus_split_test_schema" CASCADE; +DROP USER test_split_role; --END : Cleanup diff --git a/src/test/regress/sql/citus_split_shard_by_split_points_deferred_drop.sql b/src/test/regress/sql/citus_split_shard_by_split_points_deferred_drop.sql index 1728355ab..c3ca23c88 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points_deferred_drop.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points_deferred_drop.sql @@ -24,8 +24,9 @@ SET citus.next_placement_id TO 8610000; SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 1; SET citus.next_operation_id TO 777; -SET citus.next_cleanup_record_id TO 11; +SET citus.next_cleanup_record_id TO 511; SET ROLE test_split_role; +SET search_path TO "citus_split_shard_by_split_points_deferred_schema"; CREATE TABLE table_to_split(id int PRIMARY KEY, int_data int, data text); SELECT create_distributed_table('table_to_split', 'id'); @@ -48,29 +49,33 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); --- The original shards are marked for deferred drop with policy_type = 2. +-- The original shard is marked for deferred drop with policy_type = 2. +-- The previous shard should be dropped at the beginning of the second split call SELECT * from pg_dist_cleanup; --- The physical shards should not be deleted. +-- One of the physical shards should not be deleted, the other one should. \c - - - :worker_1_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; \c - - - :worker_2_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; -- Perform deferred drop cleanup. \c - postgres - :master_port -CALL citus_cleanup_orphaned_shards(); +CALL citus_cleanup_orphaned_resources(); -- Clenaup has been done. SELECT * from pg_dist_cleanup; +-- Verify that the shard to be dropped is dropped \c - - - :worker_1_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; \c - - - :worker_2_port -SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; +SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' ORDER BY relname; -- Test Cleanup \c - postgres - :master_port +SET client_min_messages TO WARNING; DROP SCHEMA "citus_split_shard_by_split_points_deferred_schema" CASCADE; +DROP USER test_split_role; diff --git a/src/test/regress/sql/split_shard.sql b/src/test/regress/sql/split_shard.sql new file mode 100644 index 000000000..f7c105076 --- /dev/null +++ b/src/test/regress/sql/split_shard.sql @@ -0,0 +1,551 @@ +CREATE SCHEMA split_shard_replication_setup_schema; +SET search_path TO split_shard_replication_setup_schema; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 1; + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; + +-- Create distributed table (non co-located) +CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_to_split','id'); + +-- Test scenario one starts from here +-- 1. table_to_split is a citus distributed table +-- 2. Shard table_to_split_1 is located on worker1. +-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- table_to_split_2/3 are located on worker2 +-- 4. execute UDF split_shard_replication_setup on worker1 with below +-- params: +-- worker_split_shard_replication_setup +-- ( +-- ARRAY[ +-- ROW(1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ), +-- ROW(1, 3 , 0 , 2147483647, 18 ) +-- ] +-- ); +-- 5. Create Replication slot with 'citus' +-- 6. Setup Pub/Sub +-- 7. Insert into table_to_split_1 at source worker1 +-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); + +-- Create dummy shard tables(table_to_split_2/3b) at worker1 +-- This is needed for Pub/Sub framework to work. +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); + +-- Create publication at worker1 +CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; + +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info + ], 0); + +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset + +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; + +CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_name, + copy_data=false); + +-- No data is present at this moment in all the below tables at worker2 +SELECT * FROM table_to_split_1; +SELECT * FROM table_to_split_2; +SELECT * FROM table_to_split_3; + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_to_split_1 values(100, 'a'); +INSERT INTO table_to_split_1 values(400, 'a'); +INSERT INTO table_to_split_1 values(500, 'a'); + +SELECT * FROM table_to_split_1; +SELECT * FROM table_to_split_2; +SELECT * FROM table_to_split_3; + + +-- Expect data to be present in shard 2 and shard 3 based on the hash value. +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_1; -- should alwasy have zero rows + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); +SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); +SELECT * FROM table_to_split_3; + +-- UPDATE data of table_to_split_1 from worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +UPDATE table_to_split_1 SET value='b' WHERE id = 100; +UPDATE table_to_split_1 SET value='b' WHERE id = 400; +UPDATE table_to_split_1 SET value='b' WHERE id = 500; + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_1; + +-- Value should be updated in table_to_split_2; +SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1); +SELECT * FROM table_to_split_2; + +-- Value should be updated in table_to_split_3; +SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2); +SELECT * FROM table_to_split_3; + +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DELETE FROM table_to_split_1; + +-- Child shard rows should be deleted +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); +SELECT * FROM table_to_split_1; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); +SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); +SELECT * FROM table_to_split_3; + + -- drop publication from worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DROP PUBLICATION pub1; + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub1; + +\c - - - :master_port +-- Test scenario (Parent and one child on same node. Other child on different node) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; + +-- Create publication at worker1 +CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; + +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info + ], 0); + +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset + +-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' +CREATE SUBSCRIPTION sub_worker1 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_worker1, + copy_data=false); + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; + +-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2' +CREATE SUBSCRIPTION sub_worker2 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_worker2, + copy_data=false); + +-- No data is present at this moment in all the below tables at worker2 +SELECT * FROM table_to_split_1; +SELECT * FROM table_to_split_2; +SELECT * FROM table_to_split_3; + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_to_split_1 VALUES(100, 'a'); +INSERT INTO table_to_split_1 VALUES(400, 'a'); +INSERT INTO table_to_split_1 VALUES(500, 'a'); +UPDATE table_to_split_1 SET value='b' WHERE id = 400; +SELECT * FROM table_to_split_1; + +-- expect data to present in table_to_split_2 on worker1 as its destination for value '400' +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); +SELECT id FROM table_to_split_2; +SELECT * FROM table_to_split_3; + +-- Expect data to be present only in table_to_split3 on worker2 +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_1; +SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); +SELECT * FROM table_to_split_3; + +-- delete all from table_to_split_1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DELETE FROM table_to_split_1; + +-- rows from table_to_split_2 should be deleted +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); +SELECT * FROM table_to_split_2; + +-- rows from table_to_split_3 should be deleted +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); +SELECT * FROM table_to_split_3; + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub_worker2; + + -- drop publication from worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub_worker1; +DROP PUBLICATION pub1; + +\c - - - :master_port +-- Test scenario (parent shard and child shards are located on same machine) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 and table_to_split_3 are located on worker1 +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; + +-- Create publication at worker1 +CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; + +-- Worker1 is target for table_to_split_2 and table_to_split_3 +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); + +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset + +-- Create subscription at worker1 with copy_data to 'false' a +BEGIN; +CREATE SUBSCRIPTION local_subscription + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:local_slot, + copy_data=false); +COMMIT; + +INSERT INTO table_to_split_1 VALUES(100, 'a'); +INSERT INTO table_to_split_1 VALUES(400, 'a'); +INSERT INTO table_to_split_1 VALUES(500, 'a'); + +-- expect data to present in table_to_split_2/3 on worker1 +SELECT * FROM table_to_split_1; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); +SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); +SELECT * FROM table_to_split_3; + +DELETE FROM table_to_split_1; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); +SELECT * FROM table_to_split_1; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); +SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); +SELECT * FROM table_to_split_3; + +-- clean up +DROP SUBSCRIPTION local_subscription; +DROP PUBLICATION pub1; + +\c - - - :master_port +CREATE USER myuser; +CREATE USER admin_user; + +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser; +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user; + +SET search_path TO split_shard_replication_setup_schema; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 4; + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +\c - myuser - - +SET search_path TO split_shard_replication_setup_schema; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 4; +CREATE TABLE table_first (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_first','id'); + +\c - admin_user - - +SET search_path TO split_shard_replication_setup_schema; +SET citus.next_shard_id TO 7; +SET citus.shard_count TO 1; +CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first'); + +\c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); + +\c - myuser - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); + +\c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); + +\c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); + +--- Test scenario one starts from here +--- 1. table_first and table_second are colocated tables. +--- 2. myuser is the owner table_first and admin_user is the owner of table_second. +--- 3. Shard table_first_4 and table_second_7 are colocated on worker1 +--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2 +--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2 +--- 6. Create two publishers and two subscribers for respective table owners. +--- 7. Insert into table_first_4 and table_second_7 at source worker1 +--- 8. Expect the results in child shards on worker2 + +-- Create publication at worker1 +\c - postgres - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; +CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; + +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info + ], 0); + +SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset +SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset + +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset + +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset + +-- Create subscription at worker2 with copy_data to 'false' +\c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO WARNING; +CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_first_owner, + copy_data=false); + +\c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_first_4 VALUES(100, 'a'); +INSERT INTO table_first_4 VALUES(400, 'a'); +INSERT INTO table_first_4 VALUES(500, 'a'); + +SELECT wait_for_expected_rowcount_at_table('table_first_4', 3); +SELECT * FROM table_first_4; + +\c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_second_7 VALUES(100, 'a'); +INSERT INTO table_second_7 VALUES(400, 'a'); + +SELECT wait_for_expected_rowcount_at_table('table_second_7', 2); +SELECT * FROM table_second_7; + +-- expect data in table_first_5/6 +\c - myuser - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_first_4; + +SELECT wait_for_expected_rowcount_at_table('table_first_5', 1); +SELECT * FROM table_first_5; + +SELECT wait_for_expected_rowcount_at_table('table_first_6', 2); +SELECT * FROM table_first_6; + +-- should have zero rows in all the below tables as the subscription is not yet created for admin_user +\c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_second_7; +SELECT * FROM table_second_8; +SELECT * FROM table_second_9; + +\c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO WARNING; +CREATE SUBSCRIPTION sub2 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub2 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_second_owner, + copy_data=false); + +-- expect data +\c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_second_7; + +SELECT wait_for_expected_rowcount_at_table('table_second_8', 1); +SELECT * FROM table_second_8; + +SELECT wait_for_expected_rowcount_at_table('table_second_9', 1); +SELECT * FROM table_second_9; + +\c - postgres - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DROP PUBLICATION pub1; +DROP PUBLICATION pub2; + +\c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub1; +DROP SUBSCRIPTION sub2; + +\c - - - :master_port +-- Test Secneario +-- 1) Setup shared memory segment by calling worker_split_shard_replication_setup. +-- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released. +-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment +-- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup. + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); + +SET client_min_messages TO WARNING; +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); + +SELECT pg_catalog.worker_split_shard_release_dsm(); +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info + ], 0); +SELECT pg_catalog.worker_split_shard_release_dsm(); + +-- cleanup, we are done with these manually created test tables +DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3; + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3; +\c - - - :master_port +CALL pg_catalog.citus_cleanup_orphaned_resources(); +SET client_min_messages TO ERROR; +DROP SCHEMA split_shard_replication_setup_schema CASCADE; +DROP OWNED BY myuser; +DROP USER myuser; +DROP OWNED BY admin_user; +DROP USER admin_user; diff --git a/src/test/regress/sql/split_shard_release_dsm.sql b/src/test/regress/sql/split_shard_release_dsm.sql deleted file mode 100644 index 69394e207..000000000 --- a/src/test/regress/sql/split_shard_release_dsm.sql +++ /dev/null @@ -1,28 +0,0 @@ --- Test Secneario --- 1) Setup shared memory segment by calling worker_split_shard_replication_setup. --- 2) Redo step 1 as the earlier memory. Redoing will trigger warning as earlier memory isn't released. --- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment --- 4) Redo step 1 and expect no warning as the earlier memory is cleanedup. - -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset - -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); - -SET client_min_messages TO WARNING; -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); - -SELECT pg_catalog.worker_split_shard_release_dsm(); -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql deleted file mode 100644 index 74a82e936..000000000 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ /dev/null @@ -1,167 +0,0 @@ -\c - - - :master_port -CREATE USER myuser; -CREATE USER admin_user; - -GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser; -GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user; - -SET search_path TO split_shard_replication_setup_schema; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 1; -SET citus.next_shard_id TO 4; - -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset - -\c - myuser - - -SET search_path TO split_shard_replication_setup_schema; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 1; -SET citus.next_shard_id TO 4; -CREATE TABLE table_first (id bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('table_first','id'); - -\c - admin_user - - -SET search_path TO split_shard_replication_setup_schema; -SET citus.next_shard_id TO 7; -SET citus.shard_count TO 1; -CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first'); - -\c - myuser - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); - -\c - myuser - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); - -\c - admin_user - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); - -\c - admin_user - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); - ---- Test scenario one starts from here ---- 1. table_first and table_second are colocated tables. ---- 2. myuser is the owner table_first and admin_user is the owner of table_second. ---- 3. Shard table_first_4 and table_second_7 are colocated on worker1 ---- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2 ---- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2 ---- 6. Create two publishers and two subscribers for respective table owners. ---- 7. Insert into table_first_4 and table_second_7 at source worker1 ---- 8. Expect the results in child shards on worker2 - --- Create publication at worker1 -\c - postgres - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; -CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; - -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, - ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, - ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, - ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ], 0); - -SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset -SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset - --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset - -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset - --- Create subscription at worker2 with copy_data to 'false' -\c - postgres - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -CREATE SUBSCRIPTION sub1 - CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_first_owner, - copy_data=false); - -\c - myuser - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_first_4 VALUES(100, 'a'); -INSERT INTO table_first_4 VALUES(400, 'a'); -INSERT INTO table_first_4 VALUES(500, 'a'); - -SELECT wait_for_expected_rowcount_at_table('table_first_4', 3); -SELECT * FROM table_first_4; - -\c - admin_user - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_second_7 VALUES(100, 'a'); -INSERT INTO table_second_7 VALUES(400, 'a'); - -SELECT wait_for_expected_rowcount_at_table('table_second_7', 2); -SELECT * FROM table_second_7; - --- expect data in table_first_5/6 -\c - myuser - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_first_4; - -SELECT wait_for_expected_rowcount_at_table('table_first_5', 1); -SELECT * FROM table_first_5; - -SELECT wait_for_expected_rowcount_at_table('table_first_6', 2); -SELECT * FROM table_first_6; - --- should have zero rows in all the below tables as the subscription is not yet created for admin_user -\c - admin_user - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_second_7; -SELECT * FROM table_second_8; -SELECT * FROM table_second_9; - -\c - postgres - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -CREATE SUBSCRIPTION sub2 - CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub2 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_second_owner, - copy_data=false); - --- expect data -\c - admin_user - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_second_7; - -SELECT wait_for_expected_rowcount_at_table('table_second_8', 1); -SELECT * FROM table_second_8; - -SELECT wait_for_expected_rowcount_at_table('table_second_9', 1); -SELECT * FROM table_second_9; - -\c - postgres - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DROP PUBLICATION pub1; -DROP PUBLICATION pub2; - -\c - postgres - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub1; -DROP SUBSCRIPTION sub2; diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql deleted file mode 100644 index 0d13cf381..000000000 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ /dev/null @@ -1,166 +0,0 @@ -CREATE SCHEMA split_shard_replication_setup_schema; -SET search_path TO split_shard_replication_setup_schema; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 1; -SET citus.next_shard_id TO 1; - -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset - -CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ -DECLARE -actualCount integer; -BEGIN - EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; - WHILE expectedCount != actualCount LOOP - EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; - END LOOP; -END$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ -DECLARE -actualCount integer; -BEGIN - EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; - WHILE expectedCount != actualCount LOOP - EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; - END LOOP; -END$$ LANGUAGE plpgsql; - --- Create distributed table (non co-located) -CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('table_to_split','id'); - --- Test scenario one starts from here --- 1. table_to_split is a citus distributed table --- 2. Shard table_to_split_1 is located on worker1. --- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- table_to_split_2/3 are located on worker2 --- 4. execute UDF split_shard_replication_setup on worker1 with below --- params: --- worker_split_shard_replication_setup --- ( --- ARRAY[ --- ROW(1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ), --- ROW(1, 3 , 0 , 2147483647, 18 ) --- ] --- ); --- 5. Create Replication slot with 'citus' --- 6. Setup Pub/Sub --- 7. Insert into table_to_split_1 at source worker1 --- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); - --- Create dummy shard tables(table_to_split_2/3b) at worker1 --- This is needed for Pub/Sub framework to work. -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); -CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); - --- Create publication at worker1 -CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; - -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ], 0); - --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset - --- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; - -CREATE SUBSCRIPTION sub1 - CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_name, - copy_data=false); - --- No data is present at this moment in all the below tables at worker2 -SELECT * FROM table_to_split_1; -SELECT * FROM table_to_split_2; -SELECT * FROM table_to_split_3; - --- Insert data in table_to_split_1 at worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_to_split_1 values(100, 'a'); -INSERT INTO table_to_split_1 values(400, 'a'); -INSERT INTO table_to_split_1 values(500, 'a'); - -SELECT * FROM table_to_split_1; -SELECT * FROM table_to_split_2; -SELECT * FROM table_to_split_3; - - --- Expect data to be present in shard 2 and shard 3 based on the hash value. -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_to_split_1; -- should alwasy have zero rows - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); -SELECT * FROM table_to_split_2; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); -SELECT * FROM table_to_split_3; - --- UPDATE data of table_to_split_1 from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -UPDATE table_to_split_1 SET value='b' WHERE id = 100; -UPDATE table_to_split_1 SET value='b' WHERE id = 400; -UPDATE table_to_split_1 SET value='b' WHERE id = 500; - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_to_split_1; - --- Value should be updated in table_to_split_2; -SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1); -SELECT * FROM table_to_split_2; - --- Value should be updated in table_to_split_3; -SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2); -SELECT * FROM table_to_split_3; - -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DELETE FROM table_to_split_1; - --- Child shard rows should be deleted -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); -SELECT * FROM table_to_split_1; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); -SELECT * FROM table_to_split_2; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); -SELECT * FROM table_to_split_3; - - -- drop publication from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DROP PUBLICATION pub1; - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub1; - diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql deleted file mode 100644 index e0927c659..000000000 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ /dev/null @@ -1,65 +0,0 @@ --- Test scenario (parent shard and child shards are located on same machine) --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. table_to_split_2 and table_to_split_3 are located on worker1 -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset - -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; - --- Create publication at worker1 -CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; - --- Worker1 is target for table_to_split_2 and table_to_split_3 -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ], 0); - --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset - --- Create subscription at worker1 with copy_data to 'false' a -BEGIN; -CREATE SUBSCRIPTION local_subscription - CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:local_slot, - copy_data=false); -COMMIT; - -INSERT INTO table_to_split_1 VALUES(100, 'a'); -INSERT INTO table_to_split_1 VALUES(400, 'a'); -INSERT INTO table_to_split_1 VALUES(500, 'a'); - --- expect data to present in table_to_split_2/3 on worker1 -SELECT * FROM table_to_split_1; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); -SELECT * FROM table_to_split_2; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); -SELECT * FROM table_to_split_3; - -DELETE FROM table_to_split_1; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); -SELECT * FROM table_to_split_1; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); -SELECT * FROM table_to_split_2; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); -SELECT * FROM table_to_split_3; - --- clean up -DROP SUBSCRIPTION local_subscription; -DROP PUBLICATION pub1; diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql deleted file mode 100644 index 0a6e67a5f..000000000 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ /dev/null @@ -1,103 +0,0 @@ --- Test scenario (Parent and one child on same node. Other child on different node) --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 -SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset -SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset - -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; - --- Create publication at worker1 -CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; - -SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ], 0); - --- we create replication slots with a name including the next_operation_id as a suffix --- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command -SHOW citus.next_operation_id; - -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset - --- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' -CREATE SUBSCRIPTION sub_worker1 - CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_worker1, - copy_data=false); - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; - --- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2' -CREATE SUBSCRIPTION sub_worker2 - CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 - WITH ( - create_slot=false, - enabled=true, - slot_name=:slot_for_worker2, - copy_data=false); - --- No data is present at this moment in all the below tables at worker2 -SELECT * FROM table_to_split_1; -SELECT * FROM table_to_split_2; -SELECT * FROM table_to_split_3; - --- Insert data in table_to_split_1 at worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_to_split_1 VALUES(100, 'a'); -INSERT INTO table_to_split_1 VALUES(400, 'a'); -INSERT INTO table_to_split_1 VALUES(500, 'a'); -UPDATE table_to_split_1 SET value='b' WHERE id = 400; -SELECT * FROM table_to_split_1; - --- expect data to present in table_to_split_2 on worker1 as its destination for value '400' -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); -SELECT * FROM table_to_split_2; -SELECT * FROM table_to_split_3; - --- Expect data to be present only in table_to_split3 on worker2 -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * FROM table_to_split_1; -SELECT * FROM table_to_split_2; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); -SELECT * FROM table_to_split_3; - --- delete all from table_to_split_1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DELETE FROM table_to_split_1; - --- rows from table_to_split_2 should be deleted -SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); -SELECT * FROM table_to_split_2; - --- rows from table_to_split_3 should be deleted -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; - -SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); -SELECT * FROM table_to_split_3; - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub_worker2; - - -- drop publication from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO ERROR; -DROP SUBSCRIPTION sub_worker1; -DROP PUBLICATION pub1;