Address comments

1) Created new schema for tests
2) Variable renaming
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-01 19:20:55 +05:30
parent 65eb62e723
commit f1be4888b9
12 changed files with 140 additions and 63 deletions

View File

@ -14,6 +14,7 @@
#include "distributed/shard_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/listutils.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@ -75,9 +76,9 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size
* sourceShardId - id of the shard that is undergoing a split
* childShardId - id of the shard that stores a specific range of values
* belonging to sourceShardId (parent)
* minValue - lower bound of hash value which childShard stores
* minValue - lower bound(inclusive) of hash value which childShard stores
*
* maxValue - upper bound of hash value which childShard stores
* maxValue - upper bound(inclusive) of hash value which childShard stores
*
* NodeId - Node where the childShardId is located
*
@ -87,12 +88,12 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size
*
* Usage Semantics:
* This UDF returns a shared memory handle where the information is stored. This shared memory
* handle is used by caller to encode replication slot name as "NodeId_SharedMemoryHandle" for every
* handle is used by the caller to encode the replication slot name as "NodeId_SharedMemoryHandle_TableOwnerId" for every
* distinct target node. The same encoded slot name is stored in one of the fields of the
* in-memory data structure(ShardSplitInfo).
*
* There is a 1-1 mapping between a target node and a replication slot. One replication
* slot takes care of replicating changes for all shards belonging to the same owner on that node.
* There is a 1-1 mapping between a table owner id and a replication slot. One replication
* slot takes care of replicating changes for all shards belonging to the same owner on a particular node.
*
* During the replication phase, 'decoding_plugin_for_shard_split', when called for a change on a particular
* replication slot, will decode the shared memory handle from its slot name and will attach to the
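The slot-name encoding described above appears later in this diff as encode_replication_slot(nodeId, dsmHandle, tableOwnerId). A minimal sketch of what such a helper could look like, assuming PostgreSQL's StringInfo API; the body is illustrative, not the commit's actual implementation:

#include "postgres.h"
#include "lib/stringinfo.h"
#include "storage/dsm.h"

/*
 * Illustrative sketch: build a slot name of the form
 * "NodeId_SharedMemoryHandle_TableOwnerId", matching the encoding the
 * comment above describes. One slot per (node, table owner) pair.
 */
static char *
encode_replication_slot(uint32 nodeId, dsm_handle dsmHandle, uint32 tableOwnerId)
{
    StringInfo slotName = makeStringInfo();

    appendStringInfo(slotName, "%u_%u_%u", nodeId, dsmHandle, tableOwnerId);
    return slotName->data;
}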
@ -334,7 +335,6 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
shardIntervalToSplit->relationId);
/*Todo(sameer): Also check if non-distributed table */
if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -345,28 +345,25 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
Assert(shardIntervalToSplit->maxValueExists);
/* Oid of distributed table */
Oid citusTableOid = InvalidOid;
citusTableOid = shardIntervalToSplit->relationId;
Oid sourceShardToSplitOid = InvalidOid;
sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid,
sourceShardIdToSplit);
Oid citusTableOid = shardIntervalToSplit->relationId;
Oid sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid,
sourceShardIdToSplit);
/* Oid of dummy table at the source */
Oid desSplitChildShardOid = InvalidOid;
desSplitChildShardOid = GetTableLocalShardOid(citusTableOid,
desSplitChildShardId);
Oid destSplitChildShardOid = GetTableLocalShardOid(citusTableOid,
desSplitChildShardId);
if (citusTableOid == InvalidOid ||
sourceShardToSplitOid == InvalidOid ||
desSplitChildShardOid == InvalidOid)
destSplitChildShardOid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid citusTableOid:%u "
"sourceShardToSplitOid: %u,"
"desSplitChildShardOid :%u ",
"destSplitChildShardOid :%u ",
citusTableOid,
sourceShardToSplitOid,
desSplitChildShardOid)));
destSplitChildShardOid)));
}
/* determine the partition column in the tuple descriptor */
@ -376,14 +373,13 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid Partition Column")));
}
int partitionColumnIndex = -1;
partitionColumnIndex = partitionColumn->varattno - 1;
int partitionColumnIndex = partitionColumn->varattno - 1;
ShardSplitInfo *shardSplitInfo = palloc0(sizeof(ShardSplitInfo));
shardSplitInfo->distributedTableOid = citusTableOid;
shardSplitInfo->partitionColumnIndex = partitionColumnIndex;
shardSplitInfo->sourceShardOid = sourceShardToSplitOid;
shardSplitInfo->splitChildShardOid = desSplitChildShardOid;
shardSplitInfo->splitChildShardOid = destSplitChildShardOid;
shardSplitInfo->shardMinValue = minValue;
shardSplitInfo->shardMaxValue = maxValue;
shardSplitInfo->nodeId = nodeId;
@ -449,10 +445,9 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
encode_replication_slot(nodeId, dsmHandle, tableOwnerId);
List *shardSplitInfoList = entry->shardSplitInfoList;
ListCell *listCell = NULL;
foreach(listCell, shardSplitInfoList)
ShardSplitInfo *splitShardInfo = NULL;
foreach_ptr(splitShardInfo, shardSplitInfoList)
{
ShardSplitInfo *splitShardInfo = (ShardSplitInfo *) lfirst(listCell);
ShardSplitInfo *shardInfoInSM = &shardSplitInfoArray[index];
shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid;

View File

@ -15,6 +15,11 @@
#include "distributed/shardsplit_shared_memory.h"
#include "replication/logical.h"
/*
* Dynamically-loaded modules are required to include this macro call to check for
* incompatibility (such as being compiled for a different major PostgreSQL version, etc.).
* In a module with multiple source files, the macro call should only appear once.
*/
PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
@ -62,7 +67,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
/*
* GetHashValueForIncomingTuple returns the hash value of the partition
* column for the incoming tuple. It also checks if the change should be
* handled as the incoming committed change would belong to a relation
* handled at all, since the incoming committed change can belong to a relation
* that is not undergoing a split.
*/
static int32_t
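The hunk above ends at the function's return type; for intuition, here is a minimal hypothetical sketch of the inclusive-range membership check such a function implies (the names and signature are assumptions, not the commit's body):

/*
 * Hypothetical sketch: decide whether a decoded change falls in a child
 * shard's hash range. Per the parameter docs earlier in this diff, both
 * the min and max bounds are inclusive.
 */
static bool
ChangeBelongsToChildShard(int32 hashValue, ShardSplitInfo *shardSplitInfo)
{
    return hashValue >= shardSplitInfo->shardMinValue &&
           hashValue <= shardSplitInfo->shardMaxValue;
}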

View File

@ -31,8 +31,7 @@ static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handl
/*
* GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory
* segment belonging to 'dsmHandle'. It pins the shared memory segment mapping for the
* lifetime of the backend process accessing it.
* segment. It pins the mapping for the lifetime of the backend process accessing it.
*/
static ShardSplitInfoSMHeader *
GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
@ -52,9 +51,8 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
}
/*
* By default, mappings are owned by current resource owner, which typically
* means they stick around for the duration of current query.
* Keep the dynamic shared memory mapping until the end of the session to avoid warnings and leaks.
* Detaching the segment from its resource owner with the 'dsm_pin_mapping' call, before the
* resource owner is released, avoids a warning being logged and potential leaks.
*/
dsm_pin_mapping(dsmSegment);
@ -64,7 +62,6 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
return header;
}
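The attach-then-pin sequence the function performs uses PostgreSQL's dynamic shared memory API (dsm_attach, dsm_pin_mapping, dsm_segment_address); a condensed sketch of the same flow, with error handling trimmed:

#include "postgres.h"
#include "storage/dsm.h"

/* condensed sketch of the attach-then-pin flow shown above */
static ShardSplitInfoSMHeader *
AttachToShardSplitSegment(dsm_handle dsmHandle)
{
    dsm_segment *dsmSegment = dsm_attach(dsmHandle);

    if (dsmSegment == NULL)
    {
        ereport(ERROR, (errmsg("could not attach to dynamic shared memory segment")));
    }

    /* keep the mapping for the backend's lifetime, not the resource owner's */
    dsm_pin_mapping(dsmSegment);

    /* the ShardSplitInfoSMHeader lives at the base address of the segment */
    return (ShardSplitInfoSMHeader *) dsm_segment_address(dsmSegment);
}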
/*
* GetShardSplitInfoSMArrayForSlot returns a pointer to the array of
* 'ShardSplitInfo' structs stored in the shared memory segment.
@ -96,12 +93,11 @@ GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount)
/*
* AllocateSharedMemoryForShardSplitInfo is used to create a place to store
* AllocateSharedMemoryForShardSplitInfo is used to allocate and store
* information about the shard undergoing a split. The function allocates a dynamic
* shared memory segment consisting of a header which stores the id of process
* creating it and an array of "steps" which store ShardSplitInfo. The contents of
* this shared memory segment are consumed by WAL sender process during catch up phase of
* replication through logical decoding plugin.
* shared memory segment consisting of a header and an array of ShardSplitInfo structures.
* The contents of this shared memory segment are consumed by the WAL sender process
* during the catch-up phase of replication through the logical decoding plugin.
*
* The shared memory segment exists until the catch-up phase completes or the
* postmaster shuts down.
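From the description above (a header recording the creating process, followed by the ShardSplitInfo array), the segment layout is roughly the following; the field names are assumptions for illustration, not the commit's definitions:

/* hypothetical sketch of the segment layout described above */
typedef struct ShardSplitInfoSMHeader
{
    uint64 processId;        /* id of the process that created the segment */
    int shardSplitInfoCount; /* number of ShardSplitInfo entries that follow */
} ShardSplitInfoSMHeader;

/*
 * Within the segment, the header is immediately followed by the array:
 *
 *   [ ShardSplitInfoSMHeader | ShardSplitInfo[0] | ShardSplitInfo[1] | ... ]
 */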

View File

@ -4,4 +4,4 @@ RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.split_shard_replication_setup(shardInfo bigint[][])
IS 'Replication setup for splitting a shard'

View File

@ -4,4 +4,4 @@ RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.split_shard_replication_setup(shardInfo bigint[][])
IS 'Replication setup for splitting a shard'

View File

@ -14,8 +14,6 @@
#ifndef SHARDSPLIT_SHARED_MEMORY_H
#define SHARDSPLIT_SHARED_MEMORY_H
#include "c.h"
#include "fmgr.h"
#include "distributed/shard_split.h"
/*

View File

@ -5,5 +5,6 @@ test: multi_cluster_management
test: multi_test_catalog_views
test: tablespace
# Split tests go here.
test: split_shard_test_helpers
test: citus_split_shard_by_split_points_negative
test: citus_split_shard_by_split_points

View File

@ -1,12 +1,16 @@
\c - - - :master_port
CREATE USER myuser;
CREATE USER admin_user;
GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser;
GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user;
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;
CREATE USER myuser;
CREATE USER admin_user;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - myuser - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;
@ -18,6 +22,7 @@ SELECT create_distributed_table('table_first','id');
(1 row)
\c - admin_user - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.next_shard_id TO 7;
SET citus.shard_count TO 1;
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
@ -28,16 +33,20 @@ SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_fi
(1 row)
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
@ -50,8 +59,9 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
--- 6. Create two publishers and two subscribers for respective table owners.
--- 7. Insert into table_first_4 and table_second_7 at source worker1
--- 8. Expect the results in child shards on worker2
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
COMMIT;
@ -74,6 +84,7 @@ SELECT pg_sleep(5);
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
@ -84,6 +95,7 @@ SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
COMMIT;
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_first_4 values(100, 'a');
INSERT into table_first_4 values(400, 'a');
INSERT into table_first_4 values(500, 'a');
@ -94,6 +106,7 @@ select pg_sleep(2);
(1 row)
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_second_7 values(100, 'a');
INSERT INTO table_second_7 values(400, 'a');
SELECT * from table_second_7;
@ -110,7 +123,8 @@ select pg_sleep(2);
(1 row)
\c - myuser - :worker_2_port
SELECT * from table_first_4; -- should always have zero rows
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_first_4;
id | value
---------------------------------------------------------------------
(0 rows)
@ -128,7 +142,9 @@ SELECT * from table_first_6;
500 | a
(2 rows)
-- should have zero rows in all the tables below as the subscription is not yet created for admin_user
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
id | value
---------------------------------------------------------------------
@ -145,6 +161,7 @@ SELECT * from table_second_9;
(0 rows)
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
@ -160,7 +177,9 @@ select pg_sleep(5);
(1 row)
-- expect data
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
id | value
---------------------------------------------------------------------

View File

@ -1,3 +1,5 @@
CREATE SCHEMA split_shard_replication_setup_schema;
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1;
@ -39,12 +41,14 @@ SELECT create_distributed_table('slotName_table','id');
-- 7. Insert into table_to_split_1 at source worker1
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create dummy shard tables (table_to_split_2/3) at worker1
-- This is needed for the Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
@ -62,8 +66,9 @@ select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
(1 row)
COMMIT;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
?column?
@ -96,6 +101,7 @@ SELECT * from table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
@ -125,6 +131,7 @@ select pg_sleep(2);
-- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value.
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1; -- should always have zero rows
id | value
---------------------------------------------------------------------
@ -145,6 +152,7 @@ SELECT * from table_to_split_3;
-- UPDATE data of table_to_split_1 from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
UPDATE table_to_split_1 SET value='b' where id = 100;
UPDATE table_to_split_1 SET value='b' where id = 400;
UPDATE table_to_split_1 SET value='b' where id = 500;
@ -156,6 +164,7 @@ SELECT pg_sleep(2);
-- Value should be updated in table_to_split_2;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
@ -175,6 +184,7 @@ SELECT * FROM table_to_split_3;
(2 rows)
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
pg_sleep
@ -184,6 +194,7 @@ SELECT pg_sleep(5);
-- Child shard rows should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
@ -201,9 +212,11 @@ SELECT * FROM table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
drop PUBLICATION PUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
@ -212,6 +225,7 @@ DELETE FROM slotName_table;
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
@ -242,6 +256,7 @@ select pg_sleep(5);
(1 row)
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
@ -275,6 +290,7 @@ SELECT * from table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
@ -307,6 +323,7 @@ SELECT * from table_to_split_3;
-- Expect data to be present only in table_to_split3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
@ -326,6 +343,7 @@ SELECT * from table_to_split_3;
-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
pg_sleep
@ -341,6 +359,7 @@ SELECT * from table_to_split_2;
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
@ -348,11 +367,13 @@ SELECT * from table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP PUBLICATION PUB1;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB2;
DELETE FROM slotName_table;
@ -361,6 +382,7 @@ DELETE FROM slotName_table;
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
-- Create publication at worker1
BEGIN;

View File

@ -1,15 +1,20 @@
\c - - - :master_port
CREATE USER myuser;
CREATE USER admin_user;
GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser;
GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user;
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;
CREATE USER myuser;
CREATE USER admin_user;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - myuser - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;
@ -17,25 +22,30 @@ CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_first','id');
\c - admin_user - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.next_shard_id TO 7;
SET citus.shard_count TO 1;
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
@ -50,8 +60,9 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
--- 7. Insert into table_first_4 and table_second_7 at source worker1
--- 8. Expect the results in child shards on worker2
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
COMMIT;
@ -68,41 +79,50 @@ SELECT pg_sleep(5);
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
COMMIT;
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_first_4 values(100, 'a');
INSERT into table_first_4 values(400, 'a');
INSERT into table_first_4 values(500, 'a');
select pg_sleep(2);
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_second_7 values(100, 'a');
INSERT INTO table_second_7 values(400, 'a');
SELECT * from table_second_7;
select pg_sleep(2);
\c - myuser - :worker_2_port
SELECT * from table_first_4; -- should always have zero rows
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_first_4;
SELECT * from table_first_5;
SELECT * from table_first_6;
-- should have zero rows in all the tables below as the subscription is not yet created for admin_user
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
SELECT * from table_second_8;
SELECT * from table_second_9;
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
COMMIT;
select pg_sleep(5);
-- expect data
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
SELECT * from table_second_8;
SELECT * from table_second_9;

View File

@ -1,3 +1,5 @@
CREATE SCHEMA split_shard_replication_setup_schema;
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1;
@ -24,7 +26,7 @@ SELECT create_distributed_table('slotName_table','id');
-- split_shard_replication_setup
-- (
-- ARRAY[
-- ARRAY[1 /* source shardId */, 2 /* new shardId */, -2147483648 /* minHashValue */, -1 /* maxHashValue */, 18 /* nodeId where new shard is placed */ ],
-- ARRAY[1, 3 , 0 , 2147483647, 18 ]
-- ]
-- );
@ -34,6 +36,7 @@ SELECT create_distributed_table('slotName_table','id');
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
@ -41,6 +44,7 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create dummy shard tables (table_to_split_2/3) at worker1
-- This is needed for the Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
@ -56,8 +60,9 @@ BEGIN;
select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
COMMIT;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
COMMIT;
@ -69,8 +74,9 @@ SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
@ -81,12 +87,14 @@ select pg_sleep(2);
-- Expect data to be present in shard 2 and shard 3 based on the hash value.
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1; -- should always have zero rows
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- UPDATE data of table_to_split_1 from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
UPDATE table_to_split_1 SET value='b' where id = 100;
UPDATE table_to_split_1 SET value='b' where id = 400;
UPDATE table_to_split_1 SET value='b' where id = 500;
@ -94,26 +102,31 @@ SELECT pg_sleep(2);
-- Value should be updated in table_to_split_2;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
-- Child shard rows should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
drop PUBLICATION PUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
@ -124,6 +137,7 @@ DELETE FROM slotName_table;
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
@ -142,6 +156,7 @@ COMMIT;
select pg_sleep(5);
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
@ -155,6 +170,7 @@ SELECT * from table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
@ -163,17 +179,19 @@ select pg_sleep(5);
-- expect data to be present in table_to_split_2 on worker1 as it is the destination for value '400'
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- Expect data to be present only in table_to_split3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
@ -182,17 +200,19 @@ SELECT * from table_to_split_2;
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP PUBLICATION PUB1;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB2;
DELETE FROM slotName_table;
@ -203,6 +223,7 @@ DELETE FROM slotName_table;
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
-- Create publication at worker1
@ -228,17 +249,17 @@ select pg_sleep(5);
-- expect data to be present in table_to_split_2/3 on worker1
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- clean up
DROP PUBLICATION PUB1;
DELETE FROM slotName_table;
DROP SUBSCRIPTION SUB1;

View File

@ -69,7 +69,7 @@ DECLARE
tableOwnerOne bigint;
tableOwnerTwo bigint;
begin
-- setup shared memory information
SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
@ -83,7 +83,7 @@ begin
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1);
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2);
return targetOneSlotName;
end
$$ LANGUAGE plpgsql;
@ -106,7 +106,7 @@ CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId intege
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where id = 1;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
@ -120,11 +120,11 @@ CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId intege
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where id = 2;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
RAISE NOTICE 'sameer %', replicationSlotName;
return replicationSlotName;
end
$$ LANGUAGE plpgsql;