mirror of https://github.com/citusdata/citus.git
Change encoding of replication slot names. Include table owners
parent
04c2a92455
commit
65eb62e723
|
@ -22,10 +22,17 @@ PG_FUNCTION_INFO_V1(split_shard_replication_setup);
|
|||
|
||||
static HTAB *ShardInfoHashMap = NULL;
|
||||
|
||||
/* key for NodeShardMappingEntry */
|
||||
typedef struct NodeShardMappingKey
|
||||
{
|
||||
uint32_t nodeId;
|
||||
Oid tableOwnerId;
|
||||
} NodeShardMappingKey;
|
||||
|
||||
/* Entry for hash map */
|
||||
typedef struct NodeShardMappingEntry
|
||||
{
|
||||
uint32_t keyNodeId;
|
||||
NodeShardMappingKey key;
|
||||
List *shardSplitInfoList;
|
||||
} NodeShardMappingEntry;
|
||||
|
||||
|
@ -47,7 +54,11 @@ static void PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
|||
HTAB *shardInfoHashMap,
|
||||
dsm_handle dsmHandle,
|
||||
int shardSplitInfoCount);
|
||||
|
||||
static void SetupHashMapForShardInfo(void);
|
||||
static uint32 NodeShardMappingHash(const void *key, Size keysize);
|
||||
static int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
|
||||
|
||||
|
||||
/*
|
||||
* split_shard_replication_setup UDF creates in-memory data structures
|
||||
|
@ -80,8 +91,8 @@ static void SetupHashMapForShardInfo(void);
|
|||
* distinct target node. The same encoded slot name is stored in one of the fields of the
|
||||
* in-memory data structure(ShardSplitInfo).
|
||||
*
|
||||
* There is a 1-1 mapping between a target node and a replication slot as one replication
|
||||
* slot takes care of replicating changes for one node.
|
||||
* There is a 1-1 mapping between a target node and a replication slot. One replication
|
||||
* slot takes care of replicating changes for all shards belonging to the same owner on that node.
|
||||
*
|
||||
* During the replication phase, 'decoding_plugin_for_shard_split' called for a change on a particular
|
||||
* replication slot, will decode the shared memory handle from its slot name and will attach to the
|
||||
|
@ -151,11 +162,13 @@ SetupHashMapForShardInfo()
|
|||
{
|
||||
HASHCTL info;
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(uint32_t);
|
||||
info.keysize = sizeof(NodeShardMappingKey);
|
||||
info.entrysize = sizeof(NodeShardMappingEntry);
|
||||
info.hash = uint32_hash;
|
||||
info.hash = NodeShardMappingHash;
|
||||
info.match = NodeShardMappingHashCompare;
|
||||
info.hcxt = CurrentMemoryContext;
|
||||
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
|
||||
|
||||
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE);
|
||||
|
||||
ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
|
||||
}
|
||||
|
@ -386,16 +399,17 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
|
|||
static void
|
||||
AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
|
||||
{
|
||||
uint32_t keyNodeId = shardSplitInfo->nodeId;
|
||||
NodeShardMappingKey key;
|
||||
key.nodeId = shardSplitInfo->nodeId;
|
||||
key.tableOwnerId = TableOwnerOid(shardSplitInfo->distributedTableOid);
|
||||
|
||||
bool found = false;
|
||||
NodeShardMappingEntry *nodeMappingEntry =
|
||||
(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &keyNodeId, HASH_ENTER,
|
||||
(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &key, HASH_ENTER,
|
||||
&found);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
nodeMappingEntry->shardSplitInfoList = NULL;
|
||||
nodeMappingEntry->keyNodeId = keyNodeId;
|
||||
}
|
||||
|
||||
nodeMappingEntry->shardSplitInfoList =
|
||||
|
@ -429,9 +443,10 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
|||
int index = 0;
|
||||
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
uint32_t nodeId = entry->keyNodeId;
|
||||
uint32_t nodeId = entry->key.nodeId;
|
||||
uint32_t tableOwnerId = entry->key.tableOwnerId;
|
||||
char *derivedSlotName =
|
||||
encode_replication_slot(nodeId, dsmHandle);
|
||||
encode_replication_slot(nodeId, dsmHandle, tableOwnerId);
|
||||
|
||||
List *shardSplitInfoList = entry->shardSplitInfoList;
|
||||
ListCell *listCell = NULL;
|
||||
|
@ -452,3 +467,38 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* NodeShardMappingHash returns hash value by combining hash of node id
|
||||
* and tableowner Id.
|
||||
*/
|
||||
static uint32
|
||||
NodeShardMappingHash(const void *key, Size keysize)
|
||||
{
|
||||
NodeShardMappingKey *entry = (NodeShardMappingKey *) key;
|
||||
uint32 hash = hash_uint32(entry->nodeId);
|
||||
hash = hash_combine(hash, hash_uint32(entry->tableOwnerId));
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Comparator function for hash keys
|
||||
*/
|
||||
static int
|
||||
NodeShardMappingHashCompare(const void *left, const void *right, Size keysize)
|
||||
{
|
||||
NodeShardMappingKey *leftKey = (NodeShardMappingKey *) left;
|
||||
NodeShardMappingKey *rightKey = (NodeShardMappingKey *) right;
|
||||
|
||||
if (leftKey->nodeId != rightKey->nodeId ||
|
||||
leftKey->tableOwnerId != rightKey->tableOwnerId)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,14 +183,15 @@ ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
|
|||
/*
|
||||
* encode_replication_slot returns an encoded replication slot name
|
||||
* in the following format.
|
||||
* Slot Name = NodeId_SharedMemoryHandle
|
||||
* Slot Name = NodeId_SharedMemoryHandle_TableOwnerOid
|
||||
*/
|
||||
char *
|
||||
encode_replication_slot(uint32_t nodeId,
|
||||
dsm_handle dsmHandle)
|
||||
dsm_handle dsmHandle,
|
||||
uint32_t tableOwnerId)
|
||||
{
|
||||
StringInfo slotName = makeStringInfo();
|
||||
appendStringInfo(slotName, "%u_%u", nodeId, dsmHandle);
|
||||
appendStringInfo(slotName, "%u_%u_%u", nodeId, dsmHandle, tableOwnerId);
|
||||
return slotName->data;
|
||||
}
|
||||
|
||||
|
@ -230,15 +231,18 @@ decode_replication_slot(char *slotName,
|
|||
{
|
||||
*dsmHandle = strtoul(slotNameString, NULL, 10);
|
||||
}
|
||||
|
||||
slotNameString = strtok_r(NULL, "_", &strtokPosition);
|
||||
index++;
|
||||
|
||||
/*Ignoring TableOwnerOid*/
|
||||
}
|
||||
|
||||
/*
|
||||
* Replication slot name is encoded as NodeId_SharedMemoryHandle. Hence the number of tokens
|
||||
* would be strictly two considering "_" as delimiter.
|
||||
* Replication slot name is encoded as NodeId_SharedMemoryHandle_TableOwnerOid.
|
||||
* Hence the number of tokens would be strictly three considering "_" as delimiter.
|
||||
*/
|
||||
if (index != 2)
|
||||
if (index != 3)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errmsg("Invalid Replication Slot name encoding: %s", slotName)));
|
||||
|
|
|
@ -37,7 +37,8 @@ extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName,
|
|||
|
||||
/* Functions related to encoding-decoding for replication slot name */
|
||||
char * encode_replication_slot(uint32_t nodeId,
|
||||
dsm_handle dsmHandle);
|
||||
dsm_handle dsmHandle,
|
||||
uint32_t tableOwnerId);
|
||||
void decode_replication_slot(char *slotName,
|
||||
uint32_t *nodeId,
|
||||
dsm_handle *dsmHandle);
|
||||
|
|
|
@ -2,53 +2,64 @@
|
|||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.next_shard_id TO 4;
|
||||
CREATE USER myuser;
|
||||
CREATE USER admin_user;
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||
-- Create distributed table (co-located)
|
||||
\c - myuser - -
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.next_shard_id TO 4;
|
||||
CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
|
||||
SELECT create_distributed_table('table_first','id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - admin_user - -
|
||||
SET citus.next_shard_id TO 7;
|
||||
SET citus.shard_count TO 1;
|
||||
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
|
||||
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
\c - myuser - :worker_1_port
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
\c - - - :worker_2_port
|
||||
\c - myuser - :worker_2_port
|
||||
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
\c - admin_user - :worker_1_port
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
\c - admin_user - :worker_2_port
|
||||
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
-- Test scenario one starts from here
|
||||
-- 1. table_first and table_second are colocated tables.
|
||||
-- 2. Shard table_first_4 and table_second_7 are colocated on worker1
|
||||
-- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
|
||||
-- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
|
||||
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
||||
-- 6. Setup Pub/Sub
|
||||
-- 7. Insert into table_first_4 and table_second_7 at source worker1
|
||||
-- 8. Expect the results in child shards on worker2
|
||||
\c - - - :worker_1_port
|
||||
--- Test scenario one starts from here
|
||||
--- 1. table_first and table_second are colocated tables.
|
||||
--- 2. myuser is the owner table_first and admin_user is the owner of table_second.
|
||||
--- 3. Shard table_first_4 and table_second_7 are colocated on worker1
|
||||
--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
|
||||
--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
|
||||
--- 6. Create two publishers and two subscribers for respective table owners.
|
||||
--- 7. Insert into table_first_4 and table_second_7 at source worker1
|
||||
--- 8. Expect the results in child shards on worker2
|
||||
\c - postgres - :worker_1_port
|
||||
-- Create publication at worker1
|
||||
BEGIN;
|
||||
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9;
|
||||
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node);
|
||||
CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9;
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -61,84 +72,44 @@ SELECT pg_sleep(5);
|
|||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
\c - postgres - :worker_2_port
|
||||
SET client_min_messages TO WARNING;
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * from table_first_4;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_first_5;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_first_6;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- Insert data in table_to_split_1 at worker1
|
||||
\c - - - :worker_1_port
|
||||
\c - myuser - :worker_1_port
|
||||
INSERT into table_first_4 values(100, 'a');
|
||||
INSERT into table_first_4 values(400, 'a');
|
||||
INSERT into table_first_4 values(500, 'a');
|
||||
SELECT * from table_first_4;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
400 | a
|
||||
500 | a
|
||||
(3 rows)
|
||||
|
||||
SELECT * from table_first_5;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_first_6;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
INSERT into table_second_7 values(100, 'a');
|
||||
INSERT into table_second_7 values(400, 'a');
|
||||
INSERT into table_second_7 values(500, 'a');
|
||||
SELECT * FROM table_second_7;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
400 | a
|
||||
500 | a
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM table_second_8;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * FROM table_second_9;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
select pg_sleep(5);
|
||||
select pg_sleep(2);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Expect data to be present in shard xxxxx,6 and 8,9 based on the hash value.
|
||||
\c - - - :worker_2_port
|
||||
\c - admin_user - :worker_1_port
|
||||
INSERT INTO table_second_7 values(100, 'a');
|
||||
INSERT INTO table_second_7 values(400, 'a');
|
||||
SELECT * from table_second_7;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
400 | a
|
||||
(2 rows)
|
||||
|
||||
select pg_sleep(2);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - myuser - :worker_2_port
|
||||
SELECT * from table_first_4; -- should alwasy have zero rows
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
|
@ -157,21 +128,53 @@ SELECT * from table_first_6;
|
|||
500 | a
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM table_second_7; --should alwasy have zero rows
|
||||
\c - admin_user - :worker_2_port
|
||||
SELECT * from table_second_7;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * FROM table_second_8;
|
||||
SELECT * from table_second_8;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_second_9;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
\c - postgres - :worker_2_port
|
||||
SET client_min_messages TO WARNING;
|
||||
BEGIN;
|
||||
SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
select pg_sleep(5);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - admin_user - :worker_2_port
|
||||
SELECT * from table_second_7;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT * from table_second_8;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
400 | a
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM table_second_9;
|
||||
SELECT * from table_second_9;
|
||||
id | value
|
||||
---------------------------------------------------------------------
|
||||
100 | a
|
||||
500 | a
|
||||
(2 rows)
|
||||
(1 row)
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ BEGIN;
|
|||
COMMIT;
|
||||
-- Create replication slot for target node worker2
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node);
|
||||
select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -65,7 +65,7 @@ COMMIT;
|
|||
\c - - - :worker_2_port
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -219,7 +219,7 @@ COMMIT;
|
|||
-- Create replication slots for two target nodes worker1 and worker2.
|
||||
-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
|
||||
select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -228,7 +228,7 @@ select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
|
|||
COMMIT;
|
||||
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -244,7 +244,7 @@ select pg_sleep(5);
|
|||
\c - - - :worker_2_port
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2');
|
||||
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -368,7 +368,7 @@ BEGIN;
|
|||
COMMIT;
|
||||
-- Worker1 is target for table_to_split_2 and table_to_split_3
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
|
||||
select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -377,7 +377,7 @@ select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
|
|||
COMMIT;
|
||||
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
-- File to create functions and helpers needed for split shard tests
|
||||
-- Populates shared memory mapping for parent shard with id 1.
|
||||
-- targetNode1, targetNode2 are the locations where child shard xxxxx and 3 are placed respectively
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION split_shard_replication_setup_helper(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
|
@ -12,7 +12,7 @@ begin
|
|||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION create_replication_slot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
|
@ -20,13 +20,13 @@ DECLARE
|
|||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT FORMAT('%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
@ -37,7 +37,7 @@ end
|
|||
$$ LANGUAGE plpgsql;
|
||||
-- Populates shared memory mapping for colocated parent shards 4 and 7.
|
||||
-- shard xxxxx has child shards 5 and 6. Shard 7 has child shards 8 and 9.
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION split_shard_replication_setup_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
|
@ -55,31 +55,36 @@ begin
|
|||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION create_replication_slot_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
derivedSlotNameOne text;
|
||||
derivedSlotNameTwo text;
|
||||
tableOwnerOne bigint;
|
||||
tableOwnerTwo bigint;
|
||||
begin
|
||||
-- setup shared memory information
|
||||
SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
|
||||
SELECT FORMAT('%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
|
||||
SELECT FORMAT('%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');
|
||||
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1);
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2);
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION create_subscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
|
@ -90,3 +95,29 @@ begin
|
|||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where id = 1;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
RAISE NOTICE 'sameer %', replicationSlotName;
|
||||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where id = 2;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
RAISE NOTICE 'sameer %', replicationSlotName;
|
||||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
|
|
@ -3,93 +3,106 @@ SET citus.shard_replication_factor TO 1;
|
|||
SET citus.shard_count TO 1;
|
||||
SET citus.next_shard_id TO 4;
|
||||
|
||||
CREATE USER myuser;
|
||||
CREATE USER admin_user;
|
||||
|
||||
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
|
||||
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
|
||||
|
||||
-- Create distributed table (co-located)
|
||||
\c - myuser - -
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.next_shard_id TO 4;
|
||||
CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
|
||||
|
||||
SELECT create_distributed_table('table_first','id');
|
||||
|
||||
\c - admin_user - -
|
||||
SET citus.next_shard_id TO 7;
|
||||
SET citus.shard_count TO 1;
|
||||
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
|
||||
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');
|
||||
|
||||
\c - - - :worker_1_port
|
||||
\c - myuser - :worker_1_port
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
\c - - - :worker_2_port
|
||||
\c - myuser - :worker_2_port
|
||||
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
\c - admin_user - :worker_1_port
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
\c - admin_user - :worker_2_port
|
||||
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
|
||||
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
|
||||
|
||||
-- Test scenario one starts from here
|
||||
-- 1. table_first and table_second are colocated tables.
|
||||
-- 2. Shard table_first_4 and table_second_7 are colocated on worker1
|
||||
-- 3. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
|
||||
-- 4. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
|
||||
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
||||
-- 6. Setup Pub/Sub
|
||||
-- 7. Insert into table_first_4 and table_second_7 at source worker1
|
||||
-- 8. Expect the results in child shards on worker2
|
||||
--- Test scenario one starts from here
|
||||
--- 1. table_first and table_second are colocated tables.
|
||||
--- 2. myuser is the owner table_first and admin_user is the owner of table_second.
|
||||
--- 3. Shard table_first_4 and table_second_7 are colocated on worker1
|
||||
--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
|
||||
--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
|
||||
--- 6. Create two publishers and two subscribers for respective table owners.
|
||||
--- 7. Insert into table_first_4 and table_second_7 at source worker1
|
||||
--- 8. Expect the results in child shards on worker2
|
||||
|
||||
\c - - - :worker_1_port
|
||||
\c - postgres - :worker_1_port
|
||||
-- Create publication at worker1
|
||||
BEGIN;
|
||||
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6, table_second_7, table_second_8, table_second_9;
|
||||
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlotForColocatedShards(:worker_2_node, :worker_2_node);
|
||||
CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node);
|
||||
COMMIT;
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
\c - - - :worker_2_port
|
||||
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
\c - postgres - :worker_2_port
|
||||
SET client_min_messages TO WARNING;
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
|
||||
COMMIT;
|
||||
|
||||
-- No data is present at this moment in all the below tables at worker2
|
||||
SELECT * from table_first_4;
|
||||
SELECT * from table_first_5;
|
||||
SELECT * from table_first_6;
|
||||
|
||||
|
||||
-- Insert data in table_to_split_1 at worker1
|
||||
\c - - - :worker_1_port
|
||||
\c - myuser - :worker_1_port
|
||||
INSERT into table_first_4 values(100, 'a');
|
||||
INSERT into table_first_4 values(400, 'a');
|
||||
INSERT into table_first_4 values(500, 'a');
|
||||
select pg_sleep(2);
|
||||
|
||||
SELECT * from table_first_4;
|
||||
SELECT * from table_first_5;
|
||||
SELECT * from table_first_6;
|
||||
\c - admin_user - :worker_1_port
|
||||
INSERT INTO table_second_7 values(100, 'a');
|
||||
INSERT INTO table_second_7 values(400, 'a');
|
||||
SELECT * from table_second_7;
|
||||
select pg_sleep(2);
|
||||
|
||||
INSERT into table_second_7 values(100, 'a');
|
||||
INSERT into table_second_7 values(400, 'a');
|
||||
INSERT into table_second_7 values(500, 'a');
|
||||
|
||||
SELECT * FROM table_second_7;
|
||||
SELECT * FROM table_second_8;
|
||||
SELECT * FROM table_second_9;
|
||||
select pg_sleep(5);
|
||||
|
||||
-- Expect data to be present in shard 5,6 and 8,9 based on the hash value.
|
||||
\c - - - :worker_2_port
|
||||
\c - myuser - :worker_2_port
|
||||
SELECT * from table_first_4; -- should alwasy have zero rows
|
||||
SELECT * from table_first_5;
|
||||
SELECT * from table_first_6;
|
||||
|
||||
SELECT * FROM table_second_7; --should alwasy have zero rows
|
||||
SELECT * FROM table_second_8;
|
||||
SELECT * FROM table_second_9;
|
||||
\c - admin_user - :worker_2_port
|
||||
SELECT * from table_second_7;
|
||||
SELECT * from table_second_8;
|
||||
SELECT * from table_second_9;
|
||||
|
||||
\c - postgres - :worker_2_port
|
||||
SET client_min_messages TO WARNING;
|
||||
BEGIN;
|
||||
SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
|
||||
COMMIT;
|
||||
select pg_sleep(5);
|
||||
|
||||
\c - admin_user - :worker_2_port
|
||||
SELECT * from table_second_7;
|
||||
SELECT * from table_second_8;
|
||||
SELECT * from table_second_9;
|
|
@ -53,13 +53,13 @@ COMMIT;
|
|||
|
||||
-- Create replication slot for target node worker2
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node);
|
||||
select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
|
||||
COMMIT;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
|
||||
COMMIT;
|
||||
select pg_sleep(5);
|
||||
|
||||
|
@ -132,19 +132,19 @@ COMMIT;
|
|||
-- Create replication slots for two target nodes worker1 and worker2.
|
||||
-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
|
||||
select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node);
|
||||
COMMIT;
|
||||
|
||||
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
|
||||
COMMIT;
|
||||
select pg_sleep(5);
|
||||
|
||||
\c - - - :worker_2_port
|
||||
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2');
|
||||
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
|
||||
COMMIT;
|
||||
select pg_sleep(5);
|
||||
|
||||
|
@ -212,12 +212,12 @@ COMMIT;
|
|||
|
||||
-- Worker1 is target for table_to_split_2 and table_to_split_3
|
||||
BEGIN;
|
||||
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
|
||||
select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node);
|
||||
COMMIT;
|
||||
|
||||
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
|
||||
BEGIN;
|
||||
SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
|
||||
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
|
||||
COMMIT;
|
||||
SELECT pg_sleep(5);
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
-- Populates shared memory mapping for parent shard with id 1.
|
||||
-- targetNode1, targetNode2 are the locations where child shard 2 and 3 are placed respectively
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION split_shard_replication_setup_helper(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
|
@ -14,7 +14,7 @@ end
|
|||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION create_replication_slot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
|
@ -22,13 +22,13 @@ DECLARE
|
|||
derivedSlotName text;
|
||||
begin
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT FORMAT('%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
|
@ -40,7 +40,7 @@ $$ LANGUAGE plpgsql;
|
|||
|
||||
-- Populates shared memory mapping for colocated parent shards 4 and 7.
|
||||
-- shard 4 has child shards 5 and 6. Shard 7 has child shards 8 and 9.
|
||||
CREATE OR REPLACE FUNCTION SplitShardReplicationSetupForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION split_shard_replication_setup_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
memoryId bigint := 0;
|
||||
memoryIdText text;
|
||||
|
@ -59,32 +59,37 @@ end
|
|||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create replication slots for targetNode1 and targetNode2 incase of colocated shards
|
||||
CREATE OR REPLACE FUNCTION CreateReplicationSlotForColocatedShards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION create_replication_slot_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
|
||||
DECLARE
|
||||
targetOneSlotName text;
|
||||
targetTwoSlotName text;
|
||||
sharedMemoryId text;
|
||||
derivedSlotName text;
|
||||
derivedSlotNameOne text;
|
||||
derivedSlotNameTwo text;
|
||||
tableOwnerOne bigint;
|
||||
tableOwnerTwo bigint;
|
||||
begin
|
||||
-- setup shared memory information
|
||||
SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
|
||||
|
||||
SELECT * into sharedMemoryId from public.SplitShardReplicationSetupForColocatedShards(targetNode1, targetNode2);
|
||||
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
|
||||
SELECT FORMAT('%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne;
|
||||
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
|
||||
|
||||
-- if new child shards are placed on different nodes, create one more replication slot
|
||||
if (targetNode1 != targetNode2) then
|
||||
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
|
||||
end if;
|
||||
SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
|
||||
SELECT FORMAT('%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo;
|
||||
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
|
||||
|
||||
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1);
|
||||
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2);
|
||||
|
||||
return targetOneSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
CREATE OR REPLACE FUNCTION create_subscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
|
@ -94,4 +99,32 @@ begin
|
|||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where id = 1;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
RAISE NOTICE 'sameer %', replicationSlotName;
|
||||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- create subscription on target node with given 'subscriptionName'
|
||||
CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId integer, subscriptionName text) RETURNS text AS $$
|
||||
DECLARE
|
||||
replicationSlotName text;
|
||||
nodeportLocal int;
|
||||
subname text;
|
||||
begin
|
||||
SELECT name into replicationSlotName from slotName_table where id = 2;
|
||||
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
|
||||
RAISE NOTICE 'sameer %', replicationSlotName;
|
||||
return replicationSlotName;
|
||||
end
|
||||
$$ LANGUAGE plpgsql;
|
Loading…
Reference in New Issue