From a23beeb43f4814ac67453169a192990dd9e49069 Mon Sep 17 00:00:00 2001
From: Sameer Awasekar
Date: Mon, 20 Jun 2022 19:46:52 +0530
Subject: [PATCH] Change return type, shard range as text

---
 .../split_shard_replication_setup.c           | 54 +++-----
 .../shardsplit/shardsplit_decoder.c           | 36 +++--
 .../shardsplit/shardsplit_shared_memory.c     |  1 -
 .../split_shard_replication_setup/11.0-2.sql  |  6 +-
 .../split_shard_replication_setup/latest.sql  |  6 +-
 .../distributed/utils/citus_safe_lib.c        | 47 +++++++
 src/include/distributed/citus_safe_lib.h      |  1 +
 .../distributed/shardsplit_shared_memory.h    |  4 +-
 ...plit_shard_replication_colocated_setup.out |  9 +-
 .../split_shard_replication_setup.out         | 23 +---
 .../split_shard_replication_setup_local.out   |  7 +-
 ...t_shard_replication_setup_remote_local.out | 14 +-
 .../expected/upgrade_list_citus_objects.out   |  3 +-
 ...plit_shard_replication_colocated_setup.sql |  9 +-
 .../sql/split_shard_replication_setup.sql     | 20 +--
 .../split_shard_replication_setup_local.sql   | 11 +-
 ...t_shard_replication_setup_remote_local.sql | 18 ++-
 .../regress/sql/split_shard_test_helpers.sql  | 130 ------------------
 18 files changed, 144 insertions(+), 255 deletions(-)
 delete mode 100644 src/test/regress/sql/split_shard_test_helpers.sql

diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c
index da7561db4..1613b4de6 100644
--- a/src/backend/distributed/operations/split_shard_replication_setup.c
+++ b/src/backend/distributed/operations/split_shard_replication_setup.c
@@ -52,8 +52,7 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
 											 int32 nodeId);
 static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
 static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
-									   HTAB *shardInfoHashMap,
-									   dsm_handle dsmHandle);
+									   HTAB *shardInfoHashMap);
 
 static void SetupHashMapForShardInfo(void);
 static uint32 NodeShardMappingHash(const void *key, Size keysize);
@@ -150,13 +149,12 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
 	CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
 
 	PopulateShardSplitInfoInSM(splitShardInfoSMHeader,
-							   ShardInfoHashMap,
-							   dsmHandle);
+							   ShardInfoHashMap);
 
 	/* store handle in statically allocated shared memory*/
 	StoreSharedMemoryHandle(dsmHandle);
 
-	return dsmHandle;
+	PG_RETURN_VOID();
 }
 
 
@@ -220,8 +218,13 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
 
 	if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
 	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("Cannot Support the feature")));
+		Relation distributedRel = RelationIdGetRelation(cachedTableEntry->relationId);
+		char *relationName = pstrdup(RelationGetRelationName(distributedRel));
+		RelationClose(distributedRel);
+
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("Citus only supports splitting hash-distributed tables"),
+						errdetail("Table '%s' is not hash distributed.", relationName)));
 	}
 
 	Assert(shardIntervalToSplit->minValueExists);
@@ -241,8 +244,8 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
 		destSplitChildShardOid == InvalidOid)
 	{
 		ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
-						errmsg("Invalid citusTableOid:%u "
-							   "sourceShardToSplitOid: %u,"
+						errmsg("Invalid citusTableOid:%u, "
+							   "sourceShardToSplitOid:%u, "
 							   "destSplitChildShardOid:%u ",
 							   citusTableOid,
 							   sourceShardToSplitOid,
@@ -288,7 +291,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
 													  &found);
 	if (!found)
 	{
-		nodeMappingEntry->shardSplitInfoList = NULL;
+
nodeMappingEntry->shardSplitInfoList = NIL; } nodeMappingEntry->shardSplitInfoList = @@ -305,13 +308,10 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) * * shardInfoHashMap - Hashmap containing parsed split information * per nodeId wise - * - * dsmHandle - Shared memory segment handle */ static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, - HTAB *shardInfoHashMap, - dsm_handle dsmHandle) + HTAB *shardInfoHashMap) { HASH_SEQ_STATUS status; hash_seq_init(&status, shardInfoHashMap); @@ -323,23 +323,15 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, uint32_t nodeId = entry->key.nodeId; uint32_t tableOwnerId = entry->key.tableOwnerId; char *derivedSlotName = - encode_replication_slot(nodeId, dsmHandle, tableOwnerId); + encode_replication_slot(nodeId, tableOwnerId); List *shardSplitInfoList = entry->shardSplitInfoList; ShardSplitInfo *splitShardInfo = NULL; foreach_ptr(splitShardInfo, shardSplitInfoList) { - ShardSplitInfo *shardInfoInSM = - &shardSplitInfoSMHeader->splitInfoArray[index]; - - shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid; - shardInfoInSM->partitionColumnIndex = splitShardInfo->partitionColumnIndex; - shardInfoInSM->sourceShardOid = splitShardInfo->sourceShardOid; - shardInfoInSM->splitChildShardOid = splitShardInfo->splitChildShardOid; - shardInfoInSM->shardMinValue = splitShardInfo->shardMinValue; - shardInfoInSM->shardMaxValue = splitShardInfo->shardMaxValue; - shardInfoInSM->nodeId = splitShardInfo->nodeId; - strcpy_s(shardInfoInSM->slotName, NAMEDATALEN, derivedSlotName); + shardSplitInfoSMHeader->splitInfoArray[index] = *splitShardInfo; + strcpy_s(shardSplitInfoSMHeader->splitInfoArray[index].slotName, NAMEDATALEN, + derivedSlotName); index++; } } @@ -397,7 +389,6 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, { ereport(ERROR, (errmsg("source_shard_id for split_shard_info can't be null"))); } - *sourceShardId = DatumGetUInt64(sourceShardIdDatum); Datum childShardIdDatum = GetAttributeByName(dataTuple, "child_shard_id", &isnull); @@ -405,7 +396,6 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, { ereport(ERROR, (errmsg("child_shard_id for split_shard_info can't be null"))); } - *childShardId = DatumGetUInt64(childShardIdDatum); Datum minValueDatum = GetAttributeByName(dataTuple, "shard_min_value", &isnull); @@ -413,16 +403,16 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, { ereport(ERROR, (errmsg("shard_min_value for split_shard_info can't be null"))); } - - *minValue = DatumGetInt32(minValueDatum); + char *shardMinValueString = text_to_cstring(DatumGetTextP(minValueDatum)); + *minValue = SafeStringToInt32(shardMinValueString); Datum maxValueDatum = GetAttributeByName(dataTuple, "shard_max_value", &isnull); if (isnull) { ereport(ERROR, (errmsg("shard_max_value for split_shard_info can't be null"))); } - - *maxValue = DatumGetInt32(maxValueDatum); + char *shardMaxValueString = text_to_cstring(DatumGetTextP(maxValueDatum)); + *maxValue = SafeStringToInt32(shardMaxValueString); Datum nodeIdDatum = GetAttributeByName(dataTuple, "node_id", &isnull); if (isnull) diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 237c73a33..137e48cc0 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -6,7 +6,7 @@ * Copyright (c) 2012-2017, PostgreSQL Global Development Group * * 
IDENTIFICATION
- *	  src/backend/distributed/shardsplit/pgoutput.c
+ *	  src/backend/distributed/shardsplit/shardsplit_decoder.c
  *
  *-------------------------------------------------------------------------
  */
@@ -24,8 +24,9 @@ PG_MODULE_MAGIC;
 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
 static LogicalDecodeChangeCB pgoutputChangeCB;
 
-static ShardSplitInfoSMHeader *shardSplitInfoSMHeader = NULL;
-static ShardSplitInfoForReplicationSlot *shardSplitInfoForSlot = NULL;
+
+static ShardSplitInfoSMHeader *ShardSplitInfo_SMHeader = NULL;
+static ShardSplitInfoForReplicationSlot *ShardSplitInfoForSlot = NULL;
 
 /* Plugin callback */
 static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@@ -41,6 +42,11 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
 								 HeapTuple tuple,
 								 char *currentSlotName);
 
+/*
+ * Postgres uses 'pgoutput' as the default plugin for logical replication.
+ * We want to reuse as much of pgoutput's functionality as possible.
+ * Hence, we load all of that plugin's callbacks and override them as required.
+ */
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
 {
@@ -79,10 +85,10 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
 	Oid distributedTableOid = InvalidOid;
 	Oid sourceShardOid = sourceShardRelation->rd_id;
-	for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
+	for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
 		 i++)
 	{
-		shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
+		shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
 		if (shardSplitInfo->sourceShardOid == sourceShardOid)
 		{
 			distributedTableOid = shardSplitInfo->distributedTableOid;
@@ -149,18 +155,18 @@ FindTargetRelationOid(Relation sourceShardRelation,
 	Oid targetRelationOid = InvalidOid;
 	Oid sourceShardRelationOid = sourceShardRelation->rd_id;
-	bool bShouldHandleUpdate = false;
+	bool shouldHandleUpdate = false;
 	int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple,
-												 &bShouldHandleUpdate);
-	if (bShouldHandleUpdate == false)
+												 &shouldHandleUpdate);
+	if (shouldHandleUpdate == false)
 	{
 		return InvalidOid;
 	}
-	for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
+	for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
 		 i++)
 	{
-		ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
+		ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
 
 		/*
 		 * Each commit message is processed by all the configured replication slots.
@@ -193,11 +199,11 @@ bool
 IsCommitRecursive(Relation sourceShardRelation)
 {
 	Oid sourceShardOid = sourceShardRelation->rd_id;
-	for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
+	for (int i = ShardSplitInfoForSlot->startIndex; i <= ShardSplitInfoForSlot->endIndex;
 		 i++)
 	{
 		/* skip the commit when destination is equal to the source */
-		ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
+		ShardSplitInfo *shardSplitInfo = &ShardSplitInfo_SMHeader->splitInfoArray[i];
 		if (sourceShardOid == shardSplitInfo->splitChildShardOid)
 		{
 			return true;
@@ -225,11 +231,11 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * Get ShardSplitInfoForSlot if not already initialized.
 	 * This gets initialized during the replication of first message.
*/
-	if (shardSplitInfoForSlot == NULL)
+	if (ShardSplitInfoForSlot == NULL)
 	{
-		shardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot(
+		ShardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot(
 			ctx->slot->data.name.data);
-		shardSplitInfoSMHeader = shardSplitInfoForSlot->shardSplitInfoHeader;
+		ShardSplitInfo_SMHeader = ShardSplitInfoForSlot->shardSplitInfoHeader;
 	}
 
 	if (IsCommitRecursive(relation))
diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
index 7daf24c55..87168781e 100644
--- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
+++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c
@@ -170,7 +170,6 @@ CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHand
  */
 char *
 encode_replication_slot(uint32_t nodeId,
-						dsm_handle dsmHandle,
 						uint32_t tableOwnerId)
 {
 	StringInfo slotName = makeStringInfo();
diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql
index 3131668cb..94f625308 100644
--- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql
+++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql
@@ -5,14 +5,14 @@ DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
 CREATE TYPE citus.split_shard_info AS (
     source_shard_id bigint,
     child_shard_id bigint,
-    shard_min_value integer,
-    shard_max_value integer,
+    shard_min_value text,
+    shard_max_value text,
     node_id integer);
 
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
     splitShardInfo citus.split_shard_info[])
-RETURNS bigint
+RETURNS void
 LANGUAGE C STRICT
 AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
 COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql
index 3131668cb..94f625308 100644
--- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql
+++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql
@@ -5,14 +5,14 @@ DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
 CREATE TYPE citus.split_shard_info AS (
     source_shard_id bigint,
     child_shard_id bigint,
-    shard_min_value integer,
-    shard_max_value integer,
+    shard_min_value text,
+    shard_max_value text,
     node_id integer);
 
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
     splitShardInfo citus.split_shard_info[])
-RETURNS bigint
+RETURNS void
 LANGUAGE C STRICT
 AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
 COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
diff --git a/src/backend/distributed/utils/citus_safe_lib.c b/src/backend/distributed/utils/citus_safe_lib.c
index 466c598be..7e53bfe3f 100644
--- a/src/backend/distributed/utils/citus_safe_lib.c
+++ b/src/backend/distributed/utils/citus_safe_lib.c
@@ -295,3 +295,50 @@ SafeSnprintf(char *restrict buffer, rsize_t bufsz, const char *restrict format, 
 	va_end(args);
 	return result;
 }
+
+
+/*
+ * SafeStringToInt32 converts a string containing a number to an int32. When it
+ * fails, it calls ereport.
+ *
+ * The different error cases are inspired by
+ * https://stackoverflow.com/a/26083517/2570866
+ */
+int32
+SafeStringToInt32(const char *str)
+{
+	char *endptr;
+	errno = 0;
+	long number = strtol(str, &endptr, 10);
+
+	if (str == endptr)
+	{
+		ereport(ERROR, (errmsg("Error parsing %s as int32, no digits found\n", str)));
+	}
+	else if ((errno == ERANGE && number == LONG_MIN) || number < INT32_MIN)
+	{
+		ereport(ERROR, (errmsg("Error parsing %s as int32, underflow occurred\n", str)));
+	}
+	else if ((errno == ERANGE && number == LONG_MAX) || number > INT32_MAX)
+	{
+		ereport(ERROR, (errmsg("Error parsing %s as int32, overflow occurred\n", str)));
+	}
+	else if (errno == EINVAL)
+	{
+		ereport(ERROR, (errmsg(
+							"Error parsing %s as int32, base contains unsupported value\n",
+							str)));
+	}
+	else if (errno != 0 && number == 0)
+	{
+		int err = errno;
+		ereport(ERROR, (errmsg("Error parsing %s as int32, errno %d\n", str, err)));
+	}
+	else if (errno == 0 && str && *endptr != '\0')
+	{
+		ereport(ERROR, (errmsg(
+							"Error parsing %s as int32, additional characters remain after int32\n",
+							str)));
+	}
+	return (int32) number;
+}
diff --git a/src/include/distributed/citus_safe_lib.h b/src/include/distributed/citus_safe_lib.h
index c934e931b..e039072a7 100644
--- a/src/include/distributed/citus_safe_lib.h
+++ b/src/include/distributed/citus_safe_lib.h
@@ -19,6 +19,7 @@ extern void ereport_constraint_handler(const char *message, void *pointer,
 									   errno_t error);
 extern int64 SafeStringToInt64(const char *str);
+extern int32 SafeStringToInt32(const char *str);
 extern uint64 SafeStringToUint64(const char *str);
 extern void SafeQsort(void *ptr, rsize_t count, rsize_t size,
 					  int (*comp)(const void *, const void *));
diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h
index 9a193438b..1fcd4a840 100644
--- a/src/include/distributed/shardsplit_shared_memory.h
+++ b/src/include/distributed/shardsplit_shared_memory.h
@@ -66,7 +66,5 @@ extern ShardSplitInfoForReplicationSlot * PopulateShardSplitInfoForReplicationSl
 	char *slotName);
 
-char * encode_replication_slot(uint32_t nodeId,
-							   dsm_handle dsmHandle,
-							   uint32_t tableOwnerId);
+char * encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId);
 #endif /* SHARDSPLIT_SHARED_MEMORY_H */
diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out
index 4b1a3281d..173628896 100644
--- a/src/test/regress/expected/split_shard_replication_colocated_setup.out
+++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out
@@ -65,10 +65,10 @@ SET search_path TO split_shard_replication_setup_schema;
 CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
 CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
 SELECT worker_split_shard_replication_setup(ARRAY[
-    ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
-    ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info,
-    ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
-    ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info
+    ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
+    ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
+    ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
+    ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
 ]) AS shared_memory_id \gset
WARNING:  As a
part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one.
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
@@ -127,6 +127,7 @@ SELECT pg_sleep(2);
 
 (1 row)
 
+-- expect data in table_first_5/6
 \c - myuser - :worker_2_port
 SET search_path TO split_shard_replication_setup_schema;
 SELECT * FROM table_first_4;
diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out
index eb0735c4c..7f34da9cb 100644
--- a/src/test/regress/expected/split_shard_replication_setup.out
+++ b/src/test/regress/expected/split_shard_replication_setup.out
@@ -13,15 +13,6 @@ SELECT create_distributed_table('table_to_split','id');
 
 (1 row)
 
--- slotName_table is used to persist replication slot name.
--- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
-CREATE TABLE slotName_table (name text, nodeId int, id int primary key);
-SELECT create_distributed_table('slotName_table','id');
- create_distributed_table
---------------------------------------------------------------------
-
-(1 row)
-
 -- Test scenario one starts from here
 -- 1. table_to_split is a citus distributed table
 -- 2. Shard table_to_split_1 is located on worker1.
 -- table_to_split_2/3 are located on worker2
 -- 4. execute UDF split_shard_replication_setup on worker1 with below
 -- params:
--- split_shard_replication_setup
+-- worker_split_shard_replication_setup
 -- (
 -- ARRAY[
--- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ],
--- ARRAY[1, 3 , 0 , 2147483647, 18 ]
+-- ROW(1 /* source shardId */, 2 /* new shardId */, '-2147483648' /* minHashValue */, '-1' /* maxHashValue */, 18 /* nodeId where new shard is placed */),
+-- ROW(1, 3, '0', '2147483647', 18)
 -- ]
 -- );
 -- 5. 
Create Replication slot with 'decoding_plugin_for_shard_split' @@ -54,8 +45,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT worker_split_shard_replication_setup(ARRAY[ - ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info, - ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info + ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]) AS shared_memory_id \gset SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name @@ -69,7 +60,7 @@ CREATE SUBSCRIPTION sub1 enabled=true, slot_name=:slot_name, copy_data=false); -select pg_sleep(5); +SELECT pg_sleep(5); pg_sleep --------------------------------------------------------------------- @@ -206,9 +197,7 @@ SELECT * FROM table_to_split_3; \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; DROP PUBLICATION pub1; -DELETE FROM slotName_table; \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; DROP SUBSCRIPTION sub1; -DELETE FROM slotName_table; diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index ca827d5bd..ea8e1fcd8 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -1,4 +1,4 @@ --- Test scenario three starts from here (parent shard and child shards are located on same machine) +-- Test scenario (parent shard and child shards are located on same machine) -- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. -- 2. table_to_split_1 is located on worker1. -- 3. table_to_split_2 and table_to_split_3 are located on worker1 @@ -11,8 +11,8 @@ SET client_min_messages TO ERROR; CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; -- Worker1 is target for table_to_split_2 and table_to_split_3 SELECT worker_split_shard_replication_setup(ARRAY[ - ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info, - ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info + ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info ]) AS shared_memory_id \gset SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset -- Create subscription at worker1 with copy_data to 'false' a @@ -88,4 +88,3 @@ SELECT * FROM table_to_split_3; -- clean up DROP SUBSCRIPTION local_subscription; DROP PUBLICATION pub1; -DELETE FROM slotName_table; diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 6a1e7d112..c08efc85d 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -1,4 +1,4 @@ --- Test scenario two starts from here +-- Test scenario (Parent and one child on same node. 
Other child on different node) -- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. -- 2. table_to_split_1 is located on worker1. -- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 @@ -9,8 +9,8 @@ SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT worker_split_shard_replication_setup(ARRAY[ - ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info, - ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info + ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]) AS shared_memory_id \gset WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one. SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset @@ -24,7 +24,7 @@ CREATE SUBSCRIPTION sub_worker1 enabled=true, slot_name=:slot_for_worker1, copy_data=false); -select pg_sleep(5); +SELECT pg_sleep(5); pg_sleep --------------------------------------------------------------------- @@ -41,7 +41,7 @@ CREATE SUBSCRIPTION sub_worker2 enabled=true, slot_name=:slot_for_worker2, copy_data=false); -select pg_sleep(5); +SELECT pg_sleep(5); pg_sleep --------------------------------------------------------------------- @@ -70,7 +70,7 @@ INSERT INTO table_to_split_1 VALUES(100, 'a'); INSERT INTO table_to_split_1 VALUES(400, 'a'); INSERT INTO table_to_split_1 VALUES(500, 'a'); UPDATE table_to_split_1 SET value='b' WHERE id = 400; -select pg_sleep(5); +SELECT pg_sleep(5); pg_sleep --------------------------------------------------------------------- @@ -144,11 +144,9 @@ SELECT * FROM table_to_split_3; SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; DROP SUBSCRIPTION sub_worker2; -DELETE FROM slotName_table; -- drop publication from worker1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; DROP SUBSCRIPTION sub_worker1; DROP PUBLICATION pub1; -DELETE FROM slotName_table; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index f526353e0..ba9e74dda 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -242,7 +242,7 @@ ORDER BY 1; function worker_partitioned_table_size(regclass) function worker_record_sequence_dependency(regclass,regclass,name) function worker_save_query_explain_analyze(text,jsonb) - function worker_split_shard_replication_setup(bigint[]) + function worker_split_shard_replication_setup(citus.split_shard_info[]) schema citus schema citus_internal schema columnar @@ -271,6 +271,7 @@ ORDER BY 1; table pg_dist_transaction type citus.distribution_type type citus.shard_transfer_mode + type citus.split_shard_info type citus_copy_format type noderole view citus_dist_stat_activity diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 25f6db1ae..6b5111e08 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -67,10 +67,10 @@ CREATE PUBLICATION 
pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
 CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
 
 SELECT worker_split_shard_replication_setup(ARRAY[
-    ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
-    ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info,
-    ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
-    ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info
+    ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
+    ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
+    ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
+    ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
 ]) AS shared_memory_id \gset
 
 SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
@@ -110,6 +110,7 @@ INSERT INTO table_second_7 VALUES(400, 'a');
 SELECT * FROM table_second_7;
 SELECT pg_sleep(2);
 
+-- expect data in table_first_5/6
 \c - myuser - :worker_2_port
 SET search_path TO split_shard_replication_setup_schema;
 SELECT * FROM table_first_4;
diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql
index 13fe8c8e7..35b33271c 100644
--- a/src/test/regress/sql/split_shard_replication_setup.sql
+++ b/src/test/regress/sql/split_shard_replication_setup.sql
@@ -11,11 +11,6 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
 CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
 SELECT create_distributed_table('table_to_split','id');
 
--- slotName_table is used to persist replication slot name.
--- It is only used for testing as the worker2 needs to create subscription over the same replication slot.
-CREATE TABLE slotName_table (name text, nodeId int, id int primary key);
-SELECT create_distributed_table('slotName_table','id');
-
 -- Test scenario one starts from here
 -- 1. table_to_split is a citus distributed table
 -- 2. Shard table_to_split_1 is located on worker1.
@@ -23,11 +18,11 @@ SELECT create_distributed_table('slotName_table','id');
 -- table_to_split_2/3 are located on worker2
 -- 4. execute UDF split_shard_replication_setup on worker1 with below
 -- params:
--- split_shard_replication_setup
+-- worker_split_shard_replication_setup
 -- (
 -- ARRAY[
--- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ],
--- ARRAY[1, 3 , 0 , 2147483647, 18 ]
+-- ROW(1 /* source shardId */, 2 /* new shardId */, '-2147483648' /* minHashValue */, '-1' /* maxHashValue */, 18 /* nodeId where new shard is placed */),
+-- ROW(1, 3, '0', '2147483647', 18)
 -- ]
 -- );
 -- 5. 
Create Replication slot with 'decoding_plugin_for_shard_split' @@ -52,8 +47,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT worker_split_shard_replication_setup(ARRAY[ - ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info, - ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info + ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]) AS shared_memory_id \gset SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset @@ -71,7 +66,7 @@ CREATE SUBSCRIPTION sub1 slot_name=:slot_name, copy_data=false); -select pg_sleep(5); +SELECT pg_sleep(5); -- No data is present at this moment in all the below tables at worker2 SELECT * FROM table_to_split_1; @@ -130,11 +125,8 @@ SELECT * FROM table_to_split_3; \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; DROP PUBLICATION pub1; -DELETE FROM slotName_table; \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; DROP SUBSCRIPTION sub1; -DELETE FROM slotName_table; - diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index ffb6f8acf..1a269dc52 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -1,4 +1,4 @@ --- Test scenario three starts from here (parent shard and child shards are located on same machine) +-- Test scenario (parent shard and child shards are located on same machine) -- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. -- 2. table_to_split_1 is located on worker1. -- 3. 
table_to_split_2 and table_to_split_3 are located on worker1 @@ -14,9 +14,9 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s -- Worker1 is target for table_to_split_2 and table_to_split_3 SELECT worker_split_shard_replication_setup(ARRAY[ - ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info, - ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info - ]) AS shared_memory_id \gset + ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info + ]) AS shared_memory_id \gset SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset @@ -24,7 +24,7 @@ SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('c BEGIN; CREATE SUBSCRIPTION local_subscription CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 + PUBLICATION pub1 WITH ( create_slot=false, enabled=true, @@ -53,4 +53,3 @@ SELECT * FROM table_to_split_3; -- clean up DROP SUBSCRIPTION local_subscription; DROP PUBLICATION pub1; -DELETE FROM slotName_table; diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index 1baa5fcb3..cceb972a7 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -1,4 +1,4 @@ --- Test scenario two starts from here +-- Test scenario (Parent and one child on same node. Other child on different node) -- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. -- 2. table_to_split_1 is located on worker1. -- 3. 
table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 @@ -12,8 +12,8 @@ SET search_path TO split_shard_replication_setup_schema; CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT worker_split_shard_replication_setup(ARRAY[ - ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info, - ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info + ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]) AS shared_memory_id \gset SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset @@ -22,13 +22,13 @@ SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FOR -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1 CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 + PUBLICATION pub1 WITH ( create_slot=false, enabled=true, slot_name=:slot_for_worker1, copy_data=false); -select pg_sleep(5); +SELECT pg_sleep(5); \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; @@ -36,13 +36,13 @@ SET search_path TO split_shard_replication_setup_schema; -- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2' CREATE SUBSCRIPTION sub_worker2 CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' - PUBLICATION pub1 + PUBLICATION pub1 WITH ( create_slot=false, enabled=true, slot_name=:slot_for_worker2, copy_data=false); -select pg_sleep(5); +SELECT pg_sleep(5); -- No data is present at this moment in all the below tables at worker2 SELECT * FROM table_to_split_1; @@ -56,7 +56,7 @@ INSERT INTO table_to_split_1 VALUES(100, 'a'); INSERT INTO table_to_split_1 VALUES(400, 'a'); INSERT INTO table_to_split_1 VALUES(500, 'a'); UPDATE table_to_split_1 SET value='b' WHERE id = 400; -select pg_sleep(5); +SELECT pg_sleep(5); -- expect data to present in table_to_split_2 on worker1 as its destination for value '400' SELECT * FROM table_to_split_1; @@ -88,7 +88,6 @@ SELECT * FROM table_to_split_3; SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; DROP SUBSCRIPTION sub_worker2; -DELETE FROM slotName_table; -- drop publication from worker1 \c - - - :worker_1_port @@ -96,4 +95,3 @@ SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; DROP SUBSCRIPTION sub_worker1; DROP PUBLICATION pub1; -DELETE FROM slotName_table; \ No newline at end of file diff --git a/src/test/regress/sql/split_shard_test_helpers.sql b/src/test/regress/sql/split_shard_test_helpers.sql deleted file mode 100644 index c195049ff..000000000 --- a/src/test/regress/sql/split_shard_test_helpers.sql +++ /dev/null @@ -1,130 +0,0 @@ --- File to create functions and helpers needed for split shard tests - --- Populates shared memory mapping for parent shard with id 1. 
--- targetNode1, targetNode2 are the locations where child shard 2 and 3 are placed respectively -CREATE OR REPLACE FUNCTION split_shard_replication_setup_helper(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - memoryId bigint := 0; - memoryIdText text; -begin - SELECT * into memoryId from worker_split_shard_replication_setup ( - ARRAY[ROW(1,2,-2147483648,-1, targetNode1)::citus.split_shard_info, - ROW(1,3,0,2147483647, targetNode2)::citus.split_shard_info]); - SELECT FORMAT('%s', memoryId) into memoryIdText; - return memoryIdText; -end -$$ LANGUAGE plpgsql; - --- Create replication slots for targetNode1 and targetNode2 incase of non-colocated shards -CREATE OR REPLACE FUNCTION create_replication_slot(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - targetOneSlotName text; - targetTwoSlotName text; - sharedMemoryId text; - derivedSlotName text; -begin - - SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2); - SELECT FORMAT('citus_split_%s_10', targetNode1) into derivedSlotName; - SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - - -- if new child shards are placed on different nodes, create one more replication slot - if (targetNode1 != targetNode2) then - SELECT FORMAT('citus_split_%s_10', targetNode2) into derivedSlotName; - SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); - INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); - end if; - - INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 2); - return targetOneSlotName; -end -$$ LANGUAGE plpgsql; - --- Populates shared memory mapping for colocated parent shards 4 and 7. --- shard 4 has child shards 5 and 6. Shard 7 has child shards 8 and 9. 
-CREATE OR REPLACE FUNCTION split_shard_replication_setup_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - memoryId bigint := 0; - memoryIdText text; -begin - SELECT * into memoryId from worker_split_shard_replication_setup( - ARRAY[ - ROW(4, 5, -2147483648,-1, targetNode1)::citus.split_shard_info, - ROW(4, 6, 0 ,2147483647, targetNode2)::citus.split_shard_info, - ROW(7, 8, -2147483648,-1, targetNode1)::citus.split_shard_info, - ROW(7, 9, 0, 2147483647 , targetNode2)::citus.split_shard_info - ]); - - SELECT FORMAT('%s', memoryId) into memoryIdText; - return memoryIdText; -end -$$ LANGUAGE plpgsql; - --- Create replication slots for targetNode1 and targetNode2 incase of colocated shards -CREATE OR REPLACE FUNCTION create_replication_slot_for_colocated_shards(targetNode1 integer, targetNode2 integer) RETURNS text AS $$ -DECLARE - targetOneSlotName text; - targetTwoSlotName text; - sharedMemoryId text; - derivedSlotNameOne text; - derivedSlotNameTwo text; - tableOwnerOne bigint; - tableOwnerTwo bigint; -begin - -- setup shared memory information - SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2); - - SELECT relowner into tableOwnerOne from pg_class where relname='table_first'; - SELECT FORMAT('citus_split_%s_%s', targetNode1, tableOwnerOne) into derivedSlotNameOne; - SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split'); - - SELECT relowner into tableOwnerTwo from pg_class where relname='table_second'; - SELECT FORMAT('citus_split_%s_%s', targetNode2, tableOwnerTwo) into derivedSlotNameTwo; - SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split'); - - - INSERT INTO slotName_table values(targetOneSlotName, targetNode1, 1); - INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 2); - - return targetOneSlotName; -end -$$ LANGUAGE plpgsql; - --- create subscription on target node with given 'subscriptionName' -CREATE OR REPLACE FUNCTION create_subscription(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where nodeId = targetNodeId; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return replicationSlotName; -end -$$ LANGUAGE plpgsql; - --- create subscription on target node with given 'subscriptionName' -CREATE OR REPLACE FUNCTION create_subscription_for_owner_one(targetNodeId integer, subscriptionName text) RETURNS text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where id = 1; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return replicationSlotName; -end -$$ LANGUAGE plpgsql; - --- create subscription on target node with given 'subscriptionName' -CREATE OR REPLACE FUNCTION create_subscription_for_owner_two(targetNodeId integer, subscriptionName text) RETURNS 
text AS $$ -DECLARE - replicationSlotName text; - nodeportLocal int; - subname text; -begin - SELECT name into replicationSlotName from slotName_table where id = 2; - EXECUTE FORMAT($sub$create subscription %s connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB2 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, subscriptionName, replicationSlotName); - return replicationSlotName; -end -$$ LANGUAGE plpgsql;
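-- 
For illustration only (not part of the patch): a minimal sketch of invoking the
UDF with its revised signature, where the hash ranges are passed as text and the
function returns void instead of a shared-memory handle. The shard IDs (1, 2, 3)
and the node ID (18) are placeholders borrowed from the test comments above.

    -- run on the worker holding source shard 1; all values are placeholders
    SELECT worker_split_shard_replication_setup(ARRAY[
        ROW(1, 2, '-2147483648', '-1', 18)::citus.split_shard_info,
        ROW(1, 3, '0', '2147483647', 18)::citus.split_shard_info
    ]);
    -- returns void; the DSM handle is kept in statically allocated shared
    -- memory rather than being returned to the caller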