Change return type, shard range as text

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-20 19:46:52 +05:30
parent b66067d09f
commit a23beeb43f
18 changed files with 144 additions and 255 deletions

View File

@ -52,8 +52,7 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
int32 nodeId);
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle);
HTAB *shardInfoHashMap);
static void SetupHashMapForShardInfo(void);
static uint32 NodeShardMappingHash(const void *key, Size keysize);
@ -150,13 +149,12 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
PopulateShardSplitInfoInSM(splitShardInfoSMHeader,
ShardInfoHashMap,
dsmHandle);
ShardInfoHashMap);
/* store handle in statically allocated shared memory*/
StoreSharedMemoryHandle(dsmHandle);
return dsmHandle;
PG_RETURN_VOID();
}
@ -220,8 +218,13 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Cannot Support the feature")));
Relation distributedRel = RelationIdGetRelation(cachedTableEntry->relationId);
ereport(ERROR, (errmsg(
"Citus does only support Hash distributed tables to be split."),
errdetail("Table '%s' is not Hash distributed",
RelationGetRelationName(distributedRel))
));
RelationClose(distributedRel);
}
Assert(shardIntervalToSplit->minValueExists);
@ -241,8 +244,8 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
destSplitChildShardOid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid citusTableOid:%u "
"sourceShardToSplitOid: %u,"
errmsg("Invalid citusTableOid:%u, "
"sourceShardToSplitOid:%u, "
"destSplitChildShardOid:%u ",
citusTableOid,
sourceShardToSplitOid,
@ -288,7 +291,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
&found);
if (!found)
{
nodeMappingEntry->shardSplitInfoList = NULL;
nodeMappingEntry->shardSplitInfoList = NIL;
}
nodeMappingEntry->shardSplitInfoList =
@ -305,13 +308,10 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
*
* shardInfoHashMap - Hashmap containing parsed split information
* per nodeId
*
* dsmHandle - Shared memory segment handle
*/
static void
PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle)
HTAB *shardInfoHashMap)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMap);
@ -323,23 +323,15 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
uint32_t nodeId = entry->key.nodeId;
uint32_t tableOwnerId = entry->key.tableOwnerId;
char *derivedSlotName =
encode_replication_slot(nodeId, dsmHandle, tableOwnerId);
encode_replication_slot(nodeId, tableOwnerId);
List *shardSplitInfoList = entry->shardSplitInfoList;
ShardSplitInfo *splitShardInfo = NULL;
foreach_ptr(splitShardInfo, shardSplitInfoList)
{
ShardSplitInfo *shardInfoInSM =
&shardSplitInfoSMHeader->splitInfoArray[index];
shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid;
shardInfoInSM->partitionColumnIndex = splitShardInfo->partitionColumnIndex;
shardInfoInSM->sourceShardOid = splitShardInfo->sourceShardOid;
shardInfoInSM->splitChildShardOid = splitShardInfo->splitChildShardOid;
shardInfoInSM->shardMinValue = splitShardInfo->shardMinValue;
shardInfoInSM->shardMaxValue = splitShardInfo->shardMaxValue;
shardInfoInSM->nodeId = splitShardInfo->nodeId;
strcpy_s(shardInfoInSM->slotName, NAMEDATALEN, derivedSlotName);
shardSplitInfoSMHeader->splitInfoArray[index] = *splitShardInfo;
strcpy_s(shardSplitInfoSMHeader->splitInfoArray[index].slotName, NAMEDATALEN,
derivedSlotName);
index++;
}
}
@ -397,7 +389,6 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
{
ereport(ERROR, (errmsg("source_shard_id for split_shard_info can't be null")));
}
*sourceShardId = DatumGetUInt64(sourceShardIdDatum);
Datum childShardIdDatum = GetAttributeByName(dataTuple, "child_shard_id", &isnull);
@ -405,7 +396,6 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
{
ereport(ERROR, (errmsg("child_shard_id for split_shard_info can't be null")));
}
*childShardId = DatumGetUInt64(childShardIdDatum);
Datum minValueDatum = GetAttributeByName(dataTuple, "shard_min_value", &isnull);
@ -413,16 +403,16 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
{
ereport(ERROR, (errmsg("shard_min_value for split_shard_info can't be null")));
}
*minValue = DatumGetInt32(minValueDatum);
char *shardMinValueString = text_to_cstring(DatumGetTextP(minValueDatum));
*minValue = SafeStringToInt32(shardMinValueString);
Datum maxValueDatum = GetAttributeByName(dataTuple, "shard_max_value", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("shard_max_value for split_shard_info can't be null")));
}
*maxValue = DatumGetInt32(maxValueDatum);
char *shardMaxValueString = text_to_cstring(DatumGetTextP(maxValueDatum));
*maxValue = SafeStringToInt32(shardMaxValueString);
Datum nodeIdDatum = GetAttributeByName(dataTuple, "node_id", &isnull);
if (isnull)

View File

@ -6,7 +6,7 @@
* Copyright (c) 2012-2017, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/distributed/shardsplit/pgoutput.c
* src/backend/distributed/shardsplit/shardsplit_decoder.c
*
*-------------------------------------------------------------------------
*/
@ -24,8 +24,9 @@ PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB;
static ShardSplitInfoSMHeader *shardSplitInfoSMHeader = NULL;
static ShardSplitInfoForReplicationSlot *shardSplitInfoForSlot = NULL;
static ShardSplitInfoSMHeader *ShardSplitInfo_SMHeader = NULL;
static ShardSplitInfoForReplicationSlot *ShardSplitInfoForSlot = NULL;
/* Plugin callback */
static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@ -41,6 +42,11 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple,
char *currentSlotName);
/*
* Postgres uses 'pgoutput' as the default plugin for logical replication.
* We want to reuse as much of pgoutput's functionality as possible.
* Hence we load all of that plugin's callbacks and override them as required.
*/
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
@ -79,10 +85,10 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
Oid distributedTableOid = InvalidOid;
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
i++)
{
shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
if (shardSplitInfo->sourceShardOid == sourceShardOid)
{
distributedTableOid = shardSplitInfo->distributedTableOid;
@ -149,18 +155,18 @@ FindTargetRelationOid(Relation sourceShardRelation,
Oid targetRelationOid = InvalidOid;
Oid sourceShardRelationOid = sourceShardRelation->rd_id;
bool bShouldHandleUpdate = false;
bool shouldHandleUpdate = false;
int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple,
&bShouldHandleUpdate);
if (bShouldHandleUpdate == false)
&shouldHandleUpdate);
if (shouldHandleUpdate == false)
{
return InvalidOid;
}
for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
i++)
{
ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
/*
* Each commit message is processed by all the configured replication slots.
@ -193,11 +199,11 @@ bool
IsCommitRecursive(Relation sourceShardRelation)
{
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
i++)
{
/* skip the commit when destination is equal to the source */
ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
if (sourceShardOid == shardSplitInfo->splitChildShardOid)
{
return true;
@ -225,11 +231,11 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* Get ShardSplitInfoForSlot if not already initialized.
* This gets initialized during the replication of the first message.
*/
if (shardSplitInfoForSlot == NULL)
if (ShardSplitInfoForSlot == NULL)
{
shardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot(
ShardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot(
ctx->slot->data.name.data);
shardSplitInfoSMHeader = shardSplitInfoForSlot->shardSplitInfoHeader;
ShardSplitInfo_SMHeader = ShardSplitInfoForSlot->shardSplitInfoHeader;
}
if (IsCommitRecursive(relation))

View File

@ -170,7 +170,6 @@ CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHand
*/
char *
encode_replication_slot(uint32_t nodeId,
dsm_handle dsmHandle,
uint32_t tableOwnerId)
{
StringInfo slotName = makeStringInfo();
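With the dsm_handle argument removed, the slot name is derived from the node id and the table owner id alone; the regression tests below create slots named like citus_split_<nodeId>_<tableOwnerId>. A minimal sketch of the reworked function under that assumption (the rest of the body is truncated in this diff):

/* Editorial sketch: slot name without the shared-memory handle component. */
char *
encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId)
{
	StringInfo slotName = makeStringInfo();
	appendStringInfo(slotName, "citus_split_%u_%u", nodeId, tableOwnerId);
	return slotName->data;
}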

View File

@ -5,14 +5,14 @@ DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint,
child_shard_id bigint,
shard_min_value integer,
shard_max_value integer,
shard_min_value text,
shard_max_value text,
node_id integer);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
splitShardInfo citus.split_shard_info[])
RETURNS bigint
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])

View File

@ -5,14 +5,14 @@ DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint,
child_shard_id bigint,
shard_min_value integer,
shard_max_value integer,
shard_min_value text,
shard_max_value text,
node_id integer);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
splitShardInfo citus.split_shard_info[])
RETURNS bigint
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])

View File

@ -295,3 +295,50 @@ SafeSnprintf(char *restrict buffer, rsize_t bufsz, const char *restrict format,
va_end(args);
return result;
}
/*
* SafeStringToInt32 converts a string containing a number to an int32. When it
* fails, it calls ereport.
*
* The different error cases are inspired by
* https://stackoverflow.com/a/26083517/2570866
*/
int32
SafeStringToInt32(const char *str)
{
char *endptr;
errno = 0;
long number = strtol(str, &endptr, 10);
if (str == endptr)
{
ereport(ERROR, (errmsg("Error parsing %s as int32, no digits found\n", str)));
}
else if ((errno == ERANGE && number == LONG_MIN) || number < INT32_MIN)
{
ereport(ERROR, (errmsg("Error parsing %s as int32, underflow occurred\n", str)));
}
else if ((errno == ERANGE && number == LONG_MAX) || number > INT32_MAX)
{
ereport(ERROR, (errmsg("Error parsing %s as int32, overflow occurred\n", str)));
}
else if (errno == EINVAL)
{
ereport(ERROR, (errmsg(
"Error parsing %s as int32, base contains unsupported value\n",
str)));
}
else if (errno != 0 && number == 0)
{
int err = errno;
ereport(ERROR, (errmsg("Error parsing %s as int32, errno %d\n", str, err)));
}
else if (errno == 0 && str && *endptr != '\0')
{
ereport(ERROR, (errmsg(
"Error parsing %s as int32, aditional characters remain after int32\n",
str)));
}
return number;
}

View File

@ -19,6 +19,7 @@
extern void ereport_constraint_handler(const char *message, void *pointer, errno_t error);
extern int64 SafeStringToInt64(const char *str);
extern int32 SafeStringToInt32(const char *str);
extern uint64 SafeStringToUint64(const char *str);
extern void SafeQsort(void *ptr, rsize_t count, rsize_t size,
int (*comp)(const void *, const void *));

View File

@ -66,7 +66,5 @@ extern ShardSplitInfoForReplicationSlot * PopulateShardSplitInfoForReplicationSl
char *slotName);
char * encode_replication_slot(uint32_t nodeId,
dsm_handle dsmHandle,
uint32_t tableOwnerId);
char * encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */

View File

@ -65,10 +65,10 @@ SET search_path TO split_shard_replication_setup_schema;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info,
ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info
ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one.
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
@ -127,6 +127,7 @@ SELECT pg_sleep(2);
(1 row)
-- expect data in table_first_5/6
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_first_4;

View File

@ -13,15 +13,6 @@ SELECT create_distributed_table('table_to_split','id');
(1 row)
-- slotName_table is used to persist replication slot name.
-- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
CREATE TABLE slotName_table (name text, nodeId int, id int primary key);
SELECT create_distributed_table('slotName_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Test scenario one starts from here
-- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1.
@ -29,11 +20,11 @@ SELECT create_distributed_table('slotName_table','id');
-- table_to_split_2/3 are located on worker2
-- 4. execute UDF split_shard_replication_setup on worker1 with below
-- params:
-- split_shard_replication_setup
-- worker_split_shard_replication_setup
-- (
-- ARRAY[
-- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHashValue */ , 18 /* nodeId where new shard is placed */ ],
-- ARRAY[1, 3 , 0 , 2147483647, 18 ]
-- ROW(1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHashValue */ , 18 /* nodeId where new shard is placed */ ),
-- ROW(1, 3 , 0 , 2147483647, 18 )
-- ]
-- );
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
@ -54,8 +45,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
@ -69,7 +60,7 @@ CREATE SUBSCRIPTION sub1
enabled=true,
slot_name=:slot_name,
copy_data=false);
select pg_sleep(5);
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
@ -206,9 +197,7 @@ SELECT * FROM table_to_split_3;
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DELETE FROM slotName_table;

View File

@ -1,4 +1,4 @@
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
-- Test scenario (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
@ -11,8 +11,8 @@ SET client_min_messages TO ERROR;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'local_slot'
@ -88,4 +88,3 @@ SELECT * FROM table_to_split_3;
-- clean up
DROP SUBSCRIPTION local_subscription;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;

View File

@ -1,4 +1,4 @@
-- Test scenario two starts from here
-- Test scenario (Parent and one child on same node. Other child on different node)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
@ -9,8 +9,8 @@ SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one.
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
@ -24,7 +24,7 @@ CREATE SUBSCRIPTION sub_worker1
enabled=true,
slot_name=:slot_for_worker1,
copy_data=false);
select pg_sleep(5);
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
@ -41,7 +41,7 @@ CREATE SUBSCRIPTION sub_worker2
enabled=true,
slot_name=:slot_for_worker2,
copy_data=false);
select pg_sleep(5);
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
@ -70,7 +70,7 @@ INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
select pg_sleep(5);
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
@ -144,11 +144,9 @@ SELECT * FROM table_to_split_3;
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker2;
DELETE FROM slotName_table;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker1;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;

View File

@ -242,7 +242,7 @@ ORDER BY 1;
function worker_partitioned_table_size(regclass)
function worker_record_sequence_dependency(regclass,regclass,name)
function worker_save_query_explain_analyze(text,jsonb)
function worker_split_shard_replication_setup(bigint[])
function worker_split_shard_replication_setup(citus.split_shard_info[])
schema citus
schema citus_internal
schema columnar
@ -271,6 +271,7 @@ ORDER BY 1;
table pg_dist_transaction
type citus.distribution_type
type citus.shard_transfer_mode
type citus.split_shard_info
type citus_copy_format
type noderole
view citus_dist_stat_activity

View File

@ -67,10 +67,10 @@ CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info,
ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info
ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
@ -110,6 +110,7 @@ INSERT INTO table_second_7 VALUES(400, 'a');
SELECT * FROM table_second_7;
SELECT pg_sleep(2);
-- expect data in table_first_5/6
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_first_4;

View File

@ -11,11 +11,6 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split','id');
-- slotName_table is used to persist replication slot name.
-- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
CREATE TABLE slotName_table (name text, nodeId int, id int primary key);
SELECT create_distributed_table('slotName_table','id');
-- Test scenario one starts from here
-- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1.
@ -23,11 +18,11 @@ SELECT create_distributed_table('slotName_table','id');
-- table_to_split_2/3 are located on worker2
-- 4. execute UDF split_shard_replication_setup on worker1 with below
-- params:
-- split_shard_replication_setup
-- worker_split_shard_replication_setup
-- (
-- ARRAY[
-- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHashValue */ , 18 /* nodeId where new shard is placed */ ],
-- ARRAY[1, 3 , 0 , 2147483647, 18 ]
-- ROW(1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHashValue */ , 18 /* nodeId where new shard is placed */ ),
-- ROW(1, 3 , 0 , 2147483647, 18 )
-- ]
-- );
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
@ -52,8 +47,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset
@ -71,7 +66,7 @@ CREATE SUBSCRIPTION sub1
slot_name=:slot_name,
copy_data=false);
select pg_sleep(5);
SELECT pg_sleep(5);
-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
@ -130,11 +125,8 @@ SELECT * FROM table_to_split_3;
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DELETE FROM slotName_table;

View File

@ -1,4 +1,4 @@
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
-- Test scenario (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
@ -14,9 +14,9 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info
]) AS shared_memory_id \gset
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
@ -24,7 +24,7 @@ SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('c
BEGIN;
CREATE SUBSCRIPTION local_subscription
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
@ -53,4 +53,3 @@ SELECT * FROM table_to_split_3;
-- clean up
DROP SUBSCRIPTION local_subscription;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;

View File

@ -1,4 +1,4 @@
-- Test scenario two starts from here
-- Test scenario (Parent and one child on same node. Other child on different node)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
@ -12,8 +12,8 @@ SET search_path TO split_shard_replication_setup_schema;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
@ -22,13 +22,13 @@ SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FOR
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_worker1,
copy_data=false);
select pg_sleep(5);
SELECT pg_sleep(5);
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
@ -36,13 +36,13 @@ SET search_path TO split_shard_replication_setup_schema;
-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2'
CREATE SUBSCRIPTION sub_worker2
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_worker2,
copy_data=false);
select pg_sleep(5);
SELECT pg_sleep(5);
-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
@ -56,7 +56,7 @@ INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
select pg_sleep(5);
SELECT pg_sleep(5);
-- expect data to be present in table_to_split_2 on worker1 as it is the destination for value '400'
SELECT * FROM table_to_split_1;
@ -88,7 +88,6 @@ SELECT * FROM table_to_split_3;
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker2;
DELETE FROM slotName_table;
-- drop publication from worker1
\c - - - :worker_1_port
@ -96,4 +95,3 @@ SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker1;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;

View File

@ -1,130 +0,0 @@
-- File to create functions and helpers needed for split shard tests
-- Populates shared memory mapping for parent shard with id 1.
-- targetNode1, targetNode2 are the locations where child shard 2 and 3 are placed respectively
CREATE OR REPLACE FUNCTION split_shard_replication_setup_helper(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from worker_split_shard_replication_setup (
ARRAY[ROW(1,2,-2147483648,-1, targetNode1)::citus.split_shard_info,
ROW(1,3,0,2147483647, targetNode2)::citus.split_shard_info]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
-- Create replication slots for targetNode1 and targetNode2 in case of non-colocated shards
CREATE OR REPLACE FUNCTION create_replication_slot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
targetOneSlotName text;
targetTwoSlotName text;
sharedMemoryId text;
derivedSlotName text;
begin
SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
SELECT FORMAT('citus_split_%s_10', targetNode1) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then
SELECT FORMAT('citus_split_%s_10', targetNode2) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if;
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2);
return targetOneSlotName;
end
$$ LANGUAGE plpgsql;
-- Populates shared memory mapping for colocated parent shards 4 and 7.
-- shard 4 has child shards 5 and 6. Shard 7 has child shards 8 and 9.
CREATE OR REPLACE FUNCTION split_shard_replication_setup_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from worker_split_shard_replication_setup(
ARRAY[
ROW(4, 5, -2147483648,-1, targetNode1)::citus.split_shard_info,
ROW(4, 6, 0 ,2147483647, targetNode2)::citus.split_shard_info,
ROW(7, 8, -2147483648,-1, targetNode1)::citus.split_shard_info,
ROW(7, 9, 0, 2147483647 , targetNode2)::citus.split_shard_info
]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
-- Create replication slots for targetNode1 and targetNode2 in case of colocated shards
CREATE OR REPLACE FUNCTION create_replication_slot_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$
DECLARE
targetOneSlotName text;
targetTwoSlotName text;
sharedMemoryId text;
derivedSlotNameOne text;
derivedSlotNameTwo text;
tableOwnerOne bigint;
tableOwnerTwo bigint;
begin
-- setup shared memory information
SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
SELECT FORMAT('citus_split_%s_%s', targetNode1, tableOwnerOne) into derivedSlotNameOne;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
SELECT FORMAT('citus_split_%s_%s', targetNode2, tableOwnerTwo) into derivedSlotNameTwo;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1);
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2);
return targetOneSlotName;
end
$$ LANGUAGE plpgsql;
-- create subscription on target node with given 'subscriptionName'
CREATE OR REPLACE FUNCTION create_subscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return replicationSlotName;
end
$$ LANGUAGE plpgsql;
-- create subscription on target node with given 'subscriptionName'
CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where id = 1;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return replicationSlotName;
end
$$ LANGUAGE plpgsql;
-- create subscription on target node with given 'subscriptionName'
CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId integer, subscriptionName text) RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table where id = 2;
EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName);
return replicationSlotName;
end
$$ LANGUAGE plpgsql;