mirror of https://github.com/citusdata/citus.git
Added colocated shard test
- added a new test helper fileusers/saawasek/non_blocking_split_integrated
parent
d71ead0e43
commit
daae0bb10d
|
@ -25,7 +25,7 @@ static HTAB *ShardInfoHashMap = NULL;
|
|||
/* Entry for hash map */
|
||||
typedef struct NodeShardMappingEntry
|
||||
{
|
||||
uint64_t key;
|
||||
uint64_t keyNodeId;
|
||||
List *shardSplitInfoList;
|
||||
} NodeShardMappingEntry;
|
||||
|
||||
|
@ -397,7 +397,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
|
|||
if (!found)
|
||||
{
|
||||
nodeMappingEntry->shardSplitInfoList = NULL;
|
||||
nodeMappingEntry->key = keyNodeId;
|
||||
nodeMappingEntry->keyNodeId = keyNodeId;
|
||||
}
|
||||
|
||||
nodeMappingEntry->shardSplitInfoList =
|
||||
|
@ -433,7 +433,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
|||
int index = 0;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint64_t nodeId = entry->key;
|
||||
uint64_t nodeId = entry->keyNodeId;
|
||||
char *derivedSlotName =
|
||||
encode_replication_slot(nodeId, dsmHandle);
|
||||
|
||||
|
|
|
@ -124,62 +124,3 @@ BEGIN
|
|||
END LOOP;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
\c - - - :worker_1_port
|
||||
-- Create replication slots for targetNode1 and targetNode2
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
begin
|
||||
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
|
||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||
return memoryIdText;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
return 'a';
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
\c - - - :worker_2_port
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
return 'a';
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.next_shard_id TO 4;
|
||||
-- Add two additional nodes to cluster.
|
||||
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||
-- Create distributed table (co-located)
|
||||
CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
|
||||
SELECT create_distributed_table('table_first','id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET citus.next_shard_id TO 7;
|
||||
SET citus.shard_count TO 1;
|
||||
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
\c - - - :worker_2_port
|
||||
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
-- Test scenario one starts from here
|
||||
-- 1. table_first and table_second are colocated tables.
|
||||
-- 2. Shard table_first_4 and table_second_7 are colocated on worker1
|
||||
-- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
|
||||
-- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
|
||||
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
||||
-- 6. Setup Pub/Sub
|
||||
-- 7. Insert into table_first_4 and table_second_7 at source worker1
|
||||
-- 8. Expect the results in child shards on worker2
|
||||
\c - - - :worker_1_port
|
||||
-- Create publication at worker1
|
||||
BEGIN;
|
||||
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9;
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
SELECT pg_sleep(10);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * from table_first_4;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_first_5;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_first_6;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
select pg_sleep(10);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Insert data in table_to_split_1 at worker1
|
||||
\c - - - :worker_1_port
|
||||
INSERT into table_first_4 values(100, 'a');
|
||||
INSERT into table_first_4 values(400, 'a');
|
||||
INSERT into table_first_4 values(500, 'a');
|
||||
SELECT * from table_first_4;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
400 | a
|
||||
500 | a
|
||||
(3 rows)
|
||||
|
||||
SELECT * from table_first_5;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_first_6;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
INSERT into table_second_7 values(100, 'a');
|
||||
INSERT into table_second_7 values(400, 'a');
|
||||
INSERT into table_second_7 values(500, 'a');
|
||||
SELECT * FROM table_second_7;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
400 | a
|
||||
500 | a
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM table_second_8;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * FROM table_second_9;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
select pg_sleep(10);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Expect data to be present in shard xxxxx,6 and 8,9 based on the hash value.
|
||||
\c - - - :worker_2_port
|
||||
select pg_sleep(10);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * from table_first_4; -- should alwasy have zero rows
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_first_5;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | a
|
||||
(1 row)
|
||||
|
||||
SELECT * from table_first_6;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
500 | a
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM table_second_7; --should alwasy have zero rows
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * FROM table_second_8;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | a
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_second_9;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
500 | a
|
||||
(2 rows)
|
||||
|
|
@ -356,14 +356,17 @@ DELETE FROM slotName_table;
|
|||
SET client_min_messages TO WARNING;
|
||||
DROP SUBSCRIPTION SUB2;
|
||||
DELETE FROM slotName_table;
|
||||
-- Test Scenario 3
|
||||
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
|
||||
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
|
||||
-- 2. table_to_split_1 is located on worker1.
|
||||
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
|
||||
\c - - - :worker_1_port
|
||||
SET client_min_messages TO WARNING;
|
||||
-- Create publication at worker1
|
||||
BEGIN;
|
||||
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
|
||||
COMMIT;
|
||||
-- Create replication slots for two target nodes worker1 and worker2.
|
||||
-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
|
||||
-- Worker1 is target for table_to_split_2 and table_to_split_3
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
|
||||
?column?
|
||||
|
@ -393,7 +396,7 @@ SELECT pg_sleep(10);
|
|||
|
||||
(1 row)
|
||||
|
||||
INSERT into table_to_split_1 values(100, 'a');
|
||||
INSERT into table_to_split_1 values(100, 'a');
|
||||
INSERT into table_to_split_1 values(400, 'a');
|
||||
INSERT into table_to_split_1 values(500, 'a');
|
||||
select pg_sleep(10);
|
||||
|
@ -402,7 +405,7 @@ select pg_sleep(10);
|
|||
|
||||
(1 row)
|
||||
|
||||
-- expect data to present in table_to_split_2 on worker1
|
||||
-- expect data to present in table_to_split_2/3 on worker1
|
||||
SELECT * from table_to_split_1;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -452,3 +455,7 @@ SELECT * from table_to_split_3;
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- clean up
|
||||
DROP PUBLICATION PUB1;
|
||||
DELETE FROM slotName_table;
|
||||
DROP SUBSCRIPTION SUB1;
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
-- File to create functions and helpers needed for split shard tests
|
||||
-- Populates shared memory mapping for parent shard with id 1.
|
||||
-- targetNode1, targetNode2 are the locations where child shard xxxxx and 3 are placed respectively
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
begin
|
||||
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
|
||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||
return memoryIdText;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- Populates shared memory mapping for colocated parent shards 4 and 7.
|
||||
-- shard xxxxx has child shards 5 and 6. Shard 7 has child shards 8 and 9.
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
begin
|
||||
SELECT * into memoryId from split_shard_replication_setup(
|
||||
ARRAY[
|
||||
ARRAY[4, 5, -2147483648,-1, targetNode1],
|
||||
ARRAY[4, 6, 0 ,2147483647, targetNode2],
|
||||
ARRAY[7, 8, -2147483648,-1, targetNode1],
|
||||
ARRAY[7, 9, 0, 2147483647 , targetNode2]
|
||||
]);
|
||||
|
||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||
return memoryIdText;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
|
@ -135,66 +135,3 @@ BEGIN
|
|||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
-- Create replication slots for targetNode1 and targetNode2
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
begin
|
||||
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
|
||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||
return memoryIdText;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
return 'a';
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
return 'a';
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.next_shard_id TO 4;
|
||||
|
||||
-- Add two additional nodes to cluster.
|
||||
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
|
||||
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||
|
||||
-- Create distributed table (co-located)
|
||||
CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
|
||||
|
||||
SELECT create_distributed_table('table_first','id');
|
||||
|
||||
SET citus.next_shard_id TO 7;
|
||||
SET citus.shard_count TO 1;
|
||||
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');
|
||||
|
||||
\c - - - :worker_1_port
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
\c - - - :worker_2_port
|
||||
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
-- Test scenario one starts from here
|
||||
-- 1. table_first and table_second are colocated tables.
|
||||
-- 2. Shard table_first_4 and table_second_7 are colocated on worker1
|
||||
-- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
|
||||
-- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
|
||||
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
||||
-- 6. Setup Pub/Sub
|
||||
-- 7. Insert into table_first_4 and table_second_7 at source worker1
|
||||
-- 8. Expect the results in child shards on worker2
|
||||
|
||||
\c - - - :worker_1_port
|
||||
-- Create publication at worker1
|
||||
BEGIN;
|
||||
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node);
|
||||
COMMIT;
|
||||
SELECT pg_sleep(10);
|
||||
|
||||
\c - - - :worker_2_port
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
|
||||
COMMIT;
|
||||
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * from table_first_4;
|
||||
SELECT * from table_first_5;
|
||||
SELECT * from table_first_6;
|
||||
select pg_sleep(10);
|
||||
|
||||
-- Insert data in table_to_split_1 at worker1
|
||||
\c - - - :worker_1_port
|
||||
INSERT into table_first_4 values(100, 'a');
|
||||
INSERT into table_first_4 values(400, 'a');
|
||||
INSERT into table_first_4 values(500, 'a');
|
||||
|
||||
SELECT * from table_first_4;
|
||||
SELECT * from table_first_5;
|
||||
SELECT * from table_first_6;
|
||||
|
||||
INSERT into table_second_7 values(100, 'a');
|
||||
INSERT into table_second_7 values(400, 'a');
|
||||
INSERT into table_second_7 values(500, 'a');
|
||||
|
||||
SELECT * FROM table_second_7;
|
||||
SELECT * FROM table_second_8;
|
||||
SELECT * FROM table_second_9;
|
||||
select pg_sleep(10);
|
||||
|
||||
-- Expect data to be present in shard 5,6 and 8,9 based on the hash value.
|
||||
\c - - - :worker_2_port
|
||||
select pg_sleep(10);
|
||||
SELECT * from table_first_4; -- should alwasy have zero rows
|
||||
SELECT * from table_first_5;
|
||||
SELECT * from table_first_6;
|
||||
|
||||
SELECT * FROM table_second_7; --should alwasy have zero rows
|
||||
SELECT * FROM table_second_8;
|
||||
SELECT * FROM table_second_9;
|
|
@ -185,20 +185,25 @@ DROP SUBSCRIPTION SUB1;
|
|||
DELETE FROM slotName_table;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SUBSCRIPTION SUB2;
|
||||
DELETE FROM slotName_table;
|
||||
|
||||
-- Test Scenario 3
|
||||
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
|
||||
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
|
||||
-- 2. table_to_split_1 is located on worker1.
|
||||
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET client_min_messages TO WARNING;
|
||||
|
||||
-- Create publication at worker1
|
||||
BEGIN;
|
||||
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
|
||||
COMMIT;
|
||||
|
||||
-- Create replication slots for two target nodes worker1 and worker2.
|
||||
-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
|
||||
-- Worker1 is target for table_to_split_2 and table_to_split_3
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
|
||||
COMMIT;
|
||||
|
@ -210,12 +215,12 @@ SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
|
|||
COMMIT;
|
||||
SELECT pg_sleep(10);
|
||||
|
||||
INSERT into table_to_split_1 values(100, 'a');
|
||||
INSERT into table_to_split_1 values(100, 'a');
|
||||
INSERT into table_to_split_1 values(400, 'a');
|
||||
INSERT into table_to_split_1 values(500, 'a');
|
||||
select pg_sleep(10);
|
||||
|
||||
-- expect data to present in table_to_split_2 on worker1
|
||||
-- expect data to present in table_to_split_2/3 on worker1
|
||||
SELECT * from table_to_split_1;
|
||||
SELECT * from table_to_split_2;
|
||||
SELECT * from table_to_split_3;
|
||||
|
@ -225,4 +230,9 @@ DELETE FROM table_to_split_1;
|
|||
SELECT pg_sleep(10);
|
||||
SELECT * from table_to_split_1;
|
||||
SELECT * from table_to_split_2;
|
||||
SELECT * from table_to_split_3;
|
||||
SELECT * from table_to_split_3;
|
||||
|
||||
-- clean up
|
||||
DROP PUBLICATION PUB1;
|
||||
DELETE FROM slotName_table;
|
||||
DROP SUBSCRIPTION SUB1;
|
|
@ -0,0 +1,97 @@
|
|||
-- File to create functions and helpers needed for split shard tests
|
||||
|
||||
-- Populates shared memory mapping for parent shard with id 1.
|
||||
-- targetNode1, targetNode2 are the locations where child shard 2 and 3 are placed respectively
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
begin
|
||||
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
|
||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||
return memoryIdText;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Populates shared memory mapping for colocated parent shards 4 and 7.
|
||||
-- shard 4 has child shards 5 and 6. Shard 7 has child shards 8 and 9.
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
begin
|
||||
SELECT * into memoryId from split_shard_replication_setup(
|
||||
ARRAY[
|
||||
ARRAY[4, 5, -2147483648,-1, targetNode1],
|
||||
ARRAY[4, 6, 0 ,2147483647, targetNode2],
|
||||
ARRAY[7, 8, -2147483648,-1, targetNode1],
|
||||
ARRAY[7, 9, 0, 2147483647 , targetNode2]
|
||||
]);
|
||||
|
||||
SELECT FORMAT('%s', memoryId) into memoryIdText;
|
||||
return memoryIdText;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
Loading…
Reference in New Issue