diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c index 92dee0202..70d2888b5 100644 --- a/src/backend/distributed/operations/split_shard_replication_setup.c +++ b/src/backend/distributed/operations/split_shard_replication_setup.c @@ -311,7 +311,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, hash_seq_init(&status, shardInfoHashMap); NodeShardMappingEntry *entry = NULL; - int index = 0; + int splitInfoIndex = 0; while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) { uint32_t nodeId = entry->key.nodeId; @@ -323,10 +323,11 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, ShardSplitInfo *splitShardInfo = NULL; foreach_ptr(splitShardInfo, shardSplitInfoList) { - shardSplitInfoSMHeader->splitInfoArray[index] = *splitShardInfo; - strcpy_s(shardSplitInfoSMHeader->splitInfoArray[index].slotName, NAMEDATALEN, + shardSplitInfoSMHeader->splitInfoArray[splitInfoIndex] = *splitShardInfo; + strcpy_s(shardSplitInfoSMHeader->splitInfoArray[splitInfoIndex].slotName, + NAMEDATALEN, derivedSlotName); - index++; + splitInfoIndex++; } } } diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index d98c08d08..bd3750591 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -329,30 +329,30 @@ PopulateShardSplitInfoForReplicationSlot(char *slotName) MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); ShardSplitInfoForReplicationSlot *infoForReplicationSlot = - (ShardSplitInfoForReplicationSlot *) palloc( + (ShardSplitInfoForReplicationSlot *) palloc0( sizeof(ShardSplitInfoForReplicationSlot)); infoForReplicationSlot->shardSplitInfoHeader = smHeader; infoForReplicationSlot->startIndex = -1; infoForReplicationSlot->endIndex = -1; - int index = 0; - while (index < smHeader->count) + int splitInfoIndex = 0; + while (splitInfoIndex < smHeader->count) { - if (strcmp(smHeader->splitInfoArray[index].slotName, slotName) == 0) + if (strcmp(smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0) { /* Found the starting index from where current slot information begins */ - infoForReplicationSlot->startIndex = index; + infoForReplicationSlot->startIndex = splitInfoIndex; /* Slide forward to get the ending index */ - index++; - while (index < smHeader->count && strcmp( - smHeader->splitInfoArray[index].slotName, slotName) == 0) + splitInfoIndex++; + while (splitInfoIndex < smHeader->count && strcmp( + smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0) { - index++; + splitInfoIndex++; } /* Found ending index */ - infoForReplicationSlot->endIndex = index - 1; + infoForReplicationSlot->endIndex = splitInfoIndex - 1; /* * 'ShardSplitInfo' with same slot name are stored contiguously in shared memory segment. @@ -362,7 +362,7 @@ PopulateShardSplitInfoForReplicationSlot(char *slotName) break; } - index++; + splitInfoIndex++; } if (infoForReplicationSlot->startIndex == -1) diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index bfb493a79..cb513c023 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -80,12 +80,6 @@ SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gs SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; @@ -98,27 +92,35 @@ CREATE SUBSCRIPTION sub1 enabled=true, slot_name=:slot_for_first_owner, copy_data=false); -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - \c - myuser - :worker_1_port SET search_path TO split_shard_replication_setup_schema; INSERT INTO table_first_4 VALUES(100, 'a'); INSERT INTO table_first_4 VALUES(400, 'a'); INSERT INTO table_first_4 VALUES(500, 'a'); -SELECT pg_sleep(2); - pg_sleep +SELECT wait_for_expected_rowcount_at_table('table_first_4', 3); + wait_for_expected_rowcount_at_table --------------------------------------------------------------------- (1 row) +SELECT * FROM table_first_4; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + \c - admin_user - :worker_1_port SET search_path TO split_shard_replication_setup_schema; INSERT INTO table_second_7 VALUES(100, 'a'); INSERT INTO table_second_7 VALUES(400, 'a'); +SELECT wait_for_expected_rowcount_at_table('table_second_7', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_second_7; id | value --------------------------------------------------------------------- @@ -126,12 +128,6 @@ SELECT * FROM table_second_7; 400 | a (2 rows) -SELECT pg_sleep(2); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -- expect data in table_first_5/6 \c - myuser - :worker_2_port SET search_path TO split_shard_replication_setup_schema; @@ -140,12 +136,24 @@ SELECT * FROM table_first_4; --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_first_5', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_first_5; id | value --------------------------------------------------------------------- 400 | a (1 row) +SELECT wait_for_expected_rowcount_at_table('table_first_6', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_first_6; id | value --------------------------------------------------------------------- @@ -182,12 +190,6 @@ CREATE SUBSCRIPTION sub2 enabled=true, slot_name=:slot_for_second_owner, copy_data=false); -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -- expect data \c - admin_user - :worker_2_port SET search_path TO split_shard_replication_setup_schema; @@ -196,12 +198,24 @@ SELECT * FROM table_second_7; --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_second_8', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_second_8; id | value --------------------------------------------------------------------- 400 | a (1 row) +SELECT wait_for_expected_rowcount_at_table('table_second_9', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_second_9; id | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 41d03cb3a..ed7d5d400 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -5,6 +5,24 @@ SET citus.shard_count TO 1; SET citus.next_shard_id TO 1; SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; -- Create distributed table (non co-located) CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('table_to_split','id'); @@ -36,7 +54,7 @@ SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); --- Create dummy shard tables(table_to_split_2/3) at worker1 +-- Create dummy shard tables(table_to_split_2/3b) at worker1 -- This is needed for Pub/Sub framework to work. \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; @@ -65,12 +83,6 @@ CREATE SUBSCRIPTION sub1 enabled=true, slot_name=:slot_name, copy_data=false); -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -- No data is present at this moment in all the below tables at worker2 SELECT * FROM table_to_split_1; id | value @@ -111,12 +123,6 @@ SELECT * FROM table_to_split_3; --------------------------------------------------------------------- (0 rows) -select pg_sleep(2); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value. \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; @@ -125,12 +131,24 @@ SELECT * FROM table_to_split_1; -- should alwasy have zero rows --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- 400 | a (1 row) +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- @@ -144,13 +162,6 @@ SET search_path TO split_shard_replication_setup_schema; UPDATE table_to_split_1 SET value='b' WHERE id = 100; UPDATE table_to_split_1 SET value='b' WHERE id = 400; UPDATE table_to_split_1 SET value='b' WHERE id = 500; -SELECT pg_sleep(2); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - --- Value should be updated in table_to_split_2; \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; @@ -158,12 +169,26 @@ SELECT * FROM table_to_split_1; --------------------------------------------------------------------- (0 rows) +-- Value should be updated in table_to_split_2; +SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1); + wait_for_updated_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- 400 | b (1 row) +-- Value should be updated in table_to_split_3; +SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2); + wait_for_updated_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- @@ -174,25 +199,37 @@ SELECT * FROM table_to_split_3; \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - pg_sleep +-- Child shard rows should be deleted +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); + wait_for_expected_rowcount_at_table --------------------------------------------------------------------- (1 row) --- Child shard rows should be deleted -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; id | value --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index f873b6cc3..afbc515a2 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -31,21 +31,9 @@ CREATE SUBSCRIPTION local_subscription slot_name=:local_slot, copy_data=false); COMMIT; -select pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - INSERT INTO table_to_split_1 VALUES(100, 'a'); INSERT INTO table_to_split_1 VALUES(400, 'a'); INSERT INTO table_to_split_1 VALUES(500, 'a'); -select pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -- expect data to present in table_to_split_2/3 on worker1 SELECT * FROM table_to_split_1; id | value @@ -55,12 +43,24 @@ SELECT * FROM table_to_split_1; 500 | a (3 rows) +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- 400 | a (1 row) +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- @@ -69,8 +69,8 @@ SELECT * FROM table_to_split_3; (2 rows) DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - pg_sleep +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); + wait_for_expected_rowcount_at_table --------------------------------------------------------------------- (1 row) @@ -80,11 +80,23 @@ SELECT * FROM table_to_split_1; --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 56f757d84..6106bf2cb 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -29,12 +29,6 @@ CREATE SUBSCRIPTION sub_worker1 enabled=true, slot_name=:slot_for_worker1, copy_data=false); -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2' @@ -46,12 +40,6 @@ CREATE SUBSCRIPTION sub_worker2 enabled=true, slot_name=:slot_for_worker2, copy_data=false); -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -- No data is present at this moment in all the below tables at worker2 SELECT * FROM table_to_split_1; id | value @@ -75,13 +63,6 @@ INSERT INTO table_to_split_1 VALUES(100, 'a'); INSERT INTO table_to_split_1 VALUES(400, 'a'); INSERT INTO table_to_split_1 VALUES(500, 'a'); UPDATE table_to_split_1 SET value='b' WHERE id = 400; -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - --- expect data to present in table_to_split_2 on worker1 as its destination for value '400' SELECT * FROM table_to_split_1; id | value --------------------------------------------------------------------- @@ -90,6 +71,13 @@ SELECT * FROM table_to_split_1; 400 | b (3 rows) +-- expect data to present in table_to_split_2 on worker1 as its destination for value '400' +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- @@ -114,6 +102,12 @@ SELECT * FROM table_to_split_2; --------------------------------------------------------------------- (0 rows) +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- @@ -125,13 +119,13 @@ SELECT * FROM table_to_split_3; \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - pg_sleep +-- rows from table_to_split_2 should be deleted +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); + wait_for_expected_rowcount_at_table --------------------------------------------------------------------- (1 row) --- rows from table_to_split_2 should be deleted SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- @@ -140,6 +134,12 @@ SELECT * FROM table_to_split_2; -- rows from table_to_split_3 should be deleted \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); + wait_for_expected_rowcount_at_table +--------------------------------------------------------------------- + +(1 row) + SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 6addcd983..a73595f1c 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -80,8 +80,6 @@ SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset -SELECT pg_sleep(5); - -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; @@ -94,27 +92,33 @@ CREATE SUBSCRIPTION sub1 enabled=true, slot_name=:slot_for_first_owner, copy_data=false); -SELECT pg_sleep(5); \c - myuser - :worker_1_port SET search_path TO split_shard_replication_setup_schema; INSERT INTO table_first_4 VALUES(100, 'a'); INSERT INTO table_first_4 VALUES(400, 'a'); INSERT INTO table_first_4 VALUES(500, 'a'); -SELECT pg_sleep(2); + +SELECT wait_for_expected_rowcount_at_table('table_first_4', 3); +SELECT * FROM table_first_4; \c - admin_user - :worker_1_port SET search_path TO split_shard_replication_setup_schema; INSERT INTO table_second_7 VALUES(100, 'a'); INSERT INTO table_second_7 VALUES(400, 'a'); + +SELECT wait_for_expected_rowcount_at_table('table_second_7', 2); SELECT * FROM table_second_7; -SELECT pg_sleep(2); -- expect data in table_first_5/6 \c - myuser - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_first_4; + +SELECT wait_for_expected_rowcount_at_table('table_first_5', 1); SELECT * FROM table_first_5; + +SELECT wait_for_expected_rowcount_at_table('table_first_6', 2); SELECT * FROM table_first_6; -- should have zero rows in all the below tables as the subscription is not yet created for admin_user @@ -135,11 +139,14 @@ CREATE SUBSCRIPTION sub2 enabled=true, slot_name=:slot_for_second_owner, copy_data=false); -SELECT pg_sleep(5); -- expect data \c - admin_user - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_second_7; + +SELECT wait_for_expected_rowcount_at_table('table_second_8', 1); SELECT * FROM table_second_8; + +SELECT wait_for_expected_rowcount_at_table('table_second_9', 1); SELECT * FROM table_second_9; diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 48cec6ce0..814b4b8ae 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -7,6 +7,26 @@ SET citus.next_shard_id TO 1; SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$ +DECLARE +actualCount integer; +BEGIN + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + WHILE expectedCount != actualCount LOOP + EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount; + END LOOP; +END$$ LANGUAGE plpgsql; + -- Create distributed table (non co-located) CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('table_to_split','id'); @@ -36,7 +56,7 @@ CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); --- Create dummy shard tables(table_to_split_2/3) at worker1 +-- Create dummy shard tables(table_to_split_2/3b) at worker1 -- This is needed for Pub/Sub framework to work. \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; @@ -66,8 +86,6 @@ CREATE SUBSCRIPTION sub1 slot_name=:slot_name, copy_data=false); -SELECT pg_sleep(5); - -- No data is present at this moment in all the below tables at worker2 SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_2; @@ -83,13 +101,17 @@ INSERT INTO table_to_split_1 values(500, 'a'); SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_2; SELECT * FROM table_to_split_3; -select pg_sleep(2); + -- Expect data to be present in shard 2 and shard 3 based on the hash value. \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; -- should alwasy have zero rows + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); SELECT * FROM table_to_split_3; -- UPDATE data of table_to_split_1 from worker1 @@ -99,26 +121,33 @@ UPDATE table_to_split_1 SET value='b' WHERE id = 100; UPDATE table_to_split_1 SET value='b' WHERE id = 400; UPDATE table_to_split_1 SET value='b' WHERE id = 500; -SELECT pg_sleep(2); - --- Value should be updated in table_to_split_2; \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; + +-- Value should be updated in table_to_split_2; +SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1); SELECT * FROM table_to_split_2; + +-- Value should be updated in table_to_split_3; +SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2); SELECT * FROM table_to_split_3; \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - -- Child shard rows should be deleted \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); SELECT * FROM table_to_split_1; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); SELECT * FROM table_to_split_3; -- drop publication from worker1 diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index 88fb13273..cfabfb150 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -31,23 +31,29 @@ CREATE SUBSCRIPTION local_subscription slot_name=:local_slot, copy_data=false); COMMIT; -select pg_sleep(5); INSERT INTO table_to_split_1 VALUES(100, 'a'); INSERT INTO table_to_split_1 VALUES(400, 'a'); INSERT INTO table_to_split_1 VALUES(500, 'a'); -select pg_sleep(5); -- expect data to present in table_to_split_2/3 on worker1 SELECT * FROM table_to_split_1; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); SELECT * FROM table_to_split_3; - DELETE FROM table_to_split_1; -SELECT pg_sleep(5); + +SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0); SELECT * FROM table_to_split_1; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); SELECT * FROM table_to_split_3; -- clean up diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index 6d5243a94..92fd7ffaf 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -28,7 +28,6 @@ CREATE SUBSCRIPTION sub_worker1 enabled=true, slot_name=:slot_for_worker1, copy_data=false); -SELECT pg_sleep(5); \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; @@ -42,7 +41,6 @@ CREATE SUBSCRIPTION sub_worker2 enabled=true, slot_name=:slot_for_worker2, copy_data=false); -SELECT pg_sleep(5); -- No data is present at this moment in all the below tables at worker2 SELECT * FROM table_to_split_1; @@ -56,10 +54,10 @@ INSERT INTO table_to_split_1 VALUES(100, 'a'); INSERT INTO table_to_split_1 VALUES(400, 'a'); INSERT INTO table_to_split_1 VALUES(500, 'a'); UPDATE table_to_split_1 SET value='b' WHERE id = 400; -SELECT pg_sleep(5); +SELECT * FROM table_to_split_1; -- expect data to present in table_to_split_2 on worker1 as its destination for value '400' -SELECT * FROM table_to_split_1; +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1); SELECT * FROM table_to_split_2; SELECT * FROM table_to_split_3; @@ -68,20 +66,24 @@ SELECT * FROM table_to_split_3; SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_2; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2); SELECT * FROM table_to_split_3; -- delete all from table_to_split_1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; -SELECT pg_sleep(5); -- rows from table_to_split_2 should be deleted +SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0); SELECT * FROM table_to_split_2; -- rows from table_to_split_3 should be deleted \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; + +SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0); SELECT * FROM table_to_split_3; \c - - - :worker_2_port