mirror of https://github.com/citusdata/citus.git
Refactored testcase to handle split shard locally
parent
c304b5f8a7
commit
7a61bf1082
|
@ -16,6 +16,8 @@ SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||||
|
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||||
-- Create distributed table (non co-located)
|
-- Create distributed table (non co-located)
|
||||||
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
|
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
|
||||||
SELECT create_distributed_table('table_to_split','id');
|
SELECT create_distributed_table('table_to_split','id');
|
||||||
|
@ -26,56 +28,67 @@ SELECT create_distributed_table('table_to_split','id');
|
||||||
|
|
||||||
-- slotName_table is used to persist replication slot name.
|
-- slotName_table is used to persist replication slot name.
|
||||||
-- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
|
-- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
|
||||||
CREATE TABLE slotName_table (name text, id int primary key);
|
CREATE TABLE slotName_table (name text, nodeId int, id int primary key);
|
||||||
SELECT create_distributed_table('slotName_table','id');
|
SELECT create_distributed_table('slotName_table','id');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Shard with id '1' of table table_to_split is undergoing a split into two new shards
|
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
|
||||||
-- with id '2' and '3' respectively. table_to_split_1 is placed on worker1(NodeId 16) and
|
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||||
-- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18).
|
|
||||||
-- TODO(saawasek): make it parameterized
|
|
||||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$
|
|
||||||
DECLARE
|
DECLARE
|
||||||
memoryId bigint := 0;
|
memoryId bigint := 0;
|
||||||
memoryIdText text;
|
memoryIdText text;
|
||||||
begin
|
begin
|
||||||
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]);
|
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
|
||||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||||
return memoryIdText;
|
return memoryIdText;
|
||||||
end
|
end
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
-- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId
|
-- Create replication slots for targetNode1 and targetNode2
|
||||||
-- TODO(saawasek): make it parameterized
|
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$
|
|
||||||
DECLARE
|
DECLARE
|
||||||
replicationSlotName text;
|
targetOneSlotName text;
|
||||||
createdSlotName text;
|
targetTwoSlotName text;
|
||||||
sharedMemoryId text;
|
sharedMemoryId text;
|
||||||
derivedSlotName text;
|
derivedSlotName text;
|
||||||
begin
|
begin
|
||||||
SELECT * into sharedMemoryId from SplitShardReplicationSetup();
|
|
||||||
-- '18' is nodeId of worker2
|
SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||||
SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName;
|
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||||
SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
|
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
|
||||||
INSERT INTO slotName_table values(replicationSlotName, 1);
|
|
||||||
return replicationSlotName;
|
-- if new child shards are placed on different nodes, create one more replication slot
|
||||||
|
if (targetNode1 != targetNode2) then
|
||||||
|
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||||
|
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
|
||||||
|
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||||
|
end if;
|
||||||
|
|
||||||
|
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||||
|
return targetOneSlotName;
|
||||||
end
|
end
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$
|
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
replicationSlotName text;
|
replicationSlotName text;
|
||||||
nodeportLocal int;
|
nodeportLocal int;
|
||||||
subname text;
|
subname text;
|
||||||
begin
|
begin
|
||||||
SELECT name into replicationSlotName from slotName_table;
|
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||||
EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName);
|
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||||
return 'a';
|
return 'a';
|
||||||
end
|
end
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
-- Test scenario starts from here
|
CREATE OR REPLACE FUNCTION DropSubscription(subscriptionName text) RETURNS text AS $$
|
||||||
|
DECLARE
|
||||||
|
begin
|
||||||
|
EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName);
|
||||||
|
return subscriptionName;
|
||||||
|
end
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
-- Test scenario one starts from here
|
||||||
-- 1. table_to_split is a citus distributed table
|
-- 1. table_to_split is a citus distributed table
|
||||||
-- 2. Shard table_to_split_1 is located on worker1.
|
-- 2. Shard table_to_split_1 is located on worker1.
|
||||||
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
|
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
|
||||||
|
@ -112,7 +125,7 @@ BEGIN;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- Create replication slot and setup shard split information at worker1
|
-- Create replication slot and setup shard split information at worker1
|
||||||
BEGIN;
|
BEGIN;
|
||||||
select 1 from CreateReplicationSlot();
|
select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
|
@ -123,7 +136,7 @@ COMMIT;
|
||||||
SET search_path TO citus_split_shard_by_split_points;
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT 1 from CreateSubscription();
|
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1');
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
|
@ -166,6 +179,16 @@ SELECT * from table_to_split_1;
|
||||||
500 | a
|
500 | a
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
select pg_sleep(10);
|
select pg_sleep(10);
|
||||||
pg_sleep
|
pg_sleep
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -227,3 +250,169 @@ SELECT * FROM table_to_split_3;
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
-- drop publication from worker1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
drop PUBLICATION PUB1;
|
||||||
|
DELETE FROM slotName_table;
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SUBSCRIPTION SUB1;
|
||||||
|
DELETE FROM slotName_table;
|
||||||
|
-- Test scenario two starts from here
|
||||||
|
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is
|
||||||
|
-- located on worker1.
|
||||||
|
-- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
-- Create publication at worker1
|
||||||
|
BEGIN;
|
||||||
|
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
|
||||||
|
COMMIT;
|
||||||
|
-- Create replication slot and setup shard split information at worker1
|
||||||
|
-- table_to_split2 is located on Worker1 and table_to_split_3 is located on worker2
|
||||||
|
BEGIN;
|
||||||
|
select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
SELECT pg_sleep(10);
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
|
||||||
|
BEGIN;
|
||||||
|
SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1');
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||||
|
BEGIN;
|
||||||
|
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2');
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- No data is present at this moment in all the below tables at worker2
|
||||||
|
SELECT * from table_to_split_1;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
select pg_sleep(10);
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Insert data in table_to_split_1 at worker1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
INSERT into table_to_split_1 values(100, 'a');
|
||||||
|
INSERT into table_to_split_1 values(400, 'a');
|
||||||
|
INSERT into table_to_split_1 values(500, 'a');
|
||||||
|
select pg_sleep(10);
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- expect data to present in table_to_split_2 on worker1
|
||||||
|
SELECT * from table_to_split_1;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100 | a
|
||||||
|
400 | a
|
||||||
|
500 | a
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
400 | a
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
select pg_sleep(10);
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Expect data to be present in table_to_split3 on worker2
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
select pg_sleep(10);
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
SELECT * from table_to_split_1;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100 | a
|
||||||
|
500 | a
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- delete all from table_to_split_1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
DELETE FROM table_to_split_1;
|
||||||
|
SELECT pg_sleep(5);
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- rows from table_to_split_2 should be deleted
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- rows from table_to_split_3 should be deleted
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
id | value
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
|
|
@ -8,61 +8,76 @@ SET citus.next_shard_id TO 1;
|
||||||
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||||
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||||
|
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||||
|
|
||||||
-- Create distributed table (non co-located)
|
-- Create distributed table (non co-located)
|
||||||
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
|
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
|
||||||
SELECT create_distributed_table('table_to_split','id');
|
SELECT create_distributed_table('table_to_split','id');
|
||||||
|
|
||||||
-- slotName_table is used to persist replication slot name.
|
-- slotName_table is used to persist replication slot name.
|
||||||
-- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
|
-- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
|
||||||
CREATE TABLE slotName_table (name text, id int primary key);
|
CREATE TABLE slotName_table (name text, nodeId int, id int primary key);
|
||||||
SELECT create_distributed_table('slotName_table','id');
|
SELECT create_distributed_table('slotName_table','id');
|
||||||
|
|
||||||
-- Shard with id '1' of table table_to_split is undergoing a split into two new shards
|
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
|
||||||
-- with id '2' and '3' respectively. table_to_split_1 is placed on worker1(NodeId 16) and
|
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||||
-- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18).
|
|
||||||
-- TODO(saawasek): make it parameterized
|
|
||||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$
|
|
||||||
DECLARE
|
DECLARE
|
||||||
memoryId bigint := 0;
|
memoryId bigint := 0;
|
||||||
memoryIdText text;
|
memoryIdText text;
|
||||||
begin
|
begin
|
||||||
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]);
|
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
|
||||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||||
return memoryIdText;
|
return memoryIdText;
|
||||||
end
|
end
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
-- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId
|
-- Create replication slots for targetNode1 and targetNode2
|
||||||
-- TODO(saawasek): make it parameterized
|
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$
|
|
||||||
DECLARE
|
DECLARE
|
||||||
replicationSlotName text;
|
targetOneSlotName text;
|
||||||
createdSlotName text;
|
targetTwoSlotName text;
|
||||||
sharedMemoryId text;
|
sharedMemoryId text;
|
||||||
derivedSlotName text;
|
derivedSlotName text;
|
||||||
begin
|
begin
|
||||||
SELECT * into sharedMemoryId from SplitShardReplicationSetup();
|
|
||||||
-- '18' is nodeId of worker2
|
SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||||
SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName;
|
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||||
SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
|
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
|
||||||
INSERT INTO slotName_table values(replicationSlotName, 1);
|
|
||||||
return replicationSlotName;
|
-- if new child shards are placed on different nodes, create one more replication slot
|
||||||
|
if (targetNode1 != targetNode2) then
|
||||||
|
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||||
|
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
|
||||||
|
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||||
|
end if;
|
||||||
|
|
||||||
|
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||||
|
return targetOneSlotName;
|
||||||
end
|
end
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$
|
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
replicationSlotName text;
|
replicationSlotName text;
|
||||||
nodeportLocal int;
|
nodeportLocal int;
|
||||||
subname text;
|
subname text;
|
||||||
begin
|
begin
|
||||||
SELECT name into replicationSlotName from slotName_table;
|
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||||
EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName);
|
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||||
return 'a';
|
return 'a';
|
||||||
end
|
end
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
-- Test scenario starts from here
|
CREATE OR REPLACE FUNCTION DropSubscription(subscriptionName text) RETURNS text AS $$
|
||||||
|
DECLARE
|
||||||
|
begin
|
||||||
|
EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName);
|
||||||
|
return subscriptionName;
|
||||||
|
end
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- Test scenario one starts from here
|
||||||
-- 1. table_to_split is a citus distributed table
|
-- 1. table_to_split is a citus distributed table
|
||||||
-- 2. Shard table_to_split_1 is located on worker1.
|
-- 2. Shard table_to_split_1 is located on worker1.
|
||||||
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
|
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
|
||||||
|
@ -103,7 +118,7 @@ COMMIT;
|
||||||
|
|
||||||
-- Create replication slot and setup shard split information at worker1
|
-- Create replication slot and setup shard split information at worker1
|
||||||
BEGIN;
|
BEGIN;
|
||||||
select 1 from CreateReplicationSlot();
|
select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
\c - - - :worker_2_port
|
\c - - - :worker_2_port
|
||||||
|
@ -111,7 +126,7 @@ SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
|
||||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT 1 from CreateSubscription();
|
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1');
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
-- No data is present at this moment in all the below tables at worker2
|
-- No data is present at this moment in all the below tables at worker2
|
||||||
|
@ -127,6 +142,8 @@ INSERT into table_to_split_1 values(100, 'a');
|
||||||
INSERT into table_to_split_1 values(400, 'a');
|
INSERT into table_to_split_1 values(400, 'a');
|
||||||
INSERT into table_to_split_1 values(500, 'a');
|
INSERT into table_to_split_1 values(500, 'a');
|
||||||
SELECT * from table_to_split_1;
|
SELECT * from table_to_split_1;
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
select pg_sleep(10);
|
select pg_sleep(10);
|
||||||
|
|
||||||
-- Expect data to be present in shard 2 and shard 3 based on the hash value.
|
-- Expect data to be present in shard 2 and shard 3 based on the hash value.
|
||||||
|
@ -149,3 +166,90 @@ SET search_path TO citus_split_shard_by_split_points;
|
||||||
SELECT * FROM table_to_split_1;
|
SELECT * FROM table_to_split_1;
|
||||||
SELECT * FROM table_to_split_2;
|
SELECT * FROM table_to_split_2;
|
||||||
SELECT * FROM table_to_split_3;
|
SELECT * FROM table_to_split_3;
|
||||||
|
|
||||||
|
-- drop publication from worker1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
drop PUBLICATION PUB1;
|
||||||
|
DELETE FROM slotName_table;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SUBSCRIPTION SUB1;
|
||||||
|
DELETE FROM slotName_table;
|
||||||
|
|
||||||
|
-- Test scenario two starts from here
|
||||||
|
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is
|
||||||
|
-- located on worker1.
|
||||||
|
-- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
|
||||||
|
-- Create publication at worker1
|
||||||
|
BEGIN;
|
||||||
|
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- Create replication slot and setup shard split information at worker1
|
||||||
|
-- table_to_split2 is located on Worker1 and table_to_split_3 is located on worker2
|
||||||
|
BEGIN;
|
||||||
|
select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node);
|
||||||
|
COMMIT;
|
||||||
|
SELECT pg_sleep(10);
|
||||||
|
|
||||||
|
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
|
||||||
|
BEGIN;
|
||||||
|
SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
|
||||||
|
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||||
|
BEGIN;
|
||||||
|
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- No data is present at this moment in all the below tables at worker2
|
||||||
|
SELECT * from table_to_split_1;
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
select pg_sleep(10);
|
||||||
|
|
||||||
|
-- Insert data in table_to_split_1 at worker1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
INSERT into table_to_split_1 values(100, 'a');
|
||||||
|
INSERT into table_to_split_1 values(400, 'a');
|
||||||
|
INSERT into table_to_split_1 values(500, 'a');
|
||||||
|
select pg_sleep(10);
|
||||||
|
|
||||||
|
-- expect data to present in table_to_split_2 on worker1
|
||||||
|
SELECT * from table_to_split_1;
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
select pg_sleep(10);
|
||||||
|
|
||||||
|
-- Expect data to be present in table_to_split3 on worker2
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
select pg_sleep(10);
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
SELECT * from table_to_split_1;
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
SELECT * from table_to_split_3;
|
||||||
|
|
||||||
|
-- delete all from table_to_split_1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
DELETE FROM table_to_split_1;
|
||||||
|
SELECT pg_sleep(5);
|
||||||
|
|
||||||
|
-- rows from table_to_split_2 should be deleted
|
||||||
|
SELECT * from table_to_split_2;
|
||||||
|
|
||||||
|
-- rows from table_to_split_3 should be deleted
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO citus_split_shard_by_split_points;
|
||||||
|
SELECT * from table_to_split_3;
|
Loading…
Reference in New Issue