From f1be4888b9131ca1791933282a75ad2df676f512 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Wed, 1 Jun 2022 19:20:55 +0530 Subject: [PATCH] Address comments 1) Created new schema for tests 2) Variable renaming --- .../split_shard_replication_setup.c | 41 ++++++++----------- src/backend/distributed/shardsplit/pgoutput.c | 7 +++- .../shardsplit/shardsplit_shared_memory.c | 18 ++++---- .../split_shard_replication_setup/11.0-2.sql | 2 +- .../split_shard_replication_setup/latest.sql | 2 +- .../distributed/shardsplit_shared_memory.h | 2 - src/test/regress/enterprise_split_schedule | 1 + ...plit_shard_replication_colocated_setup.out | 27 ++++++++++-- .../split_shard_replication_setup.out | 24 ++++++++++- ...plit_shard_replication_colocated_setup.sql | 30 +++++++++++--- .../sql/split_shard_replication_setup.sql | 39 ++++++++++++++---- .../regress/sql/split_shard_test_helpers.sql | 10 ++--- 12 files changed, 140 insertions(+), 63 deletions(-) diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c index c14d5331e..791c320de 100644 --- a/src/backend/distributed/operations/split_shard_replication_setup.c +++ b/src/backend/distributed/operations/split_shard_replication_setup.c @@ -14,6 +14,7 @@ #include "distributed/shard_utils.h" #include "distributed/shardsplit_shared_memory.h" #include "distributed/citus_safe_lib.h" +#include "distributed/listutils.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -75,9 +76,9 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size * sourceShardId - id of the shard that is undergoing a split * childShardId - id of shard that stores a specific range of values * belonging to sourceShardId(parent) - * minValue - lower bound of hash value which childShard stores + * minValue - lower bound(inclusive) of hash value which childShard stores * - * maxValue - upper bound of hash value which childShard stores + * maxValue - upper bound(inclusive) of hash value which childShard stores * * NodeId - Node where the childShardId is located * @@ -87,12 +88,12 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size * * Usage Semantics: * This UDF returns a shared memory handle where the information is stored. This shared memory - * handle is used by caller to encode replication slot name as "NodeId_SharedMemoryHandle" for every + * handle is used by caller to encode replication slot name as "NodeId_SharedMemoryHandle_TableOnwerId" for every * distinct target node. The same encoded slot name is stored in one of the fields of the * in-memory data structure(ShardSplitInfo). * - * There is a 1-1 mapping between a target node and a replication slot. One replication - * slot takes care of replicating changes for all shards belonging to the same owner on that node. + * There is a 1-1 mapping between a table owner id and a replication slot. One replication + * slot takes care of replicating changes for all shards belonging to the same owner on a particular node. * * During the replication phase, 'decoding_plugin_for_shard_split' called for a change on a particular * replication slot, will decode the shared memory handle from its slot name and will attach to the @@ -334,7 +335,6 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry( shardIntervalToSplit->relationId); - /*Todo(sameer): Also check if non-distributed table */ if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -345,28 +345,25 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, Assert(shardIntervalToSplit->maxValueExists); /* Oid of distributed table */ - Oid citusTableOid = InvalidOid; - citusTableOid = shardIntervalToSplit->relationId; - Oid sourceShardToSplitOid = InvalidOid; - sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid, - sourceShardIdToSplit); + Oid citusTableOid = shardIntervalToSplit->relationId; + Oid sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid, + sourceShardIdToSplit); /* Oid of dummy table at the source */ - Oid desSplitChildShardOid = InvalidOid; - desSplitChildShardOid = GetTableLocalShardOid(citusTableOid, - desSplitChildShardId); + Oid destSplitChildShardOid = GetTableLocalShardOid(citusTableOid, + desSplitChildShardId); if (citusTableOid == InvalidOid || sourceShardToSplitOid == InvalidOid || - desSplitChildShardOid == InvalidOid) + destSplitChildShardOid == InvalidOid) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Invalid citusTableOid:%u " "sourceShardToSplitOid: %u," - "desSplitChildShardOid :%u ", + "destSplitChildShardOid :%u ", citusTableOid, sourceShardToSplitOid, - desSplitChildShardOid))); + destSplitChildShardOid))); } /* determine the partition column in the tuple descriptor */ @@ -376,14 +373,13 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Invalid Partition Column"))); } - int partitionColumnIndex = -1; - partitionColumnIndex = partitionColumn->varattno - 1; + int partitionColumnIndex = partitionColumn->varattno - 1; ShardSplitInfo *shardSplitInfo = palloc0(sizeof(ShardSplitInfo)); shardSplitInfo->distributedTableOid = citusTableOid; shardSplitInfo->partitionColumnIndex = partitionColumnIndex; shardSplitInfo->sourceShardOid = sourceShardToSplitOid; - shardSplitInfo->splitChildShardOid = desSplitChildShardOid; + shardSplitInfo->splitChildShardOid = destSplitChildShardOid; shardSplitInfo->shardMinValue = minValue; shardSplitInfo->shardMaxValue = maxValue; shardSplitInfo->nodeId = nodeId; @@ -449,10 +445,9 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, encode_replication_slot(nodeId, dsmHandle, tableOwnerId); List *shardSplitInfoList = entry->shardSplitInfoList; - ListCell *listCell = NULL; - foreach(listCell, shardSplitInfoList) + ShardSplitInfo *splitShardInfo = NULL; + foreach_ptr(splitShardInfo, shardSplitInfoList) { - ShardSplitInfo *splitShardInfo = (ShardSplitInfo *) lfirst(listCell); ShardSplitInfo *shardInfoInSM = &shardSplitInfoArray[index]; shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid; diff --git a/src/backend/distributed/shardsplit/pgoutput.c b/src/backend/distributed/shardsplit/pgoutput.c index a7a26b28d..f0d6d5c96 100644 --- a/src/backend/distributed/shardsplit/pgoutput.c +++ b/src/backend/distributed/shardsplit/pgoutput.c @@ -15,6 +15,11 @@ #include "distributed/shardsplit_shared_memory.h" #include "replication/logical.h" +/* + * Dynamically-loaded modules are required to include this macro call to check for + * incompatibility (such as being compiled for a different major PostgreSQL version etc). + * In a multiple source-file module, the macro call should only appear once. + */ PG_MODULE_MAGIC; extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); @@ -62,7 +67,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) /* * GetHashValueForIncomingTuple returns the hash value of the partition * column for the incoming tuple. It also checks if the change should be - * handled as the incoming committed change would belong to a relation + * handled as the incoming committed change can belong to a relation * that is not under going split. */ static int32_t diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index cbad1e1c7..eab5f348e 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -31,8 +31,7 @@ static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handl /* * GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory - * segment beloing to 'dsmHandle'. It pins the shared memory segment mapping till - * lifetime of the backend process accessing it. + * segment. It pins the mapping till lifetime of the backend process accessing it. */ static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle) @@ -52,9 +51,8 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle) } /* - * By default, mappings are owned by current resource owner, which typically - * means they stick around for the duration of current query. - * Keep a dynamic shared memory mapping until end of session to avoid warnings and leak. + * Detatching segment associated with resource owner with 'dsm_pin_mapping' call before the + * resource owner releases, to avoid warning being logged and potential leaks. */ dsm_pin_mapping(dsmSegment); @@ -64,7 +62,6 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle) return header; } - /* * GetShardSplitInfoSMArrayForSlot returns pointer to the array of * 'ShardSplitInfo' struct stored in the shared memory segment. @@ -96,12 +93,11 @@ GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount) /* - * AllocateSharedMemoryForShardSplitInfo is used to create a place to store + * AllocateSharedMemoryForShardSplitInfo is used to allocate and store * information about the shard undergoing a split. The function allocates dynamic - * shared memory segment consisting of a header which stores the id of process - * creating it and an array of "steps" which store ShardSplitInfo. The contents of - * this shared memory segment are consumed by WAL sender process during catch up phase of - * replication through logical decoding plugin. + * shared memory segment consisting of a header and an array of ShardSplitInfo structure. + * The contents of this shared memory segment are consumed by WAL sender process + * during catch up phase of replication through logical decoding plugin. * * The shared memory segment exists till the catch up phase completes or the * postmaster shutsdown. diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql index 7089466ee..32e00aec8 100644 --- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql +++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql @@ -4,4 +4,4 @@ RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$; COMMENT ON FUNCTION pg_catalog.split_shard_replication_setup(shardInfo bigint[][]) - IS 'Replication setup for splitting a shard' \ No newline at end of file + IS 'Replication setup for splitting a shard' diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql index 7089466ee..32e00aec8 100644 --- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql +++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql @@ -4,4 +4,4 @@ RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$; COMMENT ON FUNCTION pg_catalog.split_shard_replication_setup(shardInfo bigint[][]) - IS 'Replication setup for splitting a shard' \ No newline at end of file + IS 'Replication setup for splitting a shard' diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h index 9daf013a5..144a24caa 100644 --- a/src/include/distributed/shardsplit_shared_memory.h +++ b/src/include/distributed/shardsplit_shared_memory.h @@ -14,8 +14,6 @@ #ifndef SHARDSPLIT_SHARED_MEMORY_H #define SHARDSPLIT_SHARED_MEMORY_H -#include "c.h" -#include "fmgr.h" #include "distributed/shard_split.h" /* diff --git a/src/test/regress/enterprise_split_schedule b/src/test/regress/enterprise_split_schedule index 6f216ea44..71bfcc34b 100644 --- a/src/test/regress/enterprise_split_schedule +++ b/src/test/regress/enterprise_split_schedule @@ -5,5 +5,6 @@ test: multi_cluster_management test: multi_test_catalog_views test: tablespace # Split tests go here. +test: split_shard_test_helpers test: citus_split_shard_by_split_points_negative test: citus_split_shard_by_split_points diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index a79f6ff24..0bda47f3e 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -1,12 +1,16 @@ \c - - - :master_port +CREATE USER myuser; +CREATE USER admin_user; +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser; +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user; +SET search_path TO split_shard_replication_setup_schema; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 4; -CREATE USER myuser; -CREATE USER admin_user; SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset \c - myuser - - +SET search_path TO split_shard_replication_setup_schema; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 4; @@ -18,6 +22,7 @@ SELECT create_distributed_table('table_first','id'); (1 row) \c - admin_user - - +SET search_path TO split_shard_replication_setup_schema; SET citus.next_shard_id TO 7; SET citus.shard_count TO 1; CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); @@ -28,16 +33,20 @@ SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_fi (1 row) \c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); \c - myuser - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); \c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); \c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); @@ -50,8 +59,9 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); --- 6. Create two publishers and two subscribers for respective table owners. --- 7. Insert into table_first_4 and table_second_7 at source worker1 --- 8. Expect the results in child shards on worker2 -\c - postgres - :worker_1_port -- Create publication at worker1 +\c - postgres - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; BEGIN; CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6; COMMIT; @@ -74,6 +84,7 @@ SELECT pg_sleep(5); -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; BEGIN; SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1'); @@ -84,6 +95,7 @@ SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1'); COMMIT; \c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT into table_first_4 values(100, 'a'); INSERT into table_first_4 values(400, 'a'); INSERT into table_first_4 values(500, 'a'); @@ -94,6 +106,7 @@ select pg_sleep(2); (1 row) \c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT INTO table_second_7 values(100, 'a'); INSERT INTO table_second_7 values(400, 'a'); SELECT * from table_second_7; @@ -110,7 +123,8 @@ select pg_sleep(2); (1 row) \c - myuser - :worker_2_port -SELECT * from table_first_4; -- should alwasy have zero rows +SET search_path TO split_shard_replication_setup_schema; +SELECT * from table_first_4; id | value --------------------------------------------------------------------- (0 rows) @@ -128,7 +142,9 @@ SELECT * from table_first_6; 500 | a (2 rows) +-- should have zero rows in all the below tables as the subscription is not yet created for admin_user \c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_second_7; id | value --------------------------------------------------------------------- @@ -145,6 +161,7 @@ SELECT * from table_second_9; (0 rows) \c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; BEGIN; SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2'); @@ -160,7 +177,9 @@ select pg_sleep(5); (1 row) +-- expect data \c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_second_7; id | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 6c182dd13..1bd789b41 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -1,3 +1,5 @@ +CREATE SCHEMA split_shard_replication_setup_schema; +SET search_path TO split_shard_replication_setup_schema; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 1; @@ -39,12 +41,14 @@ SELECT create_distributed_table('slotName_table','id'); -- 7. Insert into table_to_split_1 at source worker1 -- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create dummy shard tables(table_to_split_2/3) at worker1 -- This is needed for Pub/Sub framework to work. \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; BEGIN; CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); @@ -62,8 +66,9 @@ select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node); (1 row) COMMIT; -\c - - - :worker_2_port -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; BEGIN; SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1'); ?column? @@ -96,6 +101,7 @@ SELECT * from table_to_split_3; -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); @@ -125,6 +131,7 @@ select pg_sleep(2); -- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value. \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_to_split_1; -- should alwasy have zero rows id | value --------------------------------------------------------------------- @@ -145,6 +152,7 @@ SELECT * from table_to_split_3; -- UPDATE data of table_to_split_1 from worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; UPDATE table_to_split_1 SET value='b' where id = 100; UPDATE table_to_split_1 SET value='b' where id = 400; UPDATE table_to_split_1 SET value='b' where id = 500; @@ -156,6 +164,7 @@ SELECT pg_sleep(2); -- Value should be updated in table_to_split_2; \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; id | value --------------------------------------------------------------------- @@ -175,6 +184,7 @@ SELECT * FROM table_to_split_3; (2 rows) \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; SELECT pg_sleep(5); pg_sleep @@ -184,6 +194,7 @@ SELECT pg_sleep(5); -- Child shard rows should be deleted \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; id | value --------------------------------------------------------------------- @@ -201,9 +212,11 @@ SELECT * FROM table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; drop PUBLICATION PUB1; DELETE FROM slotName_table; \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB1; DELETE FROM slotName_table; @@ -212,6 +225,7 @@ DELETE FROM slotName_table; -- 2. table_to_split_1 is located on worker1. -- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; @@ -242,6 +256,7 @@ select pg_sleep(5); (1 row) \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2'); @@ -275,6 +290,7 @@ SELECT * from table_to_split_3; -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); @@ -307,6 +323,7 @@ SELECT * from table_to_split_3; -- Expect data to be present only in table_to_split3 on worker2 \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_to_split_1; id | value --------------------------------------------------------------------- @@ -326,6 +343,7 @@ SELECT * from table_to_split_3; -- delete all from table_to_split_1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; SELECT pg_sleep(5); pg_sleep @@ -341,6 +359,7 @@ SELECT * from table_to_split_2; -- rows from table_to_split_3 should be deleted \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_to_split_3; id | value --------------------------------------------------------------------- @@ -348,11 +367,13 @@ SELECT * from table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; DROP PUBLICATION PUB1; DROP SUBSCRIPTION SUB1; DELETE FROM slotName_table; \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB2; DELETE FROM slotName_table; @@ -361,6 +382,7 @@ DELETE FROM slotName_table; -- 2. table_to_split_1 is located on worker1. -- 3. table_to_split_2 and table_to_split_3 are located on worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; -- Create publication at worker1 BEGIN; diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index c90bca8e0..a7aa373dc 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -1,15 +1,20 @@ \c - - - :master_port +CREATE USER myuser; +CREATE USER admin_user; + +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser; +GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user; + +SET search_path TO split_shard_replication_setup_schema; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 4; -CREATE USER myuser; -CREATE USER admin_user; - SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset \c - myuser - - +SET search_path TO split_shard_replication_setup_schema; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 4; @@ -17,25 +22,30 @@ CREATE TABLE table_first (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('table_first','id'); \c - admin_user - - +SET search_path TO split_shard_replication_setup_schema; SET citus.next_shard_id TO 7; SET citus.shard_count TO 1; CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first'); \c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); \c - myuser - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char); CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char); CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char); \c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); \c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char); CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char); CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); @@ -50,8 +60,9 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); --- 7. Insert into table_first_4 and table_second_7 at source worker1 --- 8. Expect the results in child shards on worker2 -\c - postgres - :worker_1_port -- Create publication at worker1 +\c - postgres - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; BEGIN; CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6; COMMIT; @@ -68,41 +79,50 @@ SELECT pg_sleep(5); -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; BEGIN; SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1'); COMMIT; \c - myuser - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT into table_first_4 values(100, 'a'); INSERT into table_first_4 values(400, 'a'); INSERT into table_first_4 values(500, 'a'); select pg_sleep(2); \c - admin_user - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT INTO table_second_7 values(100, 'a'); INSERT INTO table_second_7 values(400, 'a'); SELECT * from table_second_7; select pg_sleep(2); \c - myuser - :worker_2_port -SELECT * from table_first_4; -- should alwasy have zero rows +SET search_path TO split_shard_replication_setup_schema; +SELECT * from table_first_4; SELECT * from table_first_5; SELECT * from table_first_6; +-- should have zero rows in all the below tables as the subscription is not yet created for admin_user \c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_second_7; SELECT * from table_second_8; SELECT * from table_second_9; \c - postgres - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; BEGIN; SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2'); COMMIT; select pg_sleep(5); +-- expect data \c - admin_user - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_second_7; SELECT * from table_second_8; SELECT * from table_second_9; \ No newline at end of file diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index e95fe866a..4ac2642bd 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -1,3 +1,5 @@ +CREATE SCHEMA split_shard_replication_setup_schema; +SET search_path TO split_shard_replication_setup_schema; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 1; @@ -24,7 +26,7 @@ SELECT create_distributed_table('slotName_table','id'); -- split_shard_replication_setup -- ( -- ARRAY[ --- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ], +-- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ], -- ARRAY[1, 3 , 0 , 2147483647, 18 ] -- ] -- ); @@ -34,6 +36,7 @@ SELECT create_distributed_table('slotName_table','id'); -- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); @@ -41,6 +44,7 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create dummy shard tables(table_to_split_2/3) at worker1 -- This is needed for Pub/Sub framework to work. \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; BEGIN; CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); @@ -56,8 +60,9 @@ BEGIN; select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node); COMMIT; -\c - - - :worker_2_port -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; BEGIN; SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1'); COMMIT; @@ -69,8 +74,9 @@ SELECT * from table_to_split_2; SELECT * from table_to_split_3; --- Insert data in table_to_split_1 at worker1 +-- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); @@ -81,12 +87,14 @@ select pg_sleep(2); -- Expect data to be present in shard 2 and shard 3 based on the hash value. \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_to_split_1; -- should alwasy have zero rows SELECT * from table_to_split_2; SELECT * from table_to_split_3; -- UPDATE data of table_to_split_1 from worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; UPDATE table_to_split_1 SET value='b' where id = 100; UPDATE table_to_split_1 SET value='b' where id = 400; UPDATE table_to_split_1 SET value='b' where id = 500; @@ -94,26 +102,31 @@ SELECT pg_sleep(2); -- Value should be updated in table_to_split_2; \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_2; SELECT * FROM table_to_split_3; \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; SELECT pg_sleep(5); -- Child shard rows should be deleted \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_2; SELECT * FROM table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; drop PUBLICATION PUB1; DELETE FROM slotName_table; \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB1; DELETE FROM slotName_table; @@ -124,6 +137,7 @@ DELETE FROM slotName_table; -- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; @@ -142,6 +156,7 @@ COMMIT; select pg_sleep(5); \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2'); @@ -155,6 +170,7 @@ SELECT * from table_to_split_3; -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); @@ -163,17 +179,19 @@ select pg_sleep(5); -- expect data to present in table_to_split_2 on worker1 as its destination for value '400' SELECT * from table_to_split_1; -SELECT * from table_to_split_2; +SELECT * from table_to_split_2; SELECT * from table_to_split_3; -- Expect data to be present only in table_to_split3 on worker2 \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_to_split_1; -SELECT * from table_to_split_2; +SELECT * from table_to_split_2; SELECT * from table_to_split_3; -- delete all from table_to_split_1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; SELECT pg_sleep(5); @@ -182,17 +200,19 @@ SELECT * from table_to_split_2; -- rows from table_to_split_3 should be deleted \c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; SELECT * from table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; DROP PUBLICATION PUB1; DROP SUBSCRIPTION SUB1; DELETE FROM slotName_table; \c - - - :worker_2_port - +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB2; DELETE FROM slotName_table; @@ -203,6 +223,7 @@ DELETE FROM slotName_table; -- 3. table_to_split_2 and table_to_split_3 are located on worker1 \c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; -- Create publication at worker1 @@ -228,17 +249,17 @@ select pg_sleep(5); -- expect data to present in table_to_split_2/3 on worker1 SELECT * from table_to_split_1; -SELECT * from table_to_split_2; +SELECT * from table_to_split_2; SELECT * from table_to_split_3; DELETE FROM table_to_split_1; SELECT pg_sleep(5); SELECT * from table_to_split_1; -SELECT * from table_to_split_2; +SELECT * from table_to_split_2; SELECT * from table_to_split_3; -- clean up DROP PUBLICATION PUB1; DELETE FROM slotName_table; -DROP SUBSCRIPTION SUB1; \ No newline at end of file +DROP SUBSCRIPTION SUB1; diff --git a/src/test/regress/sql/split_shard_test_helpers.sql b/src/test/regress/sql/split_shard_test_helpers.sql index a06f08193..132bd103c 100644 --- a/src/test/regress/sql/split_shard_test_helpers.sql +++ b/src/test/regress/sql/split_shard_test_helpers.sql @@ -69,7 +69,7 @@ DECLARE tableOwnerOne bigint; tableOwnerTwo bigint; begin - -- setup shared memory information + -- setup shared memory information SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2); SELECT relowner into tableOwnerOne from pg_class where relname='table_first'; @@ -83,7 +83,7 @@ begin INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1); INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2); - + return targetOneSlotName; end $$ LANGUAGE plpgsql; @@ -106,7 +106,7 @@ CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId intege DECLARE replicationSlotName text; nodeportLocal int; - subname text; + subname text; begin SELECT name into replicationSlotName from slotName_table where id = 1; EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); @@ -120,11 +120,11 @@ CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId intege DECLARE replicationSlotName text; nodeportLocal int; - subname text; + subname text; begin SELECT name into replicationSlotName from slotName_table where id = 2; EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); RAISE NOTICE 'sameer %', replicationSlotName; return replicationSlotName; end -$$ LANGUAGE plpgsql; \ No newline at end of file +$$ LANGUAGE plpgsql;