From 5da75b84ac1928405adf957b4522a6c23e9bad8e Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Thu, 26 May 2022 17:53:47 +0530 Subject: [PATCH] Removed some methods. Handling review comments --- .../operations/shard_split_replicatoin.c | 24 ++- src/backend/distributed/shardsplit/Makefile | 2 - src/backend/distributed/shardsplit/pgoutput.c | 15 +- .../shardsplit/shardsplit_shared_memory.c | 57 ++--- src/include/distributed/shard_split.h | 8 +- .../distributed/shardsplit_shared_memory.h | 9 +- .../regress/expected/multi_test_helpers.out | 59 +++++ .../split_shard_replication_setup.out | 202 +++++++++++------- src/test/regress/sql/multi_test_helpers.sql | 63 ++++++ .../sql/split_shard_replication_setup.sql | 179 +++++++--------- 10 files changed, 366 insertions(+), 252 deletions(-) diff --git a/src/backend/distributed/operations/shard_split_replicatoin.c b/src/backend/distributed/operations/shard_split_replicatoin.c index 4c179771c..89a45dc05 100644 --- a/src/backend/distributed/operations/shard_split_replicatoin.c +++ b/src/backend/distributed/operations/shard_split_replicatoin.c @@ -68,8 +68,9 @@ static void SetupHashMapForShardInfo(); * This meta information is stored in a shared memory segment and accessed * by logical decoding plugin. * - * Split information is given by user as an Array in the below format - * [{sourceShardId, childShardId, minValue, maxValue, Destination NodeId}] + * Split information is given by user as an Array of source shards undergoing splits + * in the below format. + * Array[Array[sourceShardId, childShardId, minValue, maxValue, Destination NodeId]] * * sourceShardId - id of the shard that is undergoing a split * childShardId - id of shard that stores a specific range of values @@ -84,11 +85,20 @@ static void SetupHashMapForShardInfo(); * Multiple shards can be placed on the same destiation node. Source and * destinations nodes can be same too. * + * Usage Semantics: + * This UDF returns a shared memory handle where the information is stored. This shared memory + * handle is used by caller to encode replication slot name as "NodeId_MemoryHandle" for every + * distinct target node. The same encoded slot name is stored in one of the fields of the + * in-memory data structure(ShardSplitInfo). + * * There is a 1-1 mapping between a target node and a replication slot as one replication * slot takes care of replicating changes for one node. - * The 'decoding_plugin_for_shard_split' consumes this information and routes the tuple - * from the source shard to the appropriate destination shard that falls in the - * respective range. + * + * During the replication phase, 'decoding_plugin_for_shard_split' called for a change on a particular + * replication slot, will decode the shared memory handle from its slot name and will attach to the + * shared memory. The plugin consumes the information from shared memory. It routes the tuple + * from the source shard to the appropriate destination shard for which the respective slot is + * responsible. 
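+ *
+ * For example (argument values taken from the regression tests; the node ids and the
+ * returned handle are illustrative):
+ *   split_shard_replication_setup(ARRAY[ARRAY[1, 2, -2147483648, -1, 18],
+ *                                       ARRAY[1, 3, 0, 2147483647, 19]])
+ * registers a split of shard 1 into shard 2 (hash range [-2147483648, -1], node 18) and
+ * shard 3 (hash range [0, 2147483647], node 19) and returns a shared memory handle,
+ * say 123456. The caller would then create slots named '18_123456' and '19_123456'.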
*/ Datum split_shard_replication_setup(PG_FUNCTION_ARGS) @@ -359,9 +369,6 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, desSplitChildShardOid))); } - /* Get PartitionColumnIndex for citusTableOid */ - int partitionColumnIndex = -1; - /* determine the partition column in the tuple descriptor */ Var *partitionColumn = cachedTableEntry->partitionColumn; if (partitionColumn == NULL) @@ -369,6 +376,7 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Invalid Partition Column"))); } + int partitionColumnIndex = -1; partitionColumnIndex = partitionColumn->varattno - 1; ShardSplitInfo *shardSplitInfo = palloc0(sizeof(ShardSplitInfo)); diff --git a/src/backend/distributed/shardsplit/Makefile b/src/backend/distributed/shardsplit/Makefile index 93ec8a0b2..8d0c67b64 100644 --- a/src/backend/distributed/shardsplit/Makefile +++ b/src/backend/distributed/shardsplit/Makefile @@ -12,8 +12,6 @@ safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib SUBDIRS = . safeclib SUBDIRS += ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS)) -#OBJS += \ - $(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c)))) OBJS += pgoutput.o MODULE_big = decoding_plugin_for_shard_split diff --git a/src/backend/distributed/shardsplit/pgoutput.c b/src/backend/distributed/shardsplit/pgoutput.c index 2d029ef3b..91073e60e 100644 --- a/src/backend/distributed/shardsplit/pgoutput.c +++ b/src/backend/distributed/shardsplit/pgoutput.c @@ -55,10 +55,8 @@ static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation, void _PG_output_plugin_init(OutputPluginCallbacks *cb) { - char *plugin = "pgoutput"; - LogicalOutputPluginInit plugin_init = - (LogicalOutputPluginInit) load_external_function(plugin, + (LogicalOutputPluginInit) load_external_function("pgoutput", "_PG_output_plugin_init", false, NULL); @@ -70,8 +68,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) /* ask the output plugin to fill the callback struct */ plugin_init(cb); + /* actual pgoutput callback will be called with the appropriate destination shard */ pgoutputChangeCB = cb->change_cb; - cb->change_cb = split_change_cb; } @@ -202,8 +200,6 @@ FindTargetRelationOid(Relation sourceShardRelation, * part of replication. This in turn creates one more commit(2). * Commit 2 should be skipped as the source shard and destination for commit 2 * are same and the commit has already been applied. - * - * TODO(saawasek): Add the information in Hashmap for performance reasons. */ bool ShouldCommitBeApplied(Relation sourceShardRelation) @@ -240,7 +236,6 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * initialized. This gets initialized during the replication of * first message. */ - int arraySize = 0; if (shardSplitInfoArray == NULL) { shardSplitInfoArray = @@ -274,9 +269,11 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; } - /* Only INSERT/DELETE are visible in the replication path of split shard */ + /* Only INSERT/DELETE actions are visible in the replication path of split shard */ default: - Assert(false); + ereport(ERROR, errmsg( + "Unexpected Action :%d. 
Expected action is INSERT or DELETE", + change->action)); } /* Current replication slot is not responsible for handling the change */ diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index 80a17d29b..26c58102b 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -26,11 +26,10 @@ static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int dsm_handle * dsmHandle); -static void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); +static void * ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle); -static dsm_handle GetSMHandleFromSlotName(char *slotName); /* * GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory @@ -69,50 +68,31 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle) * 'ShardSplitInfo' struct stored in the shared memory segment. */ ShardSplitInfo * -GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize) +GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount) { if (slotName == NULL || - arraySize == NULL) + shardSplitInfoCount == NULL) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("Expected slot name and array size arguments"))); } - dsm_handle dsmHandle = GetSMHandleFromSlotName(slotName); + dsm_handle dsmHandle; + uint64_t nodeId = 0; + decode_replication_slot(slotName, &nodeId, &dsmHandle); + ShardSplitInfoSMHeader *shardSplitInfoSMHeader = GetShardSplitInfoSMHeaderFromDSMHandle(dsmHandle); - *arraySize = shardSplitInfoSMHeader->stepCount; + *shardSplitInfoCount = shardSplitInfoSMHeader->shardSplitInfoCount; ShardSplitInfo *shardSplitInfoArray = - (ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader); + (ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader); return shardSplitInfoArray; } -/* - * GetSMHandleFromSlotName function returns the shared memory handle - * from the replication slot name. Replication slot name is encoded as - * "NODEID_SlotType_SharedMemoryHANDLE". - */ -static dsm_handle -GetSMHandleFromSlotName(char *slotName) -{ - if (slotName == NULL) - { - ereport(ERROR, - errmsg("Invalid NULL replication slot name.")); - } - - uint64_t nodeId = 0; - dsm_handle handle = 0; - decode_replication_slot(slotName, &nodeId, &handle); - - return handle; -} - - /* * AllocateSharedMemoryForShardSplitInfo is used to create a place to store * information about the shard undergoing a split. 
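 * The segment stores a ShardSplitInfoSMHeader followed immediately by an array of
 * 'shardSplitInfoCount' ShardSplitInfo entries; ShardSplitInfoSMData below returns a
 * pointer to that array.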
The function allocates dynamic @@ -157,8 +137,7 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn ShardSplitInfoSMHeader *shardSplitInfoSMHeader = GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle); - shardSplitInfoSMHeader->stepCount = shardSplitInfoCount; - shardSplitInfoSMHeader->processId = MyProcPid; + shardSplitInfoSMHeader->shardSplitInfoCount = shardSplitInfoCount; return shardSplitInfoSMHeader; } @@ -180,20 +159,20 @@ CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHand sizeof(ShardSplitInfo), dsmHandle); ShardSplitInfo *shardSplitInfoSMArray = - (ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader); + (ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader); return shardSplitInfoSMArray; } /* - * ShardSplitInfoSMSteps returns a pointer to the array of 'ShardSplitInfo' - * steps that are stored in shared memory segment. This is simply the data - * right after the header, so this function is trivial. The main purpose of - * this function is to make the intent clear to readers of the code. + * ShardSplitInfoSMData returns a pointer to the array of 'ShardSplitInfo'. + * This is simply the data right after the header, so this function is trivial. + * The main purpose of this function is to make the intent clear to readers + * of the code. */ static void * -ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) +ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) { return shardSplitInfoSMHeader + 1; } @@ -223,7 +202,9 @@ decode_replication_slot(char *slotName, uint64_t *nodeId, dsm_handle *dsmHandle) { - if (slotName == NULL) + if (slotName == NULL || + nodeId == NULL || + dsmHandle == NULL) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index 3e6f9acc6..46c32ff45 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -29,16 +29,16 @@ typedef enum SplitOperation } SplitOperation; /* - * In-memory representation of a split child shard. + * In-memory mapping of a split child shard. */ typedef struct ShardSplitInfo { Oid distributedTableOid; /* citus distributed table Oid */ - int partitionColumnIndex; + int partitionColumnIndex; /* partition column index */ Oid sourceShardOid; /* parent shard Oid */ Oid splitChildShardOid; /* child shard Oid */ - int32 shardMinValue; - int32 shardMaxValue; + int32 shardMinValue; /* min hash value */ + int32 shardMaxValue; /* max hash value */ uint64 nodeId; /* node where child shard is to be placed */ char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */ } ShardSplitInfo; diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h index 4670c3926..ee1e389bd 100644 --- a/src/include/distributed/shardsplit_shared_memory.h +++ b/src/include/distributed/shardsplit_shared_memory.h @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * shardsplit_sharedmemory.h + * shardsplit_shared_memory.h * API's for creating and accessing shared memory segments to store * shard split information. 
'setup_shard_replication' UDF creates the * shared memory, populates the contents and WAL sender processes are @@ -14,7 +14,6 @@ #ifndef SHARDSPLIT_SHARED_MEMORY_H #define SHARDSPLIT_SHARED_MEMORY_H -#include "postgres.h" #include "c.h" #include "fmgr.h" #include "distributed/shard_split.h" @@ -24,8 +23,7 @@ */ typedef struct ShardSplitInfoSMHeader { - uint64 processId; /* process id creating the shared memory segment */ - int stepCount; /* number of elements in the shared memory */ + int shardSplitInfoCount; /* number of elements in the shared memory */ } ShardSplitInfoSMHeader; @@ -33,7 +31,8 @@ typedef struct ShardSplitInfoSMHeader extern ShardSplitInfo * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle); -extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize); +extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, + int *shardSplitInfoCount); /* Functions related to encoding-decoding for replication slot name */ diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 5ae18f26b..5c40c886d 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -124,3 +124,62 @@ BEGIN END LOOP; END; $$ LANGUAGE plpgsql; +\c - - - :worker_1_port +-- Create replication slots for targetNode1 and targetNode2 +CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + targetOneSlotName text; + targetTwoSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + + SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; +end +$$ LANGUAGE plpgsql; +-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); + SELECT FORMAT('%s', memoryId) into memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal int; + subname text; +begin + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, 
replicationSlotName); + return 'a'; +end +$$ LANGUAGE plpgsql; +\c - - - :worker_2_port +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal int; + subname text; +begin + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); + return 'a'; +end +$$ LANGUAGE plpgsql; diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 3c4df005c..e0011f60f 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -1,5 +1,3 @@ -CREATE SCHEMA citus_split_shard_by_split_points; -SET search_path TO citus_split_shard_by_split_points; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 1; @@ -35,59 +33,6 @@ SELECT create_distributed_table('slotName_table','id'); (1 row) --- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively -CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - memoryId bigint := 0; - memoryIdText text; -begin - SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); - SELECT FORMAT('%s', memoryId) into memoryIdText; - return memoryIdText; -end -$$ LANGUAGE plpgsql; --- Create replication slots for targetNode1 and targetNode2 -CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - targetOneSlotName text; - targetTwoSlotName text; - sharedMemoryId text; - derivedSlotName text; -begin - - SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2); - SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - - -- if new child shards are placed on different nodes, create one more replication slot - if (targetNode1 != targetNode2) then - SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); - end if; - - INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); - return targetOneSlotName; -end -$$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return 'a'; -end -$$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION 
DropSubscription(subscriptionName text) RETURNS text AS $$ -DECLARE -begin - EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName); - return subscriptionName; -end -$$ LANGUAGE plpgsql; -- Test scenario one starts from here -- 1. table_to_split is a citus distributed table -- 2. Shard table_to_split_1 is located on worker1. @@ -107,14 +52,12 @@ $$ LANGUAGE plpgsql; -- 7. Insert into table_to_split_1 at source worker1 -- 8. Expect the results in either table_to_split_2 or table_to_split_2 at worker2 \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create dummy shard tables(table_to_split_2/3) at worker1 -- This is needed for Pub/Sub framework to work. \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; BEGIN; CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); @@ -123,9 +66,9 @@ COMMIT; BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; COMMIT; --- Create replication slot and setup shard split information at worker1 +-- Create replication slot for target node worker2 BEGIN; -select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node); +select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node); ?column? --------------------------------------------------------------------- 1 @@ -133,10 +76,9 @@ select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node); COMMIT; \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1'); +SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1'); ?column? 
--------------------------------------------------------------------- 1 @@ -167,7 +109,6 @@ select pg_sleep(10); -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); @@ -203,7 +144,6 @@ select pg_sleep(10); (1 row) -SET search_path TO citus_split_shard_by_split_points; SELECT * from table_to_split_1; -- should alwasy have zero rows id | value --------------------------------------------------------------------- @@ -224,7 +164,6 @@ SELECT * from table_to_split_3; -- Delete data from table_to_split_1 from worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; DELETE FROM table_to_split_1; SELECT pg_sleep(10); pg_sleep @@ -234,7 +173,6 @@ SELECT pg_sleep(10); -- Child shard rows should be deleted \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; SELECT * FROM table_to_split_1; id | value --------------------------------------------------------------------- @@ -252,28 +190,25 @@ SELECT * FROM table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; drop PUBLICATION PUB1; DELETE FROM slotName_table; \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB1; DELETE FROM slotName_table; -- Test scenario two starts from here --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is --- located on worker1. --- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; -- Create publication at worker1 BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; COMMIT; --- Create replication slot and setup shard split information at worker1 --- table_to_split2 is located on Worker1 and table_to_split_3 is located on worker2 +-- Create replication slots for two target nodes worker1 and worker2. +-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 BEGIN; -select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node); +select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node); ?column? --------------------------------------------------------------------- 1 @@ -288,7 +223,7 @@ SELECT pg_sleep(10); -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1'); +SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1'); ?column? --------------------------------------------------------------------- 1 @@ -296,10 +231,9 @@ SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1'); COMMIT; \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2'); +SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2'); ?column? 
--------------------------------------------------------------------- 1 @@ -330,7 +264,6 @@ select pg_sleep(10); -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); @@ -340,7 +273,7 @@ select pg_sleep(10); (1 row) --- expect data to present in table_to_split_2 on worker1 +-- expect data to present in table_to_split_2 on worker1 as its destination for value '400' SELECT * from table_to_split_1; id | value --------------------------------------------------------------------- @@ -366,7 +299,7 @@ select pg_sleep(10); (1 row) --- Expect data to be present in table_to_split3 on worker2 +-- Expect data to be present only in table_to_split3 on worker2 \c - - - :worker_2_port select pg_sleep(10); pg_sleep @@ -374,7 +307,6 @@ select pg_sleep(10); (1 row) -SET search_path TO citus_split_shard_by_split_points; SELECT * from table_to_split_1; id | value --------------------------------------------------------------------- @@ -394,7 +326,6 @@ SELECT * from table_to_split_3; -- delete all from table_to_split_1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; DELETE FROM table_to_split_1; SELECT pg_sleep(5); pg_sleep @@ -410,7 +341,112 @@ SELECT * from table_to_split_2; -- rows from table_to_split_3 should be deleted \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + + -- drop publication from worker1 +\c - - - :worker_1_port +SET client_min_messages TO WARNING; +DROP PUBLICATION PUB1; +DROP SUBSCRIPTION SUB1; +DELETE FROM slotName_table; +\c - - - :worker_2_port +SET client_min_messages TO WARNING; +DROP SUBSCRIPTION SUB2; +DELETE FROM slotName_table; +-- Test Scenario 3 +\c - - - :worker_1_port +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; +COMMIT; +-- Create replication slots for two target nodes worker1 and worker2. +-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 +BEGIN; +select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +SELECT pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1'); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +SELECT pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + + INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(400, 'a'); +INSERT into table_to_split_1 values(500, 'a'); +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- expect data to present in table_to_split_2 on worker1 +SELECT * from table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +DELETE FROM table_to_split_1; +SELECT pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT * from table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + SELECT * from table_to_split_3; id | value --------------------------------------------------------------------- diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 9e70cb971..4e3eeeb51 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -135,3 +135,66 @@ BEGIN END; $$ LANGUAGE plpgsql; +\c - - - :worker_1_port +-- Create replication slots for targetNode1 and targetNode2 +CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + targetOneSlotName text; + targetTwoSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + + SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2); + SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + + -- if new child shards are placed on different nodes, create one more replication slot + if (targetNode1 != targetNode2) then + SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); + INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); + end if; + + INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); + return targetOneSlotName; +end +$$ LANGUAGE plpgsql; + +-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); + SELECT FORMAT('%s', memoryId) into 
memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal int; + subname text; +begin + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); + return 'a'; +end +$$ LANGUAGE plpgsql; + +\c - - - :worker_2_port +CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal int; + subname text; +begin + SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; + EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); + return 'a'; +end +$$ LANGUAGE plpgsql; + diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 2fb3de983..75e6588de 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -1,5 +1,3 @@ -CREATE SCHEMA citus_split_shard_by_split_points; -SET search_path TO citus_split_shard_by_split_points; SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 1; SET citus.next_shard_id TO 1; @@ -20,63 +18,6 @@ SELECT create_distributed_table('table_to_split','id'); CREATE TABLE slotName_table (name text, nodeId int, id int primary key); SELECT create_distributed_table('slotName_table','id'); --- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively -CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - memoryId bigint := 0; - memoryIdText text; -begin - SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); - SELECT FORMAT('%s', memoryId) into memoryIdText; - return memoryIdText; -end -$$ LANGUAGE plpgsql; - --- Create replication slots for targetNode1 and targetNode2 -CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - targetOneSlotName text; - targetTwoSlotName text; - sharedMemoryId text; - derivedSlotName text; -begin - - SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2); - SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - - -- if new child shards are placed on different nodes, create one more replication slot - if (targetNode1 != targetNode2) then - SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName; - SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); - end if; - - INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 
2); - return targetOneSlotName; -end -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return 'a'; -end -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION DropSubscription(subscriptionName text) RETURNS text AS $$ -DECLARE -begin - EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName); - return subscriptionName; -end -$$ LANGUAGE plpgsql; - -- Test scenario one starts from here -- 1. table_to_split is a citus distributed table -- 2. Shard table_to_split_1 is located on worker1. @@ -97,7 +38,6 @@ $$ LANGUAGE plpgsql; -- 8. Expect the results in either table_to_split_2 or table_to_split_2 at worker2 \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); @@ -105,7 +45,6 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create dummy shard tables(table_to_split_2/3) at worker1 -- This is needed for Pub/Sub framework to work. \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; BEGIN; CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); @@ -116,17 +55,15 @@ BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; COMMIT; --- Create replication slot and setup shard split information at worker1 +-- Create replication slot for target node worker2 BEGIN; -select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node); +select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node); COMMIT; \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; - -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1'); +SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1'); COMMIT; -- No data is present at this moment in all the below tables at worker2 @@ -137,7 +74,6 @@ select pg_sleep(10); -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); @@ -149,67 +85,58 @@ select pg_sleep(10); -- Expect data to be present in shard 2 and shard 3 based on the hash value. 
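-- (With the ranges passed to split_shard_replication_setup above, id 400 hashes into
-- [-2147483648, -1] and should show up in table_to_split_2, while ids 100 and 500 hash
-- into [0, 2147483647] and should show up in table_to_split_3.)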
\c - - - :worker_2_port select pg_sleep(10); -SET search_path TO citus_split_shard_by_split_points; SELECT * from table_to_split_1; -- should alwasy have zero rows SELECT * from table_to_split_2; SELECT * from table_to_split_3; -- Delete data from table_to_split_1 from worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; DELETE FROM table_to_split_1; SELECT pg_sleep(10); -- Child shard rows should be deleted \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_2; SELECT * FROM table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; drop PUBLICATION PUB1; DELETE FROM slotName_table; \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; SET client_min_messages TO WARNING; DROP SUBSCRIPTION SUB1; DELETE FROM slotName_table; -- Test scenario two starts from here --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is --- located on worker1. --- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; - -- Create publication at worker1 BEGIN; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; COMMIT; --- Create replication slot and setup shard split information at worker1 --- table_to_split2 is located on Worker1 and table_to_split_3 is located on worker2 +-- Create replication slots for two target nodes worker1 and worker2. 
+-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 BEGIN; -select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node); +select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node); COMMIT; SELECT pg_sleep(10); -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1'); +SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1'); COMMIT; \c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; - -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name BEGIN; -SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2'); +SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2'); COMMIT; -- No data is present at this moment in all the below tables at worker2 @@ -220,36 +147,82 @@ select pg_sleep(10); -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(500, 'a'); select pg_sleep(10); +-- expect data to present in table_to_split_2 on worker1 as its destination for value '400' +SELECT * from table_to_split_1; +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; +select pg_sleep(10); + +-- Expect data to be present only in table_to_split3 on worker2 +\c - - - :worker_2_port +select pg_sleep(10); +SELECT * from table_to_split_1; +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; + +-- delete all from table_to_split_1 +\c - - - :worker_1_port +DELETE FROM table_to_split_1; +SELECT pg_sleep(5); + +-- rows from table_to_split_2 should be deleted +SELECT * from table_to_split_2; + +-- rows from table_to_split_3 should be deleted +\c - - - :worker_2_port +SELECT * from table_to_split_3; + + -- drop publication from worker1 +\c - - - :worker_1_port +SET client_min_messages TO WARNING; +DROP PUBLICATION PUB1; +DROP SUBSCRIPTION SUB1; +DELETE FROM slotName_table; + +\c - - - :worker_2_port +SET client_min_messages TO WARNING; +DROP SUBSCRIPTION SUB2; +DELETE FROM slotName_table; + +-- Test Scenario 3 +\c - - - :worker_1_port + +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; +COMMIT; + +-- Create replication slots for two target nodes worker1 and worker2. 
+-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 +BEGIN; +select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node); +COMMIT; +SELECT pg_sleep(10); + +-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1'); +COMMIT; +SELECT pg_sleep(10); + + INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(400, 'a'); +INSERT into table_to_split_1 values(500, 'a'); +select pg_sleep(10); + -- expect data to present in table_to_split_2 on worker1 SELECT * from table_to_split_1; SELECT * from table_to_split_2; SELECT * from table_to_split_3; select pg_sleep(10); --- Expect data to be present in table_to_split3 on worker2 -\c - - - :worker_2_port -select pg_sleep(10); -SET search_path TO citus_split_shard_by_split_points; +DELETE FROM table_to_split_1; +SELECT pg_sleep(10); SELECT * from table_to_split_1; SELECT * from table_to_split_2; -SELECT * from table_to_split_3; - --- delete all from table_to_split_1 -\c - - - :worker_1_port -SET search_path TO citus_split_shard_by_split_points; -DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - --- rows from table_to_split_2 should be deleted -SELECT * from table_to_split_2; - --- rows from table_to_split_3 should be deleted -\c - - - :worker_2_port -SET search_path TO citus_split_shard_by_split_points; SELECT * from table_to_split_3; \ No newline at end of file
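
Taken together, the helpers above drive the following flow; a condensed, illustrative sketch is
given below. The node ids 18 and 19, the returned handle 123456 and the derived slot names are
hypothetical, while the array arguments and subscription options mirror the ones used in
public.SplitShardReplicationSetup and public.CreateSubscription.

-- 1. On the source worker, register the split and obtain the shared memory handle.
SELECT split_shard_replication_setup(
    ARRAY[ARRAY[1, 2, -2147483648, -1, 18],
          ARRAY[1, 3, 0, 2147483647, 19]]);   -- suppose this returns 123456

-- 2. Create one logical replication slot per distinct target node, named '<nodeId>_<handle>'.
SELECT slot_name FROM pg_create_logical_replication_slot('18_123456', 'decoding_plugin_for_shard_split');
SELECT slot_name FROM pg_create_logical_replication_slot('19_123456', 'decoding_plugin_for_shard_split');

-- 3. On each target worker, attach a subscription to its pre-created slot without copying data.
CREATE SUBSCRIPTION sub1
    CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
    PUBLICATION pub1
    WITH (create_slot = false, enabled = true, slot_name = '18_123456', copy_data = false);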