diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c index 99948759f..da7561db4 100644 --- a/src/backend/distributed/operations/split_shard_replication_setup.c +++ b/src/backend/distributed/operations/split_shard_replication_setup.c @@ -38,20 +38,20 @@ typedef struct NodeShardMappingEntry } NodeShardMappingEntry; /* Function declarations */ -static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject, - int shardSplitInfoIndex, - uint64 *sourceShardId, - uint64 *desShardId, - int32 *minValue, - int32 *maxValue, - int32 *nodeId); +static void ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, + uint64 *sourceShardId, + uint64 *childShardId, + int32 *minValue, + int32 *maxValue, + int32 *nodeId); + static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, uint64 desSplitChildShardId, int32 minValue, int32 maxValue, int32 nodeId); static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo); -static void PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, +static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, HTAB *shardInfoHashMap, dsm_handle dsmHandle); @@ -103,33 +103,40 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size Datum worker_split_shard_replication_setup(PG_FUNCTION_ARGS) { + if (PG_ARGISNULL(0)) + { + ereport(ERROR, (errmsg("targets can't be null"))); + } + ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0); - int shardInfoArrayLength = ARR_DIMS(shardInfoArrayObject)[0]; + if (array_contains_nulls(shardInfoArrayObject)) + { + ereport(ERROR, (errmsg("Unexpectedly shard info array contains a null value"))); + } /* SetupMap */ SetupHashMapForShardInfo(); int shardSplitInfoCount = 0; - for (int index = 0; index < shardInfoArrayLength; index++) + + ArrayIterator shardInfo_iterator = array_create_iterator(shardInfoArrayObject, 0, + NULL); + Datum shardInfoDatum = 0; + bool isnull = false; + while (array_iterate(shardInfo_iterator, &shardInfoDatum, &isnull)) { uint64 sourceShardId = 0; - uint64 desShardId = 0; + uint64 childShardId = 0; int32 minValue = 0; int32 maxValue = 0; int32 nodeId = 0; - ParseShardSplitInfo( - shardInfoArrayObject, - index, - &sourceShardId, - &desShardId, - &minValue, - &maxValue, - &nodeId); + ParseShardSplitInfoFromDatum(shardInfoDatum, &sourceShardId, &childShardId, + &minValue, &maxValue, &nodeId); ShardSplitInfo *shardSplitInfo = CreateShardSplitInfo( sourceShardId, - desShardId, + childShardId, minValue, maxValue, nodeId); @@ -139,13 +146,16 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) } dsm_handle dsmHandle; - ShardSplitInfo *splitShardInfoSMArray = + ShardSplitInfoSMHeader *splitShardInfoSMHeader = CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle); - PopulateShardSplitInfoInSM(splitShardInfoSMArray, + PopulateShardSplitInfoInSM(splitShardInfoSMHeader, ShardInfoHashMap, dsmHandle); + /* store handle in statically allocated shared memory*/ + StoreSharedMemoryHandle(dsmHandle); + return dsmHandle; } @@ -173,144 +183,6 @@ SetupHashMapForShardInfo() } -static void -ParseShardSplitInfo(ArrayType *shardInfoArrayObject, - int shardSplitInfoIndex, - uint64 *sourceShardId, - uint64 *desShardId, - int32 *minValue, - int32 *maxValue, - int32 *nodeId) -{ - Oid elemtypeId = ARR_ELEMTYPE(shardInfoArrayObject); - int16 elemtypeLength = 0; - bool elemtypeByValue = false; - char elemtypeAlignment = 0; - 
get_typlenbyvalalign(elemtypeId, &elemtypeLength, &elemtypeByValue, - &elemtypeAlignment); - - int elementIndex = 0; - int indexes[] = { shardSplitInfoIndex + 1, elementIndex + 1 }; - bool isNull = false; - - /* Get source shard Id */ - Datum sourceShardIdDat = array_ref( - shardInfoArrayObject, - 2, - indexes, - -1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */ - elemtypeLength, - elemtypeByValue, - elemtypeAlignment, - &isNull); - - if (isNull) - { - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("null entry found for source shardId"))); - } - - *sourceShardId = DatumGetUInt64(sourceShardIdDat); - - /* Get destination shard Id */ - elementIndex++; - isNull = false; - indexes[0] = shardSplitInfoIndex + 1; - indexes[1] = elementIndex + 1; - Datum destinationShardIdDat = array_ref( - shardInfoArrayObject, - 2, - indexes, - -1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */ - elemtypeLength, - elemtypeByValue, - elemtypeAlignment, - &isNull); - - if (isNull) - { - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("null entry found for destination shardId"))); - } - - *desShardId = DatumGetUInt64(destinationShardIdDat); - - /* Get minValue for destination shard */ - elementIndex++; - isNull = false; - indexes[0] = shardSplitInfoIndex + 1; - indexes[1] = elementIndex + 1; - Datum minValueDat = array_ref( - shardInfoArrayObject, - 2, - indexes, - -1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */ - elemtypeLength, - elemtypeByValue, - elemtypeAlignment, - &isNull); - - if (isNull) - { - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("null entry found for min value"))); - } - - *minValue = DatumGetInt32(minValueDat); - - /* Get maxValue for destination shard */ - elementIndex++; - isNull = false; - indexes[0] = shardSplitInfoIndex + 1; - indexes[1] = elementIndex + 1; - Datum maxValueDat = array_ref( - shardInfoArrayObject, - 2, - indexes, - -1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */ - elemtypeLength, - elemtypeByValue, - elemtypeAlignment, - &isNull); - - if (isNull) - { - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("null entry found for max value"))); - } - - *maxValue = DatumGetInt32(maxValueDat); - - /* Get nodeId for shard placement*/ - elementIndex++; - isNull = false; - indexes[0] = shardSplitInfoIndex + 1; - indexes[1] = elementIndex + 1; - Datum nodeIdDat = array_ref( - shardInfoArrayObject, - 2, - indexes, - -1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */ - elemtypeLength, - elemtypeByValue, - elemtypeAlignment, - &isNull); - - if (isNull) - { - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("null entry found for max value"))); - } - - *nodeId = DatumGetInt32(nodeIdDat); -} - - /* * CreateShardSplitInfo function constructs ShardSplitInfo data structure * with appropriate OIs' for source and destination relation. @@ -330,6 +202,19 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, int32 nodeId) { ShardInterval *shardIntervalToSplit = LoadShardInterval(sourceShardIdToSplit); + + /* If metadata is not synced, we cannot proceed further as split work flow assumes + * metadata to be synced on worker node hosting source shard to split. + */ + if (shardIntervalToSplit == NULL) + { + ereport(ERROR, + errmsg( + "Could not find metadata corresponding to source shard id: %ld. 
" + "Split workflow assumes metadata to be synced across " + "worker nodes hosting source shards.", sourceShardIdToSplit)); + } + CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry( shardIntervalToSplit->relationId); @@ -358,7 +243,7 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Invalid citusTableOid:%u " "sourceShardToSplitOid: %u," - "destSplitChildShardOid :%u ", + "destSplitChildShardOid:%u ", citusTableOid, sourceShardToSplitOid, destSplitChildShardOid))); @@ -416,8 +301,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) * into shared memory segment. This information is consumed by the WAL sender * process during logical replication. * - * shardSplitInfoArray - Shared memory pointer where information has to - * be copied + * shardSplitInfoSMHeader - Shared memory header * * shardInfoHashMap - Hashmap containing parsed split information * per nodeId wise @@ -425,7 +309,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) * dsmHandle - Shared memory segment handle */ static void -PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, +PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, HTAB *shardInfoHashMap, dsm_handle dsmHandle) { @@ -445,7 +329,8 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray, ShardSplitInfo *splitShardInfo = NULL; foreach_ptr(splitShardInfo, shardSplitInfoList) { - ShardSplitInfo *shardInfoInSM = &shardSplitInfoArray[index]; + ShardSplitInfo *shardInfoInSM = + &shardSplitInfoSMHeader->splitInfoArray[index]; shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid; shardInfoInSM->partitionColumnIndex = splitShardInfo->partitionColumnIndex; @@ -494,3 +379,56 @@ NodeShardMappingHashCompare(const void *left, const void *right, Size keysize) return 0; } } + + +static void +ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, + uint64 *sourceShardId, + uint64 *childShardId, + int32 *minValue, + int32 *maxValue, + int32 *nodeId) +{ + HeapTupleHeader dataTuple = DatumGetHeapTupleHeader(shardSplitInfoDatum); + bool isnull = false; + + Datum sourceShardIdDatum = GetAttributeByName(dataTuple, "source_shard_id", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("source_shard_id for split_shard_info can't be null"))); + } + + *sourceShardId = DatumGetUInt64(sourceShardIdDatum); + + Datum childShardIdDatum = GetAttributeByName(dataTuple, "child_shard_id", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("child_shard_id for split_shard_info can't be null"))); + } + + *childShardId = DatumGetUInt64(childShardIdDatum); + + Datum minValueDatum = GetAttributeByName(dataTuple, "shard_min_value", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("shard_min_value for split_shard_info can't be null"))); + } + + *minValue = DatumGetInt32(minValueDatum); + + Datum maxValueDatum = GetAttributeByName(dataTuple, "shard_max_value", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("shard_max_value for split_shard_info can't be null"))); + } + + *maxValue = DatumGetInt32(maxValueDatum); + + Datum nodeIdDatum = GetAttributeByName(dataTuple, "node_id", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("node_id for split_shard_info can't be null"))); + } + + *nodeId = DatumGetInt32(nodeIdDatum); +} diff --git a/src/backend/distributed/shardsplit/Makefile b/src/backend/distributed/shardsplit/Makefile index 8d0c67b64..2c36d6ad7 100644 --- a/src/backend/distributed/shardsplit/Makefile +++ 
b/src/backend/distributed/shardsplit/Makefile @@ -12,7 +12,7 @@ safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib SUBDIRS = . safeclib SUBDIRS += ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS)) -OBJS += pgoutput.o +OBJS += shardsplit_decoder.o MODULE_big = decoding_plugin_for_shard_split diff --git a/src/backend/distributed/shardsplit/pgoutput.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c similarity index 83% rename from src/backend/distributed/shardsplit/pgoutput.c rename to src/backend/distributed/shardsplit/shardsplit_decoder.c index e7f37faa3..237c73a33 100644 --- a/src/backend/distributed/shardsplit/pgoutput.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -24,9 +24,8 @@ PG_MODULE_MAGIC; extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); static LogicalDecodeChangeCB pgoutputChangeCB; -ShardSplitInfo *shardSplitInfoArray = NULL; -int shardSplitInfoArraySize = 0; - +static ShardSplitInfoSMHeader *shardSplitInfoSMHeader = NULL; +static ShardSplitInfoForReplicationSlot *shardSplitInfoForSlot = NULL; /* Plugin callback */ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -80,9 +79,10 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation, Oid distributedTableOid = InvalidOid; Oid sourceShardOid = sourceShardRelation->rd_id; - for (int i = 0; i < shardSplitInfoArraySize; i++) + for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex; + i++) { - shardSplitInfo = &shardSplitInfoArray[i]; + shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i]; if (shardSplitInfo->sourceShardOid == sourceShardOid) { distributedTableOid = shardSplitInfo->distributedTableOid; @@ -157,19 +157,18 @@ FindTargetRelationOid(Relation sourceShardRelation, return InvalidOid; } - for (int i = 0; i < shardSplitInfoArraySize; i++) + for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex; + i++) { - ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i]; + ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i]; /* - * Each commit message is processed by all the configured - * replication slots. However, a replication is slot only responsible - * for new shard placements belonging to a single node. We check if the - * current slot which is processing the commit should emit - * a target relation Oid. + * Each commit message is processed by all the configured replication slots. + * A replication slot is responsible for shard placements belonging to unique + * table owner and nodeId combination. We check if the current slot which is + * processing the commit should emit a target relation Oid. 
*/ - if (strcmp(shardSplitInfo->slotName, currentSlotName) == 0 && - shardSplitInfo->sourceShardOid == sourceShardRelationOid && + if (shardSplitInfo->sourceShardOid == sourceShardRelationOid && shardSplitInfo->shardMinValue <= hashValue && shardSplitInfo->shardMaxValue >= hashValue) { @@ -194,10 +193,11 @@ bool IsCommitRecursive(Relation sourceShardRelation) { Oid sourceShardOid = sourceShardRelation->rd_id; - for (int i = 0; i < shardSplitInfoArraySize; i++) + for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex; + i++) { /* skip the commit when destination is equal to the source */ - ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i]; + ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i]; if (sourceShardOid == shardSplitInfo->splitChildShardOid) { return true; @@ -216,19 +216,22 @@ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { - /* - * Get ShardSplitInfo array from Shared Memory if not already - * initialized. This gets initialized during the replication of - * first message. - */ - if (shardSplitInfoArray == NULL) + if (!is_publishable_relation(relation)) { - shardSplitInfoArray = - GetShardSplitInfoSMArrayForSlot(ctx->slot->data.name.data, - &shardSplitInfoArraySize); + return; + } + + /* + * Get ShardSplitInfoForSlot if not already initialized. + * This gets initialized during the replication of first message. + */ + if (shardSplitInfoForSlot == NULL) + { + shardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot( + ctx->slot->data.name.data); + shardSplitInfoSMHeader = shardSplitInfoForSlot->shardSplitInfoHeader; } - /* avoid applying recursive commit */ if (IsCommitRecursive(relation)) { return; diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index c8e336783..7daf24c55 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -15,6 +15,16 @@ #include "distributed/shardinterval_utils.h" #include "distributed/shardsplit_shared_memory.h" #include "distributed/citus_safe_lib.h" +#include "storage/ipc.h" +#include "utils/memutils.h" + +const char *sharedMemoryNameForHandleManagement = + "SHARED_MEMORY_FOR_SPLIT_SHARD_HANDLE_MANAGEMENT"; + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static +void ShardSplitShmemInit(void); /* Function declarations */ static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int @@ -24,8 +34,6 @@ static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int dsm_handle * dsmHandle); -static void * ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); - static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle); @@ -64,32 +72,24 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle) /* - * GetShardSplitInfoSMArrayForSlot returns pointer to the array of - * 'ShardSplitInfo' struct stored in the shared memory segment. + * GetShardSplitInfoSMHeader returns pointer to the header of shared memory segment. 
*/ -ShardSplitInfo * -GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount) +ShardSplitInfoSMHeader * +GetShardSplitInfoSMHeader(char *slotName) { - if (slotName == NULL || - shardSplitInfoCount == NULL) + if (slotName == NULL) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("Expected slot name and array size arguments"))); + errmsg("Expected slot name but found NULL"))); } - dsm_handle dsmHandle; - uint32_t nodeId = 0; - decode_replication_slot(slotName, &nodeId, &dsmHandle); + dsm_handle dsmHandle = GetSharedMemoryHandle(); ShardSplitInfoSMHeader *shardSplitInfoSMHeader = GetShardSplitInfoSMHeaderFromDSMHandle(dsmHandle); - *shardSplitInfoCount = shardSplitInfoSMHeader->shardSplitInfoCount; - ShardSplitInfo *shardSplitInfoArray = - (ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader); - - return shardSplitInfoArray; + return shardSplitInfoSMHeader; } @@ -114,7 +114,8 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn "positive values"))); } - Size totalSize = sizeof(ShardSplitInfoSMHeader) + shardSplitInfoCount * + Size totalSize = offsetof(ShardSplitInfoSMHeader, splitInfoArray) + + shardSplitInfoCount * shardSplitInfoSize; dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS); @@ -122,7 +123,7 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn { ereport(ERROR, (errmsg("could not create a dynamic shared memory segment to " - "keep shard split info"))); + "store shard split info"))); } *dsmHandle = dsm_segment_handle(dsmSegment); @@ -136,7 +137,7 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn ShardSplitInfoSMHeader *shardSplitInfoSMHeader = GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle); - shardSplitInfoSMHeader->shardSplitInfoCount = shardSplitInfoCount; + shardSplitInfoSMHeader->count = shardSplitInfoCount; return shardSplitInfoSMHeader; } @@ -144,43 +145,27 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn /* * CreateSharedMemoryForShardSplitInfo is a wrapper function which creates shared memory - * for storing shard split infomation. The function returns pointer to the first element - * within this array. + * for storing shard split information. The function returns a pointer to the header of + * the shared memory segment. * * shardSplitInfoCount - number of 'ShardSplitInfo ' elements to be allocated * dsmHandle - handle of the allocated shared memory segment */ -ShardSplitInfo * +ShardSplitInfoSMHeader * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle) { ShardSplitInfoSMHeader *shardSplitInfoSMHeader = AllocateSharedMemoryForShardSplitInfo(shardSplitInfoCount, sizeof(ShardSplitInfo), dsmHandle); - ShardSplitInfo *shardSplitInfoSMArray = - (ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader); - - return shardSplitInfoSMArray; -} - - -/* - * ShardSplitInfoSMData returns a pointer to the array of 'ShardSplitInfo'. - * This is simply the data right after the header, so this function is trivial. - * The main purpose of this function is to make the intent clear to readers - * of the code. - */ -static void * -ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) -{ - return shardSplitInfoSMHeader + 1; + return shardSplitInfoSMHeader; } /* * encode_replication_slot returns an encoded replication slot name * in the following format.
- * Slot Name = citus_split_nodeId_sharedMemoryHandle_tableOwnerOid + * Slot Name = citus_split_nodeId_tableOwnerOid * Max supported length of replication slot name is 64 bytes. */ char * @@ -189,51 +174,194 @@ encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId) { StringInfo slotName = makeStringInfo(); - appendStringInfo(slotName, "citus_split_%u_%u_%u", nodeId, dsmHandle, tableOwnerId); + appendStringInfo(slotName, "citus_split_%u_%u", nodeId, tableOwnerId); + + if (slotName->len > NAMEDATALEN) + { + ereport(ERROR, + (errmsg( + "Replication Slot name:%s having length:%d is greater than maximum allowed length:%d", + slotName->data, slotName->len, NAMEDATALEN))); + } + return slotName->data; } /* - * decode_replication_slot decodes the replication slot name - * into node id, shared memory handle. + * InitializeShardSplitSMHandleManagement requests the necessary shared memory + * from Postgres and sets up the shared memory startup hook. + * This memory is used to store handle of other shared memories allocated during split workflow. */ void -decode_replication_slot(char *slotName, - uint32_t *nodeId, - dsm_handle *dsmHandle) +InitializeShardSplitSMHandleManagement(void) { - int index = 0; - char *strtokPosition = NULL; - char *dupSlotName = pstrdup(slotName); - char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition); - while (slotNameString != NULL) + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = ShardSplitShmemInit; +} + + +static void +ShardSplitShmemInit(void) +{ + bool alreadyInitialized = false; + ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement, + sizeof(ShardSplitShmemData), + &alreadyInitialized); + + if (!alreadyInitialized) { - /* third part of the slot name is NodeId */ - if (index == 2) - { - *nodeId = strtoul(slotNameString, NULL, 10); - } + char *trancheName = "Split_Shard_Setup_Tranche"; - /* fourth part of the name is memory handle */ - else if (index == 3) - { - *dsmHandle = strtoul(slotNameString, NULL, 10); - } + NamedLWLockTranche *namedLockTranche = + &smData->namedLockTranche; - slotNameString = strtok_r(NULL, "_", &strtokPosition); - index++; + /* start by zeroing out all the memory */ + memset(smData, 0, + sizeof(ShardSplitShmemData)); - /*Ignoring TableOwnerOid*/ + namedLockTranche->trancheId = LWLockNewTrancheId(); + + LWLockRegisterTranche(namedLockTranche->trancheId, trancheName); + LWLockInitialize(&smData->lock, + namedLockTranche->trancheId); + + smData->dsmHandle = DSM_HANDLE_INVALID; } - /* - * Replication slot name is encoded as citus_split_nodeId_sharedMemoryHandle_tableOwnerOid. - * Hence the number of tokens would be strictly five considering "_" as delimiter. - */ - if (index != 5) + if (prev_shmem_startup_hook != NULL) { - ereport(ERROR, - (errmsg("Invalid Replication Slot name encoding: %s", slotName))); + prev_shmem_startup_hook(); } } + + +/* + * StoreSharedMemoryHandle stores a handle of shared memory + * allocated and populated by 'worker_split_shard_replication_setup' UDF. 
+ */ +void +StoreSharedMemoryHandle(dsm_handle dsmHandle) +{ + bool found = false; + ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement, + sizeof(ShardSplitShmemData), + &found); + if (!found) + { + ereport(ERROR, + errmsg( + "Shared memory for handle management should have been initialized during boot")); + } + + LWLockAcquire(&smData->lock, LW_EXCLUSIVE); + + /* + * In a normal situation, the previously stored handle should have been invalidated + * before the current function is called. + * If this handle is still valid, it means cleanup of previous split shard + * workflow failed. Log a warning and continue the current shard split operation. + */ + if (smData->dsmHandle != DSM_HANDLE_INVALID) + { + ereport(WARNING, + errmsg( + "As a part of split shard workflow,unexpectedly found a valid" + " shared memory handle while storing a new one.")); + } + + /* Store the incoming handle */ + smData->dsmHandle = dsmHandle; + + LWLockRelease(&smData->lock); +} + + +/* + * GetSharedMemoryHandle returns the shared memory handle stored + * by 'worker_split_shard_replication_setup' UDF. This handle + * is requested by WAL sender processes during the logical replication phase. + */ +dsm_handle +GetSharedMemoryHandle(void) +{ + bool found = false; + ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement, + sizeof(ShardSplitShmemData), + &found); + if (!found) + { + ereport(ERROR, + errmsg( + "Shared memory for handle management should have been initialized during boot")); + } + + LWLockAcquire(&smData->lock, LW_SHARED); + dsm_handle dsmHandle = smData->dsmHandle; + LWLockRelease(&smData->lock); + + return dsmHandle; +} + + +/* + * PopulateShardSplitInfoForReplicationSlot traverses the 'ShardSplitInfo' array + * stored within the shared memory segment. It returns the starting and ending index positions + * of a given slot within this array. When the given replication slot processes a commit, + * traversal is limited to this bound, which improves performance. + */ +ShardSplitInfoForReplicationSlot * +PopulateShardSplitInfoForReplicationSlot(char *slotName) +{ + ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader(slotName); + + MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); + + ShardSplitInfoForReplicationSlot *infoForReplicationSlot = + (ShardSplitInfoForReplicationSlot *) palloc( + sizeof(ShardSplitInfoForReplicationSlot)); + infoForReplicationSlot->shardSplitInfoHeader = smHeader; + infoForReplicationSlot->startIndex = -1; + infoForReplicationSlot->endIndex = -1; + + int index = 0; + while (index < smHeader->count) + { + if (strcmp(smHeader->splitInfoArray[index].slotName, slotName) == 0) + { + /* Found the starting index from where current slot information begins */ + infoForReplicationSlot->startIndex = index; + + /* Slide forward to get the end index */ + index++; + while (index < smHeader->count && strcmp( + smHeader->splitInfoArray[index].slotName, slotName) == 0) + { + index++; + } + + infoForReplicationSlot->endIndex = index - 1; + + /* + * 'ShardSplitInfo' entries with the same slot name are stored contiguously in the shared memory segment. + * After the current 'index' position, we should not encounter any 'ShardSplitInfo' with the incoming slot name. + * If this happens, there is shared memory corruption. It is worth asserting this assumption. 
+ * TODO: Traverse further and assert + */ + } + + index++; + } + + if (infoForReplicationSlot->startIndex == -1) + { + ereport(ERROR, + (errmsg("Unexpectedly could not find information " + "corresponding to replication slot name:%s in shared memory.", + slotName))); + } + + MemoryContextSwitchTo(oldContext); + + return infoForReplicationSlot; +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index f6033b824..f9af95297 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -103,6 +103,8 @@ #include "utils/syscache.h" #include "utils/varlena.h" +#include "distributed/shardsplit_shared_memory.h" + #include "columnar/columnar.h" ColumnarSupportsIndexAM_type extern_ColumnarSupportsIndexAM = NULL; @@ -376,6 +378,9 @@ _PG_init(void) InitializeSharedConnectionStats(); InitializeLocallyReservedSharedConnections(); + /* initialize shard split shared memory handle management */ + InitializeShardSplitSMHandleManagement(); + /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql index 4627747d2..3131668cb 100644 --- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql +++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql @@ -1,7 +1,19 @@ +DROP TYPE IF EXISTS citus.split_shard_info; + +DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup; + +CREATE TYPE citus.split_shard_info AS ( + source_shard_id bigint, + child_shard_id bigint, + shard_min_value integer, + shard_max_value integer, + node_id integer); + + CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( - shardInfo bigint[][]) + splitShardInfo citus.split_shard_info[]) RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; -COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(shardInfo bigint[][]) +COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[]) IS 'Replication setup for splitting a shard' diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql index 4627747d2..3131668cb 100644 --- a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql +++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql @@ -1,7 +1,19 @@ +DROP TYPE IF EXISTS citus.split_shard_info; + +DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup; + +CREATE TYPE citus.split_shard_info AS ( + source_shard_id bigint, + child_shard_id bigint, + shard_min_value integer, + shard_max_value integer, + node_id integer); + + CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( - shardInfo bigint[][]) + splitShardInfo citus.split_shard_info[]) RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; -COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(shardInfo bigint[][]) +COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[]) IS 'Replication setup for splitting a shard' diff --git a/src/include/distributed/shardsplit_shared_memory.h 
b/src/include/distributed/shardsplit_shared_memory.h index df64135c1..9a193438b 100644 --- a/src/include/distributed/shardsplit_shared_memory.h +++ b/src/include/distributed/shardsplit_shared_memory.h @@ -21,24 +21,52 @@ */ typedef struct ShardSplitInfoSMHeader { - int shardSplitInfoCount; /* number of elements in the shared memory */ + int count; /* number of elements in the shared memory */ + ShardSplitInfo splitInfoArray[FLEXIBLE_ARRAY_MEMBER]; } ShardSplitInfoSMHeader; +/* + * Shard split information is populated and stored in shared memory in the form of a one-dimensional + * array by 'worker_split_shard_replication_setup'. Information belonging to the same replication + * slot is grouped together and stored contiguously within this array. + * 'ShardSplitInfoForReplicationSlot' stores the starting and ending indices for a particular + * replication slot within the shared memory segment. + * When a slot processes a commit, traversing only within this boundary of the shared memory segment + * improves performance. + */ +typedef struct ShardSplitInfoForReplicationSlot +{ + ShardSplitInfoSMHeader *shardSplitInfoHeader; /* shared memory segment header */ + int startIndex; /* starting index for a given slot */ + int endIndex; /* ending index for a given slot */ +} ShardSplitInfoForReplicationSlot; -/* Functions for creating and accessing shared memory segments */ -extern ShardSplitInfo * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, - dsm_handle *dsmHandle); +typedef struct ShardSplitShmemData +{ + int trancheId; + NamedLWLockTranche namedLockTranche; + LWLock lock; -extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, - int *shardSplitInfoCount); + dsm_handle dsmHandle; +} ShardSplitShmemData; + +/* Functions for creating and accessing shared memory used for dsm handle management */ +void InitializeShardSplitSMHandleManagement(void); + +void StoreSharedMemoryHandle(dsm_handle dsmHandle); +dsm_handle GetSharedMemoryHandle(void); + +/* Functions for creating and accessing shared memory segments containing shard split information */ +extern ShardSplitInfoSMHeader * CreateSharedMemoryForShardSplitInfo(int + shardSplitInfoCount, + dsm_handle *dsmHandle); +extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(char *slotName); + +extern ShardSplitInfoForReplicationSlot * PopulateShardSplitInfoForReplicationSlot( + char *slotName); -/* Functions related to encoding-decoding for replication slot name */ char * encode_replication_slot(uint32_t nodeId, dsm_handle dsmHandle, uint32_t tableOwnerId); -void decode_replication_slot(char *slotName, - uint32_t *nodeId, - dsm_handle *dsmHandle); - #endif /* SHARDSPLIT_SHARED_MEMORY_H */ diff --git a/src/test/regress/enterprise_split_schedule b/src/test/regress/enterprise_split_schedule index 71bfcc34b..6f216ea44 100644 --- a/src/test/regress/enterprise_split_schedule +++ b/src/test/regress/enterprise_split_schedule @@ -5,6 +5,5 @@ test: multi_cluster_management test: multi_test_catalog_views test: tablespace # Split tests go here.
-test: split_shard_test_helpers test: citus_split_shard_by_split_points_negative test: citus_split_shard_by_split_points diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index 0bda47f3e..4b1a3281d 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -62,44 +62,49 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 \c - postgres - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -BEGIN; - CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6; -COMMIT; -BEGIN; - CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9; -COMMIT; -BEGIN; -select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; +CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; +CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; +SELECT worker_split_shard_replication_setup(ARRAY[ + ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info, + ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info, + ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info, + ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info + ]) AS shared_memory_id \gset +WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one. +SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset +SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'decoding_plugin_for_shard_split') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'decoding_plugin_for_shard_split') \gset SELECT pg_sleep(5); pg_sleep --------------------------------------------------------------------- (1 row) --- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +-- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; -BEGIN; -SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1'); - ?column? 
+CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_first_owner, + copy_data=false); +SELECT pg_sleep(5); + pg_sleep --------------------------------------------------------------------- - 1 + (1 row) -COMMIT; \c - myuser - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -INSERT into table_first_4 values(100, 'a'); -INSERT into table_first_4 values(400, 'a'); -INSERT into table_first_4 values(500, 'a'); -select pg_sleep(2); +INSERT INTO table_first_4 VALUES(100, 'a'); +INSERT INTO table_first_4 VALUES(400, 'a'); +INSERT INTO table_first_4 VALUES(500, 'a'); +SELECT pg_sleep(2); pg_sleep --------------------------------------------------------------------- @@ -107,16 +112,16 @@ select pg_sleep(2); \c - admin_user - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_second_7 values(100, 'a'); -INSERT INTO table_second_7 values(400, 'a'); -SELECT * from table_second_7; +INSERT INTO table_second_7 VALUES(100, 'a'); +INSERT INTO table_second_7 VALUES(400, 'a'); +SELECT * FROM table_second_7; id | value --------------------------------------------------------------------- 100 | a 400 | a (2 rows) -select pg_sleep(2); +SELECT pg_sleep(2); pg_sleep --------------------------------------------------------------------- @@ -124,18 +129,18 @@ select pg_sleep(2); \c - myuser - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_first_4; +SELECT * FROM table_first_4; id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_first_5; +SELECT * FROM table_first_5; id | value --------------------------------------------------------------------- 400 | a (1 row) -SELECT * from table_first_6; +SELECT * FROM table_first_6; id | value --------------------------------------------------------------------- 100 | a @@ -145,17 +150,17 @@ SELECT * from table_first_6; -- should have zero rows in all the below tables as the subscription is not yet created for admin_user \c - admin_user - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_second_7; +SELECT * FROM table_second_7; id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_second_8; +SELECT * FROM table_second_8; id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_second_9; +SELECT * FROM table_second_9; id | value --------------------------------------------------------------------- (0 rows) @@ -163,15 +168,15 @@ SELECT * from table_second_9; \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; -BEGIN; -SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2'); - ?column? 
---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; -select pg_sleep(5); +CREATE SUBSCRIPTION sub2 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub2 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_second_owner, + copy_data=false); +SELECT pg_sleep(5); pg_sleep --------------------------------------------------------------------- @@ -180,18 +185,18 @@ select pg_sleep(5); -- expect data \c - admin_user - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_second_7; +SELECT * FROM table_second_7; id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_second_8; +SELECT * FROM table_second_8; id | value --------------------------------------------------------------------- 400 | a (1 row) -SELECT * from table_second_9; +SELECT * FROM table_second_9; id | value --------------------------------------------------------------------- 100 | a diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 1bd789b41..eb0735c4c 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -49,34 +49,26 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- This is needed for Pub/Sub framework to work. \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -BEGIN; - CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); - CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -COMMIT; +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 -BEGIN; - CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; -COMMIT; --- Create replication slot for target node worker2 -BEGIN; -select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; +CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +SELECT worker_split_shard_replication_setup(ARRAY[ + ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info, + ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info + ]) AS shared_memory_id \gset +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -BEGIN; -SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1'); - ?column? 
---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; +CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_name, + copy_data=false); select pg_sleep(5); pg_sleep --------------------------------------------------------------------- @@ -84,17 +76,17 @@ select pg_sleep(5); (1 row) -- No data is present at this moment in all the below tables at worker2 -SELECT * from table_to_split_1; +SELECT * FROM table_to_split_1; id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_to_split_2; +SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_to_split_3; +SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- (0 rows) @@ -102,10 +94,10 @@ SELECT * from table_to_split_3; -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -INSERT into table_to_split_1 values(100, 'a'); -INSERT into table_to_split_1 values(400, 'a'); -INSERT into table_to_split_1 values(500, 'a'); -SELECT * from table_to_split_1; +INSERT INTO table_to_split_1 values(100, 'a'); +INSERT INTO table_to_split_1 values(400, 'a'); +INSERT INTO table_to_split_1 values(500, 'a'); +SELECT * FROM table_to_split_1; id | value --------------------------------------------------------------------- 100 | a @@ -113,12 +105,12 @@ SELECT * from table_to_split_1; 500 | a (3 rows) -SELECT * from table_to_split_2; +SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_to_split_3; +SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- (0 rows) @@ -132,18 +124,18 @@ select pg_sleep(2); -- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value. 
\c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_to_split_1; -- should alwasy have zero rows +SELECT * FROM table_to_split_1; -- should alwasy have zero rows id | value --------------------------------------------------------------------- (0 rows) -SELECT * from table_to_split_2; +SELECT * FROM table_to_split_2; id | value --------------------------------------------------------------------- 400 | a (1 row) -SELECT * from table_to_split_3; +SELECT * FROM table_to_split_3; id | value --------------------------------------------------------------------- 100 | a @@ -153,9 +145,9 @@ SELECT * from table_to_split_3; -- UPDATE data of table_to_split_1 from worker1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -UPDATE table_to_split_1 SET value='b' where id = 100; -UPDATE table_to_split_1 SET value='b' where id = 400; -UPDATE table_to_split_1 SET value='b' where id = 500; +UPDATE table_to_split_1 SET value='b' WHERE id = 100; +UPDATE table_to_split_1 SET value='b' WHERE id = 400; +UPDATE table_to_split_1 SET value='b' WHERE id = 500; SELECT pg_sleep(2); pg_sleep --------------------------------------------------------------------- @@ -213,259 +205,10 @@ SELECT * FROM table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -drop PUBLICATION PUB1; +DROP PUBLICATION pub1; DELETE FROM slotName_table; \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -DROP SUBSCRIPTION SUB1; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub1; DELETE FROM slotName_table; --- Test scenario two starts from here --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; --- Create publication at worker1 -BEGIN; - CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; -COMMIT; --- Create replication slots for two target nodes worker1 and worker2. --- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 -BEGIN; -select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; --- Create subscription at worker1 with copy_data to 'false' and derived replication slot name -BEGIN; -SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1'); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; -select pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; --- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -BEGIN; -SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2'); - ?column? 
---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; -select pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - --- No data is present at this moment in all the below tables at worker2 -SELECT * from table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * from table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * from table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- Insert data in table_to_split_1 at worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT into table_to_split_1 values(100, 'a'); -INSERT into table_to_split_1 values(400, 'a'); -INSERT into table_to_split_1 values(500, 'a'); -UPDATE table_to_split_1 SET value='b' where id = 400; -select pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - --- expect data to present in table_to_split_2 on worker1 as its destination for value '400' -SELECT * from table_to_split_1; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a - 400 | b -(3 rows) - -SELECT * from table_to_split_2; - id | value ---------------------------------------------------------------------- - 400 | b -(1 row) - -SELECT * from table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- Expect data to be present only in table_to_split3 on worker2 -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * from table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * from table_to_split_3; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a -(2 rows) - --- delete all from table_to_split_1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - --- rows from table_to_split_2 should be deleted -SELECT * from table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - --- rows from table_to_split_3 should be deleted -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - - -- drop publication from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -DROP PUBLICATION PUB1; -DROP SUBSCRIPTION SUB1; -DELETE FROM slotName_table; -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -DROP SUBSCRIPTION SUB2; -DELETE FROM slotName_table; --- Test scenario three starts from here (parent shard and child shards are located on same machine) --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. 
table_to_split_2 and table_to_split_3 are located on worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; --- Create publication at worker1 -BEGIN; - CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; -COMMIT; --- Worker1 is target for table_to_split_2 and table_to_split_3 -BEGIN; -select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; --- Create subscription at worker1 with copy_data to 'false' and derived replication slot name -BEGIN; -SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1'); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -COMMIT; -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -INSERT into table_to_split_1 values(100, 'a'); -INSERT into table_to_split_1 values(400, 'a'); -INSERT into table_to_split_1 values(500, 'a'); -select pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - --- expect data to present in table_to_split_2/3 on worker1 -SELECT * from table_to_split_1; - id | value ---------------------------------------------------------------------- - 100 | a - 400 | a - 500 | a -(3 rows) - -SELECT * from table_to_split_2; - id | value ---------------------------------------------------------------------- - 400 | a -(1 row) - -SELECT * from table_to_split_3; - id | value ---------------------------------------------------------------------- - 100 | a - 500 | a -(2 rows) - -DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - pg_sleep ---------------------------------------------------------------------- - -(1 row) - -SELECT * from table_to_split_1; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * from table_to_split_2; - id | value ---------------------------------------------------------------------- -(0 rows) - -SELECT * from table_to_split_3; - id | value ---------------------------------------------------------------------- -(0 rows) - --- clean up -DROP PUBLICATION PUB1; -DELETE FROM slotName_table; -DROP SUBSCRIPTION SUB1; diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out new file mode 100644 index 000000000..ca827d5bd --- /dev/null +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -0,0 +1,91 @@ +-- Test scenario three starts from here (parent shard and child shards are located on same machine) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. 
table_to_split_2 and table_to_split_3 are located on worker1 +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +-- Create publication at worker1 +CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; +-- Worker1 is target for table_to_split_2 and table_to_split_3 +SELECT worker_split_shard_replication_setup(ARRAY[ + ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info, + ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info + ]) AS shared_memory_id \gset +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset +-- Create subscription at worker1 with copy_data to 'false' a +BEGIN; +CREATE SUBSCRIPTION local_subscription + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:local_slot, + copy_data=false); +COMMIT; +select pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_to_split_1 VALUES(100, 'a'); +INSERT INTO table_to_split_1 VALUES(400, 'a'); +INSERT INTO table_to_split_1 VALUES(500, 'a'); +select pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- expect data to present in table_to_split_2/3 on worker1 +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +DELETE FROM table_to_split_1; +SELECT pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- clean up +DROP SUBSCRIPTION local_subscription; +DROP PUBLICATION pub1; +DELETE FROM slotName_table; diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out new file mode 100644 index 000000000..6a1e7d112 --- /dev/null +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -0,0 +1,154 @@ +-- Test scenario two starts from here +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. 
table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +-- Create publication at worker1 +CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; +SELECT worker_split_shard_replication_setup(ARRAY[ + ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info, + ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info + ]) AS shared_memory_id \gset +WARNING: As a part of split shard workflow,unexpectedly found a valid shared memory handle while storing a new one. +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset +-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' +CREATE SUBSCRIPTION sub_worker1 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_worker1, + copy_data=false); +select pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2' +CREATE SUBSCRIPTION sub_worker2 + CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_worker2, + copy_data=false); +select pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- No data is present at this moment in all the below tables at worker2 +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +INSERT INTO table_to_split_1 VALUES(100, 'a'); +INSERT INTO table_to_split_1 VALUES(400, 'a'); +INSERT INTO table_to_split_1 VALUES(500, 'a'); +UPDATE table_to_split_1 SET value='b' WHERE id = 400; +select pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- expect data to present in table_to_split_2 on worker1 as its destination for value '400' +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a + 400 | b +(3 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | b +(1 row) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- Expect data to be present only in table_to_split3 on worker2 +\c - - - 
:worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + +-- delete all from table_to_split_1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +DELETE FROM table_to_split_1; +SELECT pg_sleep(5); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- rows from table_to_split_2 should be deleted +SELECT * FROM table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +-- rows from table_to_split_3 should be deleted +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SELECT * FROM table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub_worker2; +DELETE FROM slotName_table; + -- drop publication from worker1 +\c - - - :worker_1_port +SET search_path TO split_shard_replication_setup_schema; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub_worker1; +DROP PUBLICATION pub1; +DELETE FROM slotName_table; diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 3bf64fbcc..25f6db1ae 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -63,66 +63,82 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 \c - postgres - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -BEGIN; - CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6; -COMMIT; +CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; +CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; -BEGIN; - CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9; -COMMIT; +SELECT worker_split_shard_replication_setup(ARRAY[ + ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info, + ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info, + ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info, + ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info + ]) AS shared_memory_id \gset + +SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset +SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset + +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'decoding_plugin_for_shard_split') \gset + +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'decoding_plugin_for_shard_split') \gset -BEGIN; -select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node); -COMMIT; SELECT pg_sleep(5); - --- Create 
subscription at worker2 with copy_data to 'false' and derived replication slot name +-- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; -BEGIN; -SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1'); -COMMIT; +CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_first_owner, + copy_data=false); +SELECT pg_sleep(5); \c - myuser - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -INSERT into table_first_4 values(100, 'a'); -INSERT into table_first_4 values(400, 'a'); -INSERT into table_first_4 values(500, 'a'); -select pg_sleep(2); +INSERT INTO table_first_4 VALUES(100, 'a'); +INSERT INTO table_first_4 VALUES(400, 'a'); +INSERT INTO table_first_4 VALUES(500, 'a'); +SELECT pg_sleep(2); \c - admin_user - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -INSERT INTO table_second_7 values(100, 'a'); -INSERT INTO table_second_7 values(400, 'a'); -SELECT * from table_second_7; -select pg_sleep(2); +INSERT INTO table_second_7 VALUES(100, 'a'); +INSERT INTO table_second_7 VALUES(400, 'a'); +SELECT * FROM table_second_7; +SELECT pg_sleep(2); \c - myuser - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_first_4; -SELECT * from table_first_5; -SELECT * from table_first_6; +SELECT * FROM table_first_4; +SELECT * FROM table_first_5; +SELECT * FROM table_first_6; -- should have zero rows in all the below tables as the subscription is not yet created for admin_user \c - admin_user - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_second_7; -SELECT * from table_second_8; -SELECT * from table_second_9; +SELECT * FROM table_second_7; +SELECT * FROM table_second_8; +SELECT * FROM table_second_9; \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO WARNING; -BEGIN; -SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2'); -COMMIT; -select pg_sleep(5); +CREATE SUBSCRIPTION sub2 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub2 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_for_second_owner, + copy_data=false); +SELECT pg_sleep(5); -- expect data \c - admin_user - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_second_7; -SELECT * from table_second_8; -SELECT * from table_second_9; +SELECT * FROM table_second_7; +SELECT * FROM table_second_8; +SELECT * FROM table_second_9; diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 4ac2642bd..13fe8c8e7 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -45,59 +45,65 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- This is needed for Pub/Sub framework to work. 
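+-- Note (added for clarity, an assumption based on the values used in these tests): each
+-- citus.split_shard_info row handed to worker_split_shard_replication_setup below is taken
+-- to describe one child shard as (source_shard_id, child_shard_id, child_min_hash_value,
+-- child_max_hash_value, target_node_id), i.e. the same values the old integer-array based
+-- test helpers used to pack before this change replaced them with typed rows.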
\c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -BEGIN; - CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); - CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -COMMIT; +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 -BEGIN; - CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; -COMMIT; +CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; --- Create replication slot for target node worker2 -BEGIN; -select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node); -COMMIT; +SELECT worker_split_shard_replication_setup(ARRAY[ + ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info, + ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info + ]) AS shared_memory_id \gset + +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -BEGIN; -SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1'); -COMMIT; + +CREATE SUBSCRIPTION sub1 + CONNECTION 'host=localhost port=57637 user=postgres dbname=regression' + PUBLICATION pub1 + WITH ( + create_slot=false, + enabled=true, + slot_name=:slot_name, + copy_data=false); + select pg_sleep(5); -- No data is present at this moment in all the below tables at worker2 -SELECT * from table_to_split_1; -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; - +SELECT * FROM table_to_split_1; +SELECT * FROM table_to_split_2; +SELECT * FROM table_to_split_3; -- Insert data in table_to_split_1 at worker1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -INSERT into table_to_split_1 values(100, 'a'); -INSERT into table_to_split_1 values(400, 'a'); -INSERT into table_to_split_1 values(500, 'a'); -SELECT * from table_to_split_1; -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; +INSERT INTO table_to_split_1 values(100, 'a'); +INSERT INTO table_to_split_1 values(400, 'a'); +INSERT INTO table_to_split_1 values(500, 'a'); + +SELECT * FROM table_to_split_1; +SELECT * FROM table_to_split_2; +SELECT * FROM table_to_split_3; select pg_sleep(2); -- Expect data to be present in shard 2 and shard 3 based on the hash value. 
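+-- With the ranges passed to worker_split_shard_replication_setup above, the row with id 400
+-- should land in the child shard covering [-2147483648,-1] (table_to_split_2), while ids 100
+-- and 500 should land in the child shard covering [0,2147483647] (table_to_split_3), which is
+-- the behaviour recorded in the expected output for these tests.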
\c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_to_split_1; -- should alwasy have zero rows -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; +SELECT * FROM table_to_split_1; -- should alwasy have zero rows +SELECT * FROM table_to_split_2; +SELECT * FROM table_to_split_3; -- UPDATE data of table_to_split_1 from worker1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -UPDATE table_to_split_1 SET value='b' where id = 100; -UPDATE table_to_split_1 SET value='b' where id = 400; -UPDATE table_to_split_1 SET value='b' where id = 500; +UPDATE table_to_split_1 SET value='b' WHERE id = 100; +UPDATE table_to_split_1 SET value='b' WHERE id = 400; +UPDATE table_to_split_1 SET value='b' WHERE id = 500; + SELECT pg_sleep(2); -- Value should be updated in table_to_split_2; @@ -110,6 +116,7 @@ SELECT * FROM table_to_split_3; \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; DELETE FROM table_to_split_1; + SELECT pg_sleep(5); -- Child shard rows should be deleted @@ -122,144 +129,12 @@ SELECT * FROM table_to_split_3; -- drop publication from worker1 \c - - - :worker_1_port SET search_path TO split_shard_replication_setup_schema; -drop PUBLICATION PUB1; +DROP PUBLICATION pub1; DELETE FROM slotName_table; \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -DROP SUBSCRIPTION SUB1; +SET client_min_messages TO ERROR; +DROP SUBSCRIPTION sub1; DELETE FROM slotName_table; --- Test scenario two starts from here --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2 - -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; --- Create publication at worker1 -BEGIN; - CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; -COMMIT; - --- Create replication slots for two target nodes worker1 and worker2. 
--- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3 -BEGIN; -select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node); -COMMIT; - --- Create subscription at worker1 with copy_data to 'false' and derived replication slot name -BEGIN; -SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1'); -COMMIT; -select pg_sleep(5); - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; --- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -BEGIN; -SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2'); -COMMIT; -select pg_sleep(5); - --- No data is present at this moment in all the below tables at worker2 -SELECT * from table_to_split_1; -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; - --- Insert data in table_to_split_1 at worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -INSERT into table_to_split_1 values(100, 'a'); -INSERT into table_to_split_1 values(400, 'a'); -INSERT into table_to_split_1 values(500, 'a'); -UPDATE table_to_split_1 SET value='b' where id = 400; -select pg_sleep(5); - --- expect data to present in table_to_split_2 on worker1 as its destination for value '400' -SELECT * from table_to_split_1; -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; - --- Expect data to be present only in table_to_split3 on worker2 -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_to_split_1; -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; - --- delete all from table_to_split_1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -DELETE FROM table_to_split_1; -SELECT pg_sleep(5); - --- rows from table_to_split_2 should be deleted -SELECT * from table_to_split_2; - --- rows from table_to_split_3 should be deleted -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SELECT * from table_to_split_3; - - -- drop publication from worker1 -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -DROP PUBLICATION PUB1; -DROP SUBSCRIPTION SUB1; -DELETE FROM slotName_table; - -\c - - - :worker_2_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; -DROP SUBSCRIPTION SUB2; -DELETE FROM slotName_table; - --- Test scenario three starts from here (parent shard and child shards are located on same machine) --- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. --- 2. table_to_split_1 is located on worker1. --- 3. 
table_to_split_2 and table_to_split_3 are located on worker1 - -\c - - - :worker_1_port -SET search_path TO split_shard_replication_setup_schema; -SET client_min_messages TO WARNING; - --- Create publication at worker1 -BEGIN; - CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; -COMMIT; - --- Worker1 is target for table_to_split_2 and table_to_split_3 -BEGIN; -select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node); -COMMIT; - --- Create subscription at worker1 with copy_data to 'false' and derived replication slot name -BEGIN; -SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1'); -COMMIT; -SELECT pg_sleep(5); - -INSERT into table_to_split_1 values(100, 'a'); -INSERT into table_to_split_1 values(400, 'a'); -INSERT into table_to_split_1 values(500, 'a'); -select pg_sleep(5); - --- expect data to present in table_to_split_2/3 on worker1 -SELECT * from table_to_split_1; -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; - - -DELETE FROM table_to_split_1; -SELECT pg_sleep(5); -SELECT * from table_to_split_1; -SELECT * from table_to_split_2; -SELECT * from table_to_split_3; - --- clean up -DROP PUBLICATION PUB1; -DELETE FROM slotName_table; -DROP SUBSCRIPTION SUB1; diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql new file mode 100644 index 000000000..ffb6f8acf --- /dev/null +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -0,0 +1,56 @@ +-- Test scenario three starts from here (parent shard and child shards are located on same machine) +-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- 2. table_to_split_1 is located on worker1. +-- 3. 
table_to_split_2 and table_to_split_3 are located on worker1
+SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+
+\c - - - :worker_1_port
+SET search_path TO split_shard_replication_setup_schema;
+SET client_min_messages TO ERROR;
+
+-- Create publication at worker1
+CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
+
+-- Worker1 is target for table_to_split_2 and table_to_split_3
+SELECT worker_split_shard_replication_setup(ARRAY[
+    ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
+    ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info
+    ]) AS shared_memory_id \gset
+
+SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
+
+-- Create subscription at worker1 with copy_data to 'false' and 'local_slot'
+BEGIN;
+CREATE SUBSCRIPTION local_subscription
+    CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
+    PUBLICATION pub1
+    WITH (
+        create_slot=false,
+        enabled=true,
+        slot_name=:local_slot,
+        copy_data=false);
+COMMIT;
+select pg_sleep(5);
+
+INSERT INTO table_to_split_1 VALUES(100, 'a');
+INSERT INTO table_to_split_1 VALUES(400, 'a');
+INSERT INTO table_to_split_1 VALUES(500, 'a');
+select pg_sleep(5);
+
+-- expect data to be present in table_to_split_2/3 on worker1
+SELECT * FROM table_to_split_1;
+SELECT * FROM table_to_split_2;
+SELECT * FROM table_to_split_3;
+
+
+DELETE FROM table_to_split_1;
+SELECT pg_sleep(5);
+SELECT * FROM table_to_split_1;
+SELECT * FROM table_to_split_2;
+SELECT * FROM table_to_split_3;
+
+-- clean up
+DROP SUBSCRIPTION local_subscription;
+DROP PUBLICATION pub1;
+DELETE FROM slotName_table;
diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql
new file mode 100644
index 000000000..1baa5fcb3
--- /dev/null
+++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql
@@ -0,0 +1,99 @@
+-- Test scenario two starts from here
+-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
+-- 2. table_to_split_1 is located on worker1.
+-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
+SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+
+\c - - - :worker_1_port
+SET search_path TO split_shard_replication_setup_schema;
+
+-- Create publication at worker1
+CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
+
+SELECT worker_split_shard_replication_setup(ARRAY[
+    ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
+    ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
+    ]) AS shared_memory_id \gset
+
+SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
+SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset
+
+-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
+CREATE SUBSCRIPTION sub_worker1
+    CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
+    PUBLICATION pub1
+    WITH (
+        create_slot=false,
+        enabled=true,
+        slot_name=:slot_for_worker1,
+        copy_data=false);
+select pg_sleep(5);
+
+\c - - - :worker_2_port
+SET search_path TO split_shard_replication_setup_schema;
+
+-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2'
+CREATE SUBSCRIPTION sub_worker2
+    CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
+    PUBLICATION pub1
+    WITH (
+        create_slot=false,
+        enabled=true,
+        slot_name=:slot_for_worker2,
+        copy_data=false);
+select pg_sleep(5);
+
+-- No data is present at this moment in all the below tables at worker2
+SELECT * FROM table_to_split_1;
+SELECT * FROM table_to_split_2;
+SELECT * FROM table_to_split_3;
+
+-- Insert data in table_to_split_1 at worker1
+\c - - - :worker_1_port
+SET search_path TO split_shard_replication_setup_schema;
+INSERT INTO table_to_split_1 VALUES(100, 'a');
+INSERT INTO table_to_split_1 VALUES(400, 'a');
+INSERT INTO table_to_split_1 VALUES(500, 'a');
+UPDATE table_to_split_1 SET value='b' WHERE id = 400;
+select pg_sleep(5);
+
+-- expect data to be present in table_to_split_2 on worker1 as it is the destination for value '400'
+SELECT * FROM table_to_split_1;
+SELECT * FROM table_to_split_2;
+SELECT * FROM table_to_split_3;
+
+-- Expect data to be present only in table_to_split_3 on worker2
+\c - - - :worker_2_port
+SET search_path TO split_shard_replication_setup_schema;
+SELECT * FROM table_to_split_1;
+SELECT * FROM table_to_split_2;
+SELECT * FROM table_to_split_3;
+
+-- delete all from table_to_split_1
+\c - - - :worker_1_port
+SET search_path TO split_shard_replication_setup_schema;
+DELETE FROM table_to_split_1;
+SELECT pg_sleep(5);
+
+-- rows from table_to_split_2 should be deleted
+SELECT * FROM table_to_split_2;
+
+-- rows from table_to_split_3 should be deleted
+\c - - - :worker_2_port
+SET search_path TO split_shard_replication_setup_schema;
+SELECT * FROM table_to_split_3;
+
+\c - - - :worker_2_port
+SET search_path TO split_shard_replication_setup_schema;
+SET client_min_messages TO ERROR;
+DROP SUBSCRIPTION sub_worker2;
+DELETE FROM slotName_table;
+
+ -- drop publication from worker1
+\c - - - :worker_1_port
+SET search_path TO split_shard_replication_setup_schema;
+SET client_min_messages TO ERROR;
+DROP SUBSCRIPTION sub_worker1;
+DROP PUBLICATION pub1;
+DELETE FROM 
slotName_table; \ No newline at end of file diff --git a/src/test/regress/sql/split_shard_test_helpers.sql b/src/test/regress/sql/split_shard_test_helpers.sql index ba802e94e..c195049ff 100644 --- a/src/test/regress/sql/split_shard_test_helpers.sql +++ b/src/test/regress/sql/split_shard_test_helpers.sql @@ -7,7 +7,9 @@ DECLARE memoryId bigint := 0; memoryIdText text; begin - SELECT * into memoryId from worker_split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]); + SELECT * into memoryId from worker_split_shard_replication_setup ( + ARRAY[ROW(1,2,-2147483648,-1, targetNode1)::citus.split_shard_info, + ROW(1,3,0,2147483647, targetNode2)::citus.split_shard_info]); SELECT FORMAT('%s', memoryId) into memoryIdText; return memoryIdText; end @@ -23,12 +25,12 @@ DECLARE begin SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2); - SELECT FORMAT('citus_split_%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName; + SELECT FORMAT('citus_split_%s_10', targetNode1) into derivedSlotName; SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); -- if new child shards are placed on different nodes, create one more replication slot if (targetNode1 != targetNode2) then - SELECT FORMAT('citus_split_%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName; + SELECT FORMAT('citus_split_%s_10', targetNode2) into derivedSlotName; SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split'); INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1); end if; @@ -47,10 +49,10 @@ DECLARE begin SELECT * into memoryId from worker_split_shard_replication_setup( ARRAY[ - ARRAY[4, 5, -2147483648,-1, targetNode1], - ARRAY[4, 6, 0 ,2147483647, targetNode2], - ARRAY[7, 8, -2147483648,-1, targetNode1], - ARRAY[7, 9, 0, 2147483647 , targetNode2] + ROW(4, 5, -2147483648,-1, targetNode1)::citus.split_shard_info, + ROW(4, 6, 0 ,2147483647, targetNode2)::citus.split_shard_info, + ROW(7, 8, -2147483648,-1, targetNode1)::citus.split_shard_info, + ROW(7, 9, 0, 2147483647 , targetNode2)::citus.split_shard_info ]); SELECT FORMAT('%s', memoryId) into memoryIdText; @@ -73,11 +75,11 @@ begin SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2); SELECT relowner into tableOwnerOne from pg_class where relname='table_first'; - SELECT FORMAT('citus_split_%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne; + SELECT FORMAT('citus_split_%s_%s', targetNode1, tableOwnerOne) into derivedSlotNameOne; SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split'); SELECT relowner into tableOwnerTwo from pg_class where relname='table_second'; - SELECT FORMAT('citus_split_%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo; + SELECT FORMAT('citus_split_%s_%s', targetNode2, tableOwnerTwo) into derivedSlotNameTwo; SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');