Removed some methods. Handling review comments

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-05-26 17:53:47 +05:30
parent 4b064fa321
commit 5da75b84ac
10 changed files with 366 additions and 252 deletions


@ -68,8 +68,9 @@ static void SetupHashMapForShardInfo();
* This meta information is stored in a shared memory segment and accessed * This meta information is stored in a shared memory segment and accessed
* by logical decoding plugin. * by logical decoding plugin.
* *
* Split information is given by user as an Array in the below format * Split information is given by the user as an array of source shards undergoing splits,
* [{sourceShardId, childShardId, minValue, maxValue, Destination NodeId}] * in the format below:
* Array[Array[sourceShardId, childShardId, minValue, maxValue, Destination NodeId]]
* *
* sourceShardId - id of the shard that is undergoing a split * sourceShardId - id of the shard that is undergoing a split
* childShardId - id of shard that stores a specific range of values * childShardId - id of shard that stores a specific range of values
@ -84,11 +85,20 @@ static void SetupHashMapForShardInfo();
* Multiple shards can be placed on the same destination node. Source and * Multiple shards can be placed on the same destination node. Source and
* destination nodes can be the same too. * destination nodes can be the same too.
* *
* Usage Semantics:
* This UDF returns a shared memory handle where the information is stored. This shared memory
* handle is used by the caller to encode the replication slot name as "NodeId_MemoryHandle" for every
* distinct target node. The same encoded slot name is stored in one of the fields of the
* in-memory data structure (ShardSplitInfo).
*
* There is a 1-1 mapping between a target node and a replication slot as one replication * There is a 1-1 mapping between a target node and a replication slot as one replication
* slot takes care of replicating changes for one node. * slot takes care of replicating changes for one node.
* The 'decoding_plugin_for_shard_split' consumes this information and routes the tuple *
* from the source shard to the appropriate destination shard that falls in the * During the replication phase, when 'decoding_plugin_for_shard_split' is called for a change on a particular
* respective range. * replication slot, it decodes the shared memory handle from the slot name and attaches to the
* shared memory. The plugin then consumes the information from shared memory and routes the tuple
* from the source shard to the appropriate destination shard for which the respective slot is
* responsible.
*/ */
Datum Datum
split_shard_replication_setup(PG_FUNCTION_ARGS) split_shard_replication_setup(PG_FUNCTION_ARGS)
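For illustration, the "NodeId_MemoryHandle" slot-name scheme described in the comment above can be sketched in a few lines of stand-alone C. This is a hedged sketch, not the Citus implementation: the real code has its own encode/decode helpers (decode_replication_slot appears later in this commit), and the helper names, the stand-in dsm_handle typedef, and the sample values are assumptions made only for the example.

#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>

/* stand-in for PostgreSQL's dsm_handle, which is a 32-bit identifier */
typedef uint32_t dsm_handle_sketch;

/* hypothetical helper: build "NodeId_MemoryHandle", e.g. "18_3050283372" */
static void
EncodeSlotNameSketch(char *slotName, size_t len, uint64_t nodeId, dsm_handle_sketch handle)
{
	snprintf(slotName, len, "%" PRIu64 "_%" PRIu32, nodeId, handle);
}

/* hypothetical helper: the reverse of the encoding; returns 1 on success */
static int
DecodeSlotNameSketch(const char *slotName, uint64_t *nodeId, dsm_handle_sketch *handle)
{
	return sscanf(slotName, "%" SCNu64 "_%" SCNu32, nodeId, handle) == 2;
}

int
main(void)
{
	char slotName[64];
	uint64_t nodeId = 0;
	dsm_handle_sketch handle = 0;

	EncodeSlotNameSketch(slotName, sizeof(slotName), 18, 3050283372u);

	if (DecodeSlotNameSketch(slotName, &nodeId, &handle))
	{
		printf("%s -> nodeId=%" PRIu64 " handle=%" PRIu32 "\n", slotName, nodeId, handle);
	}
	return 0;
}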
@ -359,9 +369,6 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
desSplitChildShardOid))); desSplitChildShardOid)));
} }
/* Get PartitionColumnIndex for citusTableOid */
int partitionColumnIndex = -1;
/* determine the partition column in the tuple descriptor */ /* determine the partition column in the tuple descriptor */
Var *partitionColumn = cachedTableEntry->partitionColumn; Var *partitionColumn = cachedTableEntry->partitionColumn;
if (partitionColumn == NULL) if (partitionColumn == NULL)
@ -369,6 +376,7 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid Partition Column"))); errmsg("Invalid Partition Column")));
} }
int partitionColumnIndex = -1;
partitionColumnIndex = partitionColumn->varattno - 1; partitionColumnIndex = partitionColumn->varattno - 1;
ShardSplitInfo *shardSplitInfo = palloc0(sizeof(ShardSplitInfo)); ShardSplitInfo *shardSplitInfo = palloc0(sizeof(ShardSplitInfo));


@ -12,8 +12,6 @@ safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib
SUBDIRS = . safeclib SUBDIRS = . safeclib
SUBDIRS += SUBDIRS +=
ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS)) ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS))
#OBJS += \
$(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c))))
OBJS += pgoutput.o OBJS += pgoutput.o
MODULE_big = decoding_plugin_for_shard_split MODULE_big = decoding_plugin_for_shard_split


@ -55,10 +55,8 @@ static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
void void
_PG_output_plugin_init(OutputPluginCallbacks *cb) _PG_output_plugin_init(OutputPluginCallbacks *cb)
{ {
char *plugin = "pgoutput";
LogicalOutputPluginInit plugin_init = LogicalOutputPluginInit plugin_init =
(LogicalOutputPluginInit) load_external_function(plugin, (LogicalOutputPluginInit) load_external_function("pgoutput",
"_PG_output_plugin_init", "_PG_output_plugin_init",
false, NULL); false, NULL);
@ -70,8 +68,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
/* ask the output plugin to fill the callback struct */ /* ask the output plugin to fill the callback struct */
plugin_init(cb); plugin_init(cb);
/* actual pgoutput callback will be called with the appropriate destination shard */
pgoutputChangeCB = cb->change_cb; pgoutputChangeCB = cb->change_cb;
cb->change_cb = split_change_cb; cb->change_cb = split_change_cb;
} }
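The hunk above keeps pgoutput's change callback in pgoutputChangeCB and installs split_change_cb in its place, so every change first passes through the split logic and is then forwarded to pgoutput. A minimal, self-contained C sketch of that interception pattern (generic types and names, not the actual OutputPluginCallbacks API):

#include <stdio.h>

/* generic stand-ins for OutputPluginCallbacks and a decoded change */
typedef void (*change_cb_t)(int change);
typedef struct Callbacks { change_cb_t change_cb; } Callbacks;

static change_cb_t wrappedChangeCB = NULL;   /* plays the role of pgoutputChangeCB */

static void
original_change_cb(int change)
{
	printf("pgoutput-style callback handling change %d\n", change);
}

/* plays the role of split_change_cb: route/filter first, then delegate */
static void
wrapper_change_cb(int change)
{
	/* ...inspect the change, pick the destination shard, possibly skip it... */
	wrappedChangeCB(change);
}

int
main(void)
{
	Callbacks cb = { original_change_cb };   /* as filled in by plugin_init(cb) */

	wrappedChangeCB = cb.change_cb;          /* remember the real callback */
	cb.change_cb = wrapper_change_cb;        /* install the interceptor */

	cb.change_cb(42);                        /* callers now invoke the wrapper */
	return 0;
}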
@ -202,8 +200,6 @@ FindTargetRelationOid(Relation sourceShardRelation,
* part of replication. This in turn creates one more commit(2). * part of replication. This in turn creates one more commit(2).
* Commit 2 should be skipped as the source shard and destination for commit 2 * Commit 2 should be skipped as the source shard and destination for commit 2
* are same and the commit has already been applied. * are same and the commit has already been applied.
*
* TODO(saawasek): Add the information in Hashmap for performance reasons.
*/ */
bool bool
ShouldCommitBeApplied(Relation sourceShardRelation) ShouldCommitBeApplied(Relation sourceShardRelation)
@ -240,7 +236,6 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* initialized. This gets initialized during the replication of * initialized. This gets initialized during the replication of
* first message. * first message.
*/ */
int arraySize = 0;
if (shardSplitInfoArray == NULL) if (shardSplitInfoArray == NULL)
{ {
shardSplitInfoArray = shardSplitInfoArray =
@ -274,9 +269,11 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break; break;
} }
/* Only INSERT/DELETE are visible in the replication path of split shard */ /* Only INSERT/DELETE actions are visible in the replication path of split shard */
default: default:
Assert(false); ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT or DELETE",
change->action));
} }
/* Current replication slot is not responsible for handling the change */ /* Current replication slot is not responsible for handling the change */
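The routing decision made in split_change_cb can be pictured against the ShardSplitInfo fields that appear later in this commit (sourceShardOid, shardMinValue/shardMaxValue, slotName). The helper below is a simplified, hypothetical illustration of that matching logic, not the actual plugin code:

#include <stdint.h>
#include <string.h>

typedef unsigned int Oid;   /* stand-in for PostgreSQL's Oid */

/* trimmed-down copy of ShardSplitInfo, kept only for this sketch */
typedef struct ShardSplitInfoSketch
{
	Oid sourceShardOid;
	Oid splitChildShardOid;
	int32_t shardMinValue;
	int32_t shardMaxValue;
	char slotName[64];
} ShardSplitInfoSketch;

/*
 * Hypothetical routing helper: return the child shard that should receive a
 * tuple with the given hash value, or 0 (InvalidOid) when the current
 * replication slot is not responsible for the change.
 */
static Oid
FindDestinationShardSketch(const ShardSplitInfoSketch *infos, int count,
						   Oid sourceShardOid, int32_t hashValue,
						   const char *currentSlotName)
{
	for (int i = 0; i < count; i++)
	{
		if (infos[i].sourceShardOid == sourceShardOid &&
			hashValue >= infos[i].shardMinValue &&
			hashValue <= infos[i].shardMaxValue &&
			strcmp(infos[i].slotName, currentSlotName) == 0)
		{
			return infos[i].splitChildShardOid;
		}
	}

	return 0;   /* not handled by this replication slot */
}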


@ -26,11 +26,10 @@ static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int
dsm_handle * dsm_handle *
dsmHandle); dsmHandle);
static void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); static void * ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader);
static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle
dsmHandle); dsmHandle);
static dsm_handle GetSMHandleFromSlotName(char *slotName);
/* /*
* GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory * GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory
@ -69,50 +68,31 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
* 'ShardSplitInfo' struct stored in the shared memory segment. * 'ShardSplitInfo' struct stored in the shared memory segment.
*/ */
ShardSplitInfo * ShardSplitInfo *
GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize) GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount)
{ {
if (slotName == NULL || if (slotName == NULL ||
arraySize == NULL) shardSplitInfoCount == NULL)
{ {
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("Expected slot name and array size arguments"))); errmsg("Expected slot name and array size arguments")));
} }
dsm_handle dsmHandle = GetSMHandleFromSlotName(slotName); dsm_handle dsmHandle;
uint64_t nodeId = 0;
decode_replication_slot(slotName, &nodeId, &dsmHandle);
ShardSplitInfoSMHeader *shardSplitInfoSMHeader = ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
GetShardSplitInfoSMHeaderFromDSMHandle(dsmHandle); GetShardSplitInfoSMHeaderFromDSMHandle(dsmHandle);
*arraySize = shardSplitInfoSMHeader->stepCount; *shardSplitInfoCount = shardSplitInfoSMHeader->shardSplitInfoCount;
ShardSplitInfo *shardSplitInfoArray = ShardSplitInfo *shardSplitInfoArray =
(ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader); (ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader);
return shardSplitInfoArray; return shardSplitInfoArray;
} }
/*
* GetSMHandleFromSlotName function returns the shared memory handle
* from the replication slot name. Replication slot name is encoded as
* "NODEID_SlotType_SharedMemoryHANDLE".
*/
static dsm_handle
GetSMHandleFromSlotName(char *slotName)
{
if (slotName == NULL)
{
ereport(ERROR,
errmsg("Invalid NULL replication slot name."));
}
uint64_t nodeId = 0;
dsm_handle handle = 0;
decode_replication_slot(slotName, &nodeId, &handle);
return handle;
}
/* /*
* AllocateSharedMemoryForShardSplitInfo is used to create a place to store * AllocateSharedMemoryForShardSplitInfo is used to create a place to store
* information about the shard undergoing a split. The function allocates dynamic * information about the shard undergoing a split. The function allocates dynamic
@ -157,8 +137,7 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn
ShardSplitInfoSMHeader *shardSplitInfoSMHeader = ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle); GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle);
shardSplitInfoSMHeader->stepCount = shardSplitInfoCount; shardSplitInfoSMHeader->shardSplitInfoCount = shardSplitInfoCount;
shardSplitInfoSMHeader->processId = MyProcPid;
return shardSplitInfoSMHeader; return shardSplitInfoSMHeader;
} }
@ -180,20 +159,20 @@ CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHand
sizeof(ShardSplitInfo), sizeof(ShardSplitInfo),
dsmHandle); dsmHandle);
ShardSplitInfo *shardSplitInfoSMArray = ShardSplitInfo *shardSplitInfoSMArray =
(ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader); (ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader);
return shardSplitInfoSMArray; return shardSplitInfoSMArray;
} }
/* /*
* ShardSplitInfoSMSteps returns a pointer to the array of 'ShardSplitInfo' * ShardSplitInfoSMData returns a pointer to the array of 'ShardSplitInfo'.
* steps that are stored in shared memory segment. This is simply the data * This is simply the data right after the header, so this function is trivial.
* right after the header, so this function is trivial. The main purpose of * The main purpose of this function is to make the intent clear to readers
* this function is to make the intent clear to readers of the code. * of the code.
*/ */
static void * static void *
ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
{ {
return shardSplitInfoSMHeader + 1; return shardSplitInfoSMHeader + 1;
} }
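ShardSplitInfoSMData works because the segment uses the common header-followed-by-array layout: the ShardSplitInfo entries begin immediately after the ShardSplitInfoSMHeader, so header + 1 is the start of the array. A minimal stand-alone C sketch of that layout, using plain malloc instead of a dynamic shared memory segment (an assumption made purely for illustration):

#include <stdio.h>
#include <stdlib.h>

typedef struct HeaderSketch { int count; } HeaderSketch;        /* like ShardSplitInfoSMHeader */
typedef struct EntrySketch { int childShardId; } EntrySketch;   /* like ShardSplitInfo */

int
main(void)
{
	int count = 3;

	/* one allocation: the header, then 'count' entries packed right after it */
	HeaderSketch *header = malloc(sizeof(HeaderSketch) + count * sizeof(EntrySketch));
	header->count = count;

	/* same trick as ShardSplitInfoSMData(): the array starts at header + 1 */
	EntrySketch *entries = (EntrySketch *) (header + 1);
	for (int i = 0; i < count; i++)
	{
		entries[i].childShardId = 100 + i;
	}

	printf("second entry: %d\n", entries[1].childShardId);
	free(header);
	return 0;
}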
@ -223,7 +202,9 @@ decode_replication_slot(char *slotName,
uint64_t *nodeId, uint64_t *nodeId,
dsm_handle *dsmHandle) dsm_handle *dsmHandle)
{ {
if (slotName == NULL) if (slotName == NULL ||
nodeId == NULL ||
dsmHandle == NULL)
{ {
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), (errcode(ERRCODE_SYNTAX_ERROR),


@ -29,16 +29,16 @@ typedef enum SplitOperation
} SplitOperation; } SplitOperation;
/* /*
* In-memory representation of a split child shard. * In-memory mapping of a split child shard.
*/ */
typedef struct ShardSplitInfo typedef struct ShardSplitInfo
{ {
Oid distributedTableOid; /* citus distributed table Oid */ Oid distributedTableOid; /* citus distributed table Oid */
int partitionColumnIndex; int partitionColumnIndex; /* partition column index */
Oid sourceShardOid; /* parent shard Oid */ Oid sourceShardOid; /* parent shard Oid */
Oid splitChildShardOid; /* child shard Oid */ Oid splitChildShardOid; /* child shard Oid */
int32 shardMinValue; int32 shardMinValue; /* min hash value */
int32 shardMaxValue; int32 shardMaxValue; /* max hash value */
uint64 nodeId; /* node where child shard is to be placed */ uint64 nodeId; /* node where child shard is to be placed */
char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */ char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */
} ShardSplitInfo; } ShardSplitInfo;


@ -1,6 +1,6 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* shardsplit_sharedmemory.h * shardsplit_shared_memory.h
* API's for creating and accessing shared memory segments to store * API's for creating and accessing shared memory segments to store
* shard split information. 'setup_shard_replication' UDF creates the * shard split information. 'setup_shard_replication' UDF creates the
* shared memory, populates the contents and WAL sender processes are * shared memory, populates the contents and WAL sender processes are
@ -14,7 +14,6 @@
#ifndef SHARDSPLIT_SHARED_MEMORY_H #ifndef SHARDSPLIT_SHARED_MEMORY_H
#define SHARDSPLIT_SHARED_MEMORY_H #define SHARDSPLIT_SHARED_MEMORY_H
#include "postgres.h"
#include "c.h" #include "c.h"
#include "fmgr.h" #include "fmgr.h"
#include "distributed/shard_split.h" #include "distributed/shard_split.h"
@ -24,8 +23,7 @@
*/ */
typedef struct ShardSplitInfoSMHeader typedef struct ShardSplitInfoSMHeader
{ {
uint64 processId; /* process id creating the shared memory segment */ int shardSplitInfoCount; /* number of elements in the shared memory */
int stepCount; /* number of elements in the shared memory */
} ShardSplitInfoSMHeader; } ShardSplitInfoSMHeader;
@ -33,7 +31,8 @@ typedef struct ShardSplitInfoSMHeader
extern ShardSplitInfo * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, extern ShardSplitInfo * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount,
dsm_handle *dsmHandle); dsm_handle *dsmHandle);
extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize); extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName,
int *shardSplitInfoCount);
/* Functions related to encoding-decoding for replication slot name */ /* Functions related to encoding-decoding for replication slot name */


@ -124,3 +124,62 @@ BEGIN
END LOOP; END LOOP;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
\c - - - :worker_1_port
-- Create replication slots for targetNode1 and targetNode2
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
targetOneSlotName text;
targetTwoSlotName text;
sharedMemoryId text;
derivedSlotName text;
begin
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if;
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
return targetOneSlotName;
end
$$ LANGUAGE plpgsql;
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;
\c - - - :worker_2_port
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;


@ -1,5 +1,3 @@
CREATE SCHEMA citus_split_shard_by_split_points;
SET search_path TO citus_split_shard_by_split_points;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1; SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1; SET citus.next_shard_id TO 1;
@ -35,59 +33,6 @@ SELECT create_distributed_table('slotName_table','id');
(1 row) (1 row)
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
-- Create replication slots for targetNode1 and targetNode2
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
targetOneSlotName text;
targetTwoSlotName text;
sharedMemoryId text;
derivedSlotName text;
begin
SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2);
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if;
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
return targetOneSlotName;
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION DropSubscription(subscriptionName text) RETURNS text AS $$
DECLARE
begin
EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName);
return subscriptionName;
end
$$ LANGUAGE plpgsql;
-- Test scenario one starts from here -- Test scenario one starts from here
-- 1. table_to_split is a citus distributed table -- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1. -- 2. Shard table_to_split_1 is located on worker1.
@ -107,14 +52,12 @@ $$ LANGUAGE plpgsql;
-- 7. Insert into table_to_split_1 at source worker1 -- 7. Insert into table_to_split_1 at source worker1
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 -- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create dummy shard tables(table_to_split_2/3) at worker1 -- Create dummy shard tables(table_to_split_2/3) at worker1
-- This is needed for Pub/Sub framework to work. -- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
BEGIN; BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
@ -123,9 +66,9 @@ COMMIT;
BEGIN; BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT; COMMIT;
-- Create replication slot and setup shard split information at worker1 -- Create replication slot for target node worker2
BEGIN; BEGIN;
select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node); select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -133,10 +76,9 @@ select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node);
COMMIT; COMMIT;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN; BEGIN;
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1'); SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -167,7 +109,6 @@ select pg_sleep(10);
-- Insert data in table_to_split_1 at worker1 -- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a'); INSERT into table_to_split_1 values(500, 'a');
@ -203,7 +144,6 @@ select pg_sleep(10);
(1 row) (1 row)
SET search_path TO citus_split_shard_by_split_points;
SELECT * from table_to_split_1; -- should always have zero rows SELECT * from table_to_split_1; -- should always have zero rows
id | value id | value
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -224,7 +164,6 @@ SELECT * from table_to_split_3;
-- Delete data from table_to_split_1 from worker1 -- Delete data from table_to_split_1 from worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
DELETE FROM table_to_split_1; DELETE FROM table_to_split_1;
SELECT pg_sleep(10); SELECT pg_sleep(10);
pg_sleep pg_sleep
@ -234,7 +173,6 @@ SELECT pg_sleep(10);
-- Child shard rows should be deleted -- Child shard rows should be deleted
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_1;
id | value id | value
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -252,28 +190,25 @@ SELECT * FROM table_to_split_3;
-- drop publication from worker1 -- drop publication from worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
drop PUBLICATION PUB1; drop PUBLICATION PUB1;
DELETE FROM slotName_table; DELETE FROM slotName_table;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB1; DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table; DELETE FROM slotName_table;
-- Test scenario two starts from here -- Test scenario two starts from here
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is -- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- located on worker1. -- 2. table_to_split_1 is located on worker1.
-- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 -- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
-- Create publication at worker1 -- Create publication at worker1
BEGIN; BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT; COMMIT;
-- Create replication slot and setup shard split information at worker1 -- Create replication slots for two target nodes worker1 and worker2.
-- table_to_split_2 is located on Worker1 and table_to_split_3 is located on worker2 -- Worker1 is the target for table_to_split_2 and Worker2 is the target for table_to_split_3
BEGIN; BEGIN;
select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node); select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -288,7 +223,7 @@ SELECT pg_sleep(10);
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN; BEGIN;
SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1'); SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -296,10 +231,9 @@ SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1');
COMMIT; COMMIT;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN; BEGIN;
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2'); SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2');
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -330,7 +264,6 @@ select pg_sleep(10);
-- Insert data in table_to_split_1 at worker1 -- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a'); INSERT into table_to_split_1 values(500, 'a');
@ -340,7 +273,7 @@ select pg_sleep(10);
(1 row) (1 row)
-- expect data to be present in table_to_split_2 on worker1 -- expect data to be present in table_to_split_2 on worker1 as it is the destination for value '400'
SELECT * from table_to_split_1; SELECT * from table_to_split_1;
id | value id | value
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -366,7 +299,7 @@ select pg_sleep(10);
(1 row) (1 row)
-- Expect data to be present in table_to_split_3 on worker2 -- Expect data to be present only in table_to_split_3 on worker2
\c - - - :worker_2_port \c - - - :worker_2_port
select pg_sleep(10); select pg_sleep(10);
pg_sleep pg_sleep
@ -374,7 +307,6 @@ select pg_sleep(10);
(1 row) (1 row)
SET search_path TO citus_split_shard_by_split_points;
SELECT * from table_to_split_1; SELECT * from table_to_split_1;
id | value id | value
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -394,7 +326,6 @@ SELECT * from table_to_split_3;
-- delete all from table_to_split_1 -- delete all from table_to_split_1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
DELETE FROM table_to_split_1; DELETE FROM table_to_split_1;
SELECT pg_sleep(5); SELECT pg_sleep(5);
pg_sleep pg_sleep
@ -410,7 +341,112 @@ SELECT * from table_to_split_2;
-- rows from table_to_split_3 should be deleted -- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points; SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- drop publication from worker1
\c - - - :worker_1_port
SET client_min_messages TO WARNING;
DROP PUBLICATION PUB1;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB2;
DELETE FROM slotName_table;
-- Test Scenario 3
-- table_to_split_1 is split into table_to_split_2 and table_to_split_3, with both child shards placed on worker1
\c - - - :worker_1_port
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Create replication slot for the single target node worker1.
-- Worker1 is the target for both table_to_split_2 and table_to_split_3
BEGIN;
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
SELECT pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
SELECT pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
select pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- expect data to be present in table_to_split_2 on worker1
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
100 | a
400 | a
500 | a
(3 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
400 | a
(1 row)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)
select pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
DELETE FROM table_to_split_1;
SELECT pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_3; SELECT * from table_to_split_3;
id | value id | value
--------------------------------------------------------------------- ---------------------------------------------------------------------


@ -135,3 +135,66 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
\c - - - :worker_1_port
-- Create replication slots for targetNode1 and targetNode2
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
targetOneSlotName text;
targetTwoSlotName text;
sharedMemoryId text;
derivedSlotName text;
begin
SELECT * into sharedMemoryId from public.SplitShardReplicationSetup(targetNode1, targetNode2);
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if;
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
return targetOneSlotName;
end
$$ LANGUAGE plpgsql;
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;
\c - - - :worker_2_port
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;


@ -1,5 +1,3 @@
CREATE SCHEMA citus_split_shard_by_split_points;
SET search_path TO citus_split_shard_by_split_points;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1; SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1; SET citus.next_shard_id TO 1;
@ -20,63 +18,6 @@ SELECT create_distributed_table('table_to_split','id');
CREATE TABLE slotName_table (name text, nodeId int, id int primary key); CREATE TABLE slotName_table (name text, nodeId int, id int primary key);
SELECT create_distributed_table('slotName_table','id'); SELECT create_distributed_table('slotName_table','id');
-- targetNode1, targetNode2 are the locations where childShard1 and childShard2 are placed respectively
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
-- Create replication slots for targetNode1 and targetNode2
CREATE OR REPLACE FUNCTION CreateReplicationSlot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
targetOneSlotName text;
targetTwoSlotName text;
sharedMemoryId text;
derivedSlotName text;
begin
SELECT * into sharedMemoryId from SplitShardReplicationSetup(targetNode1, targetNode2);
SELECT FORMAT('%s_%s', targetNode1, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then
SELECT FORMAT('%s_%s', targetNode2, sharedMemoryId) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if;
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
return targetOneSlotName;
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION CreateSubscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION DropSubscription(subscriptionName text) RETURNS text AS $$
DECLARE
begin
EXECUTE FORMAT('DROP SUBSCRIPTION %s', subscriptionName);
return subscriptionName;
end
$$ LANGUAGE plpgsql;
-- Test scenario one starts from here -- Test scenario one starts from here
-- 1. table_to_split is a citus distributed table -- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1. -- 2. Shard table_to_split_1 is located on worker1.
@ -97,7 +38,6 @@ $$ LANGUAGE plpgsql;
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2 -- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
@ -105,7 +45,6 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create dummy shard tables(table_to_split_2/3) at worker1 -- Create dummy shard tables(table_to_split_2/3) at worker1
-- This is needed for Pub/Sub framework to work. -- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
BEGIN; BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
@ -116,17 +55,15 @@ BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT; COMMIT;
-- Create replication slot and setup shard split information at worker1 -- Create replication slot for target node worker2
BEGIN; BEGIN;
select 1 from CreateReplicationSlot(:worker_2_node, :worker_2_node); select 1 from public.CreateReplicationSlot(:worker_2_node, :worker_2_node);
COMMIT; COMMIT;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN; BEGIN;
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB1'); SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB1');
COMMIT; COMMIT;
-- No data is present at this moment in all the below tables at worker2 -- No data is present at this moment in all the below tables at worker2
@ -137,7 +74,6 @@ select pg_sleep(10);
-- Insert data in table_to_split_1 at worker1 -- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a'); INSERT into table_to_split_1 values(500, 'a');
@ -149,67 +85,58 @@ select pg_sleep(10);
-- Expect data to be present in shard 2 and shard 3 based on the hash value. -- Expect data to be present in shard 2 and shard 3 based on the hash value.
\c - - - :worker_2_port \c - - - :worker_2_port
select pg_sleep(10); select pg_sleep(10);
SET search_path TO citus_split_shard_by_split_points;
SELECT * from table_to_split_1; -- should always have zero rows SELECT * from table_to_split_1; -- should always have zero rows
SELECT * from table_to_split_2; SELECT * from table_to_split_2;
SELECT * from table_to_split_3; SELECT * from table_to_split_3;
-- Delete data from table_to_split_1 from worker1 -- Delete data from table_to_split_1 from worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
DELETE FROM table_to_split_1; DELETE FROM table_to_split_1;
SELECT pg_sleep(10); SELECT pg_sleep(10);
-- Child shard rows should be deleted -- Child shard rows should be deleted
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
SELECT * FROM table_to_split_1; SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2; SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3; SELECT * FROM table_to_split_3;
-- drop publication from worker1 -- drop publication from worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
drop PUBLICATION PUB1; drop PUBLICATION PUB1;
DELETE FROM slotName_table; DELETE FROM slotName_table;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB1; DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table; DELETE FROM slotName_table;
-- Test scenario two starts from here -- Test scenario two starts from here
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. table_to_split_1 is -- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- located on worker1. -- 2. table_to_split_1 is located on worker1.
-- table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 -- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
-- Create publication at worker1 -- Create publication at worker1
BEGIN; BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT; COMMIT;
-- Create replication slot and setup shard split information at worker1 -- Create replication slots for two target nodes worker1 and worker2.
-- table_to_split_2 is located on Worker1 and table_to_split_3 is located on worker2 -- Worker1 is the target for table_to_split_2 and Worker2 is the target for table_to_split_3
BEGIN; BEGIN;
select 1 from CreateReplicationSlot(:worker_1_node, :worker_2_node); select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_2_node);
COMMIT; COMMIT;
SELECT pg_sleep(10); SELECT pg_sleep(10);
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name -- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN; BEGIN;
SELECT 1 from CreateSubscription(:worker_1_node, 'SUB1'); SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
COMMIT; COMMIT;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN; BEGIN;
SELECT 1 from CreateSubscription(:worker_2_node, 'SUB2'); SELECT 1 from public.CreateSubscription(:worker_2_node, 'SUB2');
COMMIT; COMMIT;
-- No data is present at this moment in all the below tables at worker2 -- No data is present at this moment in all the below tables at worker2
@ -220,36 +147,82 @@ select pg_sleep(10);
-- Insert data in table_to_split_1 at worker1 -- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
INSERT into table_to_split_1 values(100, 'a'); INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a'); INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a'); INSERT into table_to_split_1 values(500, 'a');
select pg_sleep(10); select pg_sleep(10);
-- expect data to be present in table_to_split_2 on worker1 as it is the destination for value '400'
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
select pg_sleep(10);
-- Expect data to be present only in table_to_split_3 on worker2
\c - - - :worker_2_port
select pg_sleep(10);
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- delete all from table_to_split_1
\c - - - :worker_1_port
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
-- rows from table_to_split_2 should be deleted
SELECT * from table_to_split_2;
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SELECT * from table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET client_min_messages TO WARNING;
DROP PUBLICATION PUB1;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB2;
DELETE FROM slotName_table;
-- Test Scenario 3
-- table_to_split_1 is split into table_to_split_2 and table_to_split_3, with both child shards placed on worker1
\c - - - :worker_1_port
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Create replication slot for the single target node worker1.
-- Worker1 is the target for both table_to_split_2 and table_to_split_3
BEGIN;
select 1 from public.CreateReplicationSlot(:worker_1_node, :worker_1_node);
COMMIT;
SELECT pg_sleep(10);
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.CreateSubscription(:worker_1_node, 'SUB1');
COMMIT;
SELECT pg_sleep(10);
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
select pg_sleep(10);
-- expect data to be present in table_to_split_2 on worker1 -- expect data to be present in table_to_split_2 on worker1
SELECT * from table_to_split_1; SELECT * from table_to_split_1;
SELECT * from table_to_split_2; SELECT * from table_to_split_2;
SELECT * from table_to_split_3; SELECT * from table_to_split_3;
select pg_sleep(10); select pg_sleep(10);
-- Expect data to be present in table_to_split_3 on worker2 DELETE FROM table_to_split_1;
\c - - - :worker_2_port SELECT pg_sleep(10);
select pg_sleep(10);
SET search_path TO citus_split_shard_by_split_points;
SELECT * from table_to_split_1; SELECT * from table_to_split_1;
SELECT * from table_to_split_2; SELECT * from table_to_split_2;
SELECT * from table_to_split_3; SELECT * from table_to_split_3;
-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
-- rows from table_to_split_2 should be deleted
SELECT * from table_to_split_2;
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
SELECT * from table_to_split_3;