mirror of https://github.com/citusdata/citus.git
Remove pg_sleep and address nits
parent
99a21757d4
commit
6d55154529
|
@ -311,7 +311,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
|
|||
hash_seq_init(&status, shardInfoHashMap);
|
||||
|
||||
NodeShardMappingEntry *entry = NULL;
|
||||
int index = 0;
|
||||
int splitInfoIndex = 0;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint32_t nodeId = entry->key.nodeId;
|
||||
|
@ -323,10 +323,11 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
|
|||
ShardSplitInfo *splitShardInfo = NULL;
|
||||
foreach_ptr(splitShardInfo, shardSplitInfoList)
|
||||
{
|
||||
shardSplitInfoSMHeader->splitInfoArray[index] = *splitShardInfo;
|
||||
strcpy_s(shardSplitInfoSMHeader->splitInfoArray[index].slotName, NAMEDATALEN,
|
||||
shardSplitInfoSMHeader->splitInfoArray[splitInfoIndex] = *splitShardInfo;
|
||||
strcpy_s(shardSplitInfoSMHeader->splitInfoArray[splitInfoIndex].slotName,
|
||||
NAMEDATALEN,
|
||||
derivedSlotName);
|
||||
index++;
|
||||
splitInfoIndex++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -329,30 +329,30 @@ PopulateShardSplitInfoForReplicationSlot(char *slotName)
|
|||
MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
|
||||
|
||||
ShardSplitInfoForReplicationSlot *infoForReplicationSlot =
|
||||
(ShardSplitInfoForReplicationSlot *) palloc(
|
||||
(ShardSplitInfoForReplicationSlot *) palloc0(
|
||||
sizeof(ShardSplitInfoForReplicationSlot));
|
||||
infoForReplicationSlot->shardSplitInfoHeader = smHeader;
|
||||
infoForReplicationSlot->startIndex = -1;
|
||||
infoForReplicationSlot->endIndex = -1;
|
||||
|
||||
int index = 0;
|
||||
while (index < smHeader->count)
|
||||
int splitInfoIndex = 0;
|
||||
while (splitInfoIndex < smHeader->count)
|
||||
{
|
||||
if (strcmp(smHeader->splitInfoArray[index].slotName, slotName) == 0)
|
||||
if (strcmp(smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0)
|
||||
{
|
||||
/* Found the starting index from where current slot information begins */
|
||||
infoForReplicationSlot->startIndex = index;
|
||||
infoForReplicationSlot->startIndex = splitInfoIndex;
|
||||
|
||||
/* Slide forward to get the ending index */
|
||||
index++;
|
||||
while (index < smHeader->count && strcmp(
|
||||
smHeader->splitInfoArray[index].slotName, slotName) == 0)
|
||||
splitInfoIndex++;
|
||||
while (splitInfoIndex < smHeader->count && strcmp(
|
||||
smHeader->splitInfoArray[splitInfoIndex].slotName, slotName) == 0)
|
||||
{
|
||||
index++;
|
||||
splitInfoIndex++;
|
||||
}
|
||||
|
||||
/* Found ending index */
|
||||
infoForReplicationSlot->endIndex = index - 1;
|
||||
infoForReplicationSlot->endIndex = splitInfoIndex - 1;
|
||||
|
||||
/*
|
||||
* 'ShardSplitInfo' with same slot name are stored contiguously in shared memory segment.
|
||||
|
@ -362,7 +362,7 @@ PopulateShardSplitInfoForReplicationSlot(char *slotName)
|
|||
break;
|
||||
}
|
||||
|
||||
index++;
|
||||
splitInfoIndex++;
|
||||
}
|
||||
|
||||
if (infoForReplicationSlot->startIndex == -1)
|
||||
|
|
|
@ -80,12 +80,6 @@ SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gs
|
|||
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
|
||||
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset
|
||||
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Create subscription at worker2 with copy_data to 'false'
|
||||
\c - postgres - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -98,27 +92,35 @@ CREATE SUBSCRIPTION sub1
|
|||
enabled=true,
|
||||
slot_name=:slot_for_first_owner,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - myuser - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
INSERT INTO table_first_4 VALUES(100, 'a');
|
||||
INSERT INTO table_first_4 VALUES(400, 'a');
|
||||
INSERT INTO table_first_4 VALUES(500, 'a');
|
||||
SELECT pg_sleep(2);
|
||||
pg_sleep
|
||||
SELECT wait_for_expected_rowcount_at_table('table_first_4', 3);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_first_4;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
400 | a
|
||||
500 | a
|
||||
(3 rows)
|
||||
|
||||
\c - admin_user - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
INSERT INTO table_second_7 VALUES(100, 'a');
|
||||
INSERT INTO table_second_7 VALUES(400, 'a');
|
||||
SELECT wait_for_expected_rowcount_at_table('table_second_7', 2);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_second_7;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -126,12 +128,6 @@ SELECT * FROM table_second_7;
|
|||
400 | a
|
||||
(2 rows)
|
||||
|
||||
SELECT pg_sleep(2);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- expect data in table_first_5/6
|
||||
\c - myuser - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -140,12 +136,24 @@ SELECT * FROM table_first_4;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_first_5', 1);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_first_5;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | a
|
||||
(1 row)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_first_6', 2);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_first_6;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -182,12 +190,6 @@ CREATE SUBSCRIPTION sub2
|
|||
enabled=true,
|
||||
slot_name=:slot_for_second_owner,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- expect data
|
||||
\c - admin_user - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -196,12 +198,24 @@ SELECT * FROM table_second_7;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_second_8', 1);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_second_8;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | a
|
||||
(1 row)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_second_9', 1);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_second_9;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -5,6 +5,24 @@ SET citus.shard_count TO 1;
|
|||
SET citus.next_shard_id TO 1;
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||
CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
BEGIN
|
||||
EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
|
||||
WHILE expectedCount != actualCount LOOP
|
||||
EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
|
||||
END LOOP;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
BEGIN
|
||||
EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
|
||||
WHILE expectedCount != actualCount LOOP
|
||||
EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
|
||||
END LOOP;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
-- Create distributed table (non co-located)
|
||||
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
|
||||
SELECT create_distributed_table('table_to_split','id');
|
||||
|
@ -36,7 +54,7 @@ SET search_path TO split_shard_replication_setup_schema;
|
|||
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
|
||||
-- Create dummy shard tables(table_to_split_2/3) at worker1
|
||||
-- Create dummy shard tables(table_to_split_2/3b) at worker1
|
||||
-- This is needed for Pub/Sub framework to work.
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -65,12 +83,6 @@ CREATE SUBSCRIPTION sub1
|
|||
enabled=true,
|
||||
slot_name=:slot_name,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * FROM table_to_split_1;
|
||||
id | value
|
||||
|
@ -111,12 +123,6 @@ SELECT * FROM table_to_split_3;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
select pg_sleep(2);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value.
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -125,12 +131,24 @@ SELECT * FROM table_to_split_1; -- should alwasy have zero rows
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_2;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | a
|
||||
(1 row)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_3;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -144,13 +162,6 @@ SET search_path TO split_shard_replication_setup_schema;
|
|||
UPDATE table_to_split_1 SET value='b' WHERE id = 100;
|
||||
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
|
||||
UPDATE table_to_split_1 SET value='b' WHERE id = 500;
|
||||
SELECT pg_sleep(2);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Value should be updated in table_to_split_2;
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT * FROM table_to_split_1;
|
||||
|
@ -158,12 +169,26 @@ SELECT * FROM table_to_split_1;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- Value should be updated in table_to_split_2;
|
||||
SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1);
|
||||
wait_for_updated_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_2;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | b
|
||||
(1 row)
|
||||
|
||||
-- Value should be updated in table_to_split_3;
|
||||
SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2);
|
||||
wait_for_updated_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_3;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -174,25 +199,37 @@ SELECT * FROM table_to_split_3;
|
|||
\c - - - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
DELETE FROM table_to_split_1;
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
-- Child shard rows should be deleted
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Child shard rows should be deleted
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT * FROM table_to_split_1;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_2;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_3;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -31,21 +31,9 @@ CREATE SUBSCRIPTION local_subscription
|
|||
slot_name=:local_slot,
|
||||
copy_data=false);
|
||||
COMMIT;
|
||||
select pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO table_to_split_1 VALUES(100, 'a');
|
||||
INSERT INTO table_to_split_1 VALUES(400, 'a');
|
||||
INSERT INTO table_to_split_1 VALUES(500, 'a');
|
||||
select pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- expect data to present in table_to_split_2/3 on worker1
|
||||
SELECT * FROM table_to_split_1;
|
||||
id | value
|
||||
|
@ -55,12 +43,24 @@ SELECT * FROM table_to_split_1;
|
|||
500 | a
|
||||
(3 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_2;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | a
|
||||
(1 row)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_3;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -69,8 +69,8 @@ SELECT * FROM table_to_split_3;
|
|||
(2 rows)
|
||||
|
||||
DELETE FROM table_to_split_1;
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
@ -80,11 +80,23 @@ SELECT * FROM table_to_split_1;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_2;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_3;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -29,12 +29,6 @@ CREATE SUBSCRIPTION sub_worker1
|
|||
enabled=true,
|
||||
slot_name=:slot_for_worker1,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2'
|
||||
|
@ -46,12 +40,6 @@ CREATE SUBSCRIPTION sub_worker2
|
|||
enabled=true,
|
||||
slot_name=:slot_for_worker2,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * FROM table_to_split_1;
|
||||
id | value
|
||||
|
@ -75,13 +63,6 @@ INSERT INTO table_to_split_1 VALUES(100, 'a');
|
|||
INSERT INTO table_to_split_1 VALUES(400, 'a');
|
||||
INSERT INTO table_to_split_1 VALUES(500, 'a');
|
||||
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- expect data to present in table_to_split_2 on worker1 as its destination for value '400'
|
||||
SELECT * FROM table_to_split_1;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -90,6 +71,13 @@ SELECT * FROM table_to_split_1;
|
|||
400 | b
|
||||
(3 rows)
|
||||
|
||||
-- expect data to present in table_to_split_2 on worker1 as its destination for value '400'
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_2;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -114,6 +102,12 @@ SELECT * FROM table_to_split_2;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_3;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -125,13 +119,13 @@ SELECT * FROM table_to_split_3;
|
|||
\c - - - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
DELETE FROM table_to_split_1;
|
||||
SELECT pg_sleep(5);
|
||||
pg_sleep
|
||||
-- rows from table_to_split_2 should be deleted
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- rows from table_to_split_2 should be deleted
|
||||
SELECT * FROM table_to_split_2;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -140,6 +134,12 @@ SELECT * FROM table_to_split_2;
|
|||
-- rows from table_to_split_3 should be deleted
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
|
||||
wait_for_expected_rowcount_at_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_to_split_3;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -80,8 +80,6 @@ SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot
|
|||
|
||||
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset
|
||||
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
-- Create subscription at worker2 with copy_data to 'false'
|
||||
\c - postgres - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -94,27 +92,33 @@ CREATE SUBSCRIPTION sub1
|
|||
enabled=true,
|
||||
slot_name=:slot_for_first_owner,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
\c - myuser - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
INSERT INTO table_first_4 VALUES(100, 'a');
|
||||
INSERT INTO table_first_4 VALUES(400, 'a');
|
||||
INSERT INTO table_first_4 VALUES(500, 'a');
|
||||
SELECT pg_sleep(2);
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_first_4', 3);
|
||||
SELECT * FROM table_first_4;
|
||||
|
||||
\c - admin_user - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
INSERT INTO table_second_7 VALUES(100, 'a');
|
||||
INSERT INTO table_second_7 VALUES(400, 'a');
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_second_7', 2);
|
||||
SELECT * FROM table_second_7;
|
||||
SELECT pg_sleep(2);
|
||||
|
||||
-- expect data in table_first_5/6
|
||||
\c - myuser - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT * FROM table_first_4;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_first_5', 1);
|
||||
SELECT * FROM table_first_5;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_first_6', 2);
|
||||
SELECT * FROM table_first_6;
|
||||
|
||||
-- should have zero rows in all the below tables as the subscription is not yet created for admin_user
|
||||
|
@ -135,11 +139,14 @@ CREATE SUBSCRIPTION sub2
|
|||
enabled=true,
|
||||
slot_name=:slot_for_second_owner,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
-- expect data
|
||||
\c - admin_user - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT * FROM table_second_7;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_second_8', 1);
|
||||
SELECT * FROM table_second_8;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_second_9', 1);
|
||||
SELECT * FROM table_second_9;
|
||||
|
|
|
@ -7,6 +7,26 @@ SET citus.next_shard_id TO 1;
|
|||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||
|
||||
CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
BEGIN
|
||||
EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
|
||||
WHILE expectedCount != actualCount LOOP
|
||||
EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
|
||||
END LOOP;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
|
||||
DECLARE
|
||||
actualCount integer;
|
||||
BEGIN
|
||||
EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
|
||||
WHILE expectedCount != actualCount LOOP
|
||||
EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
|
||||
END LOOP;
|
||||
END$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create distributed table (non co-located)
|
||||
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
|
||||
SELECT create_distributed_table('table_to_split','id');
|
||||
|
@ -36,7 +56,7 @@ CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
|
|||
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
-- Create dummy shard tables(table_to_split_2/3) at worker1
|
||||
-- Create dummy shard tables(table_to_split_2/3b) at worker1
|
||||
-- This is needed for Pub/Sub framework to work.
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -66,8 +86,6 @@ CREATE SUBSCRIPTION sub1
|
|||
slot_name=:slot_name,
|
||||
copy_data=false);
|
||||
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * FROM table_to_split_1;
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
@ -83,13 +101,17 @@ INSERT INTO table_to_split_1 values(500, 'a');
|
|||
SELECT * FROM table_to_split_1;
|
||||
SELECT * FROM table_to_split_2;
|
||||
SELECT * FROM table_to_split_3;
|
||||
select pg_sleep(2);
|
||||
|
||||
|
||||
-- Expect data to be present in shard 2 and shard 3 based on the hash value.
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT * FROM table_to_split_1; -- should alwasy have zero rows
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
-- UPDATE data of table_to_split_1 from worker1
|
||||
|
@ -99,26 +121,33 @@ UPDATE table_to_split_1 SET value='b' WHERE id = 100;
|
|||
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
|
||||
UPDATE table_to_split_1 SET value='b' WHERE id = 500;
|
||||
|
||||
SELECT pg_sleep(2);
|
||||
|
||||
-- Value should be updated in table_to_split_2;
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT * FROM table_to_split_1;
|
||||
|
||||
-- Value should be updated in table_to_split_2;
|
||||
SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1);
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
||||
-- Value should be updated in table_to_split_3;
|
||||
SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2);
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
DELETE FROM table_to_split_1;
|
||||
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
-- Child shard rows should be deleted
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
|
||||
SELECT * FROM table_to_split_1;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
-- drop publication from worker1
|
||||
|
|
|
@ -31,23 +31,29 @@ CREATE SUBSCRIPTION local_subscription
|
|||
slot_name=:local_slot,
|
||||
copy_data=false);
|
||||
COMMIT;
|
||||
select pg_sleep(5);
|
||||
|
||||
INSERT INTO table_to_split_1 VALUES(100, 'a');
|
||||
INSERT INTO table_to_split_1 VALUES(400, 'a');
|
||||
INSERT INTO table_to_split_1 VALUES(500, 'a');
|
||||
select pg_sleep(5);
|
||||
|
||||
-- expect data to present in table_to_split_2/3 on worker1
|
||||
SELECT * FROM table_to_split_1;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
|
||||
DELETE FROM table_to_split_1;
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
|
||||
SELECT * FROM table_to_split_1;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
-- clean up
|
||||
|
|
|
@ -28,7 +28,6 @@ CREATE SUBSCRIPTION sub_worker1
|
|||
enabled=true,
|
||||
slot_name=:slot_for_worker1,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
@ -42,7 +41,6 @@ CREATE SUBSCRIPTION sub_worker2
|
|||
enabled=true,
|
||||
slot_name=:slot_for_worker2,
|
||||
copy_data=false);
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * FROM table_to_split_1;
|
||||
|
@ -56,10 +54,10 @@ INSERT INTO table_to_split_1 VALUES(100, 'a');
|
|||
INSERT INTO table_to_split_1 VALUES(400, 'a');
|
||||
INSERT INTO table_to_split_1 VALUES(500, 'a');
|
||||
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
|
||||
SELECT pg_sleep(5);
|
||||
SELECT * FROM table_to_split_1;
|
||||
|
||||
-- expect data to present in table_to_split_2 on worker1 as its destination for value '400'
|
||||
SELECT * FROM table_to_split_1;
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
|
||||
SELECT * FROM table_to_split_2;
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
|
@ -68,20 +66,24 @@ SELECT * FROM table_to_split_3;
|
|||
SET search_path TO split_shard_replication_setup_schema;
|
||||
SELECT * FROM table_to_split_1;
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
-- delete all from table_to_split_1
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
DELETE FROM table_to_split_1;
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
-- rows from table_to_split_2 should be deleted
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
|
||||
SELECT * FROM table_to_split_2;
|
||||
|
||||
-- rows from table_to_split_3 should be deleted
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO split_shard_replication_setup_schema;
|
||||
|
||||
SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
|
||||
SELECT * FROM table_to_split_3;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
|
|
Loading…
Reference in New Issue