Addressing review comments

1. Changed the shared memory (SM) header
2. Changed the API prototype
3. Refactored the test case to use \gset
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-06-20 15:13:22 +05:30
parent 5b82fd2ea3
commit b66067d09f
18 changed files with 992 additions and 826 deletions
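For context, here is a minimal sketch of the reworked user-facing flow covered by items 2 and 3 above, mirroring the regression tests in this commit: the UDF now takes an array of the composite type citus.split_shard_info instead of bigint[][], its result is captured with psql's \gset, and the replication slot name no longer embeds a shared memory handle. The shard ids and ranges are placeholders, and the psql variables :worker_2_node and :table_owner_one are assumed to have been populated earlier with \gset, as in the tests below.

-- Split (placeholder) shard 1 into shards 2 and 3 on :worker_2_node and
-- capture the returned shared memory handle into a psql variable.
SELECT worker_split_shard_replication_setup(ARRAY[
    ROW(1, 2, -2147483648, -1, :worker_2_node)::citus.split_shard_info,
    ROW(1, 3, 0, 2147483647, :worker_2_node)::citus.split_shard_info
    ]) AS shared_memory_id \gset

-- The slot name is now citus_split_<nodeId>_<tableOwnerOid>; create the slot with
-- the split decoder plugin and capture its name for the subscription to use.
SELECT slot_name FROM pg_create_logical_replication_slot(
    FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one),
    'decoding_plugin_for_shard_split') \gset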

View File

@ -38,20 +38,20 @@ typedef struct NodeShardMappingEntry
} NodeShardMappingEntry;
/* Function declarations */
static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
int shardSplitInfoIndex,
uint64 *sourceShardId,
uint64 *desShardId,
int32 *minValue,
int32 *maxValue,
int32 *nodeId);
static void ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
uint64 *sourceShardId,
uint64 *childShardId,
int32 *minValue,
int32 *maxValue,
int32 *nodeId);
static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId);
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
static void PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle);
@ -103,33 +103,40 @@ static int NodeShardMappingHashCompare(const void *left, const void *right, Size
Datum
worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
{
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errmsg("targets can't be null")));
}
ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0);
int shardInfoArrayLength = ARR_DIMS(shardInfoArrayObject)[0];
if (array_contains_nulls(shardInfoArrayObject))
{
ereport(ERROR, (errmsg("Unexpectedly shard info array contains a null value")));
}
/* SetupMap */
SetupHashMapForShardInfo();
int shardSplitInfoCount = 0;
for (int index = 0; index < shardInfoArrayLength; index++)
ArrayIterator shardInfo_iterator = array_create_iterator(shardInfoArrayObject, 0,
NULL);
Datum shardInfoDatum = 0;
bool isnull = false;
while (array_iterate(shardInfo_iterator, &shardInfoDatum, &isnull))
{
uint64 sourceShardId = 0;
uint64 desShardId = 0;
uint64 childShardId = 0;
int32 minValue = 0;
int32 maxValue = 0;
int32 nodeId = 0;
ParseShardSplitInfo(
shardInfoArrayObject,
index,
&sourceShardId,
&desShardId,
&minValue,
&maxValue,
&nodeId);
ParseShardSplitInfoFromDatum(shardInfoDatum, &sourceShardId, &childShardId,
&minValue, &maxValue, &nodeId);
ShardSplitInfo *shardSplitInfo = CreateShardSplitInfo(
sourceShardId,
desShardId,
childShardId,
minValue,
maxValue,
nodeId);
@ -139,13 +146,16 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
}
dsm_handle dsmHandle;
ShardSplitInfo *splitShardInfoSMArray =
ShardSplitInfoSMHeader *splitShardInfoSMHeader =
CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
PopulateShardSplitInfoInSM(splitShardInfoSMArray,
PopulateShardSplitInfoInSM(splitShardInfoSMHeader,
ShardInfoHashMap,
dsmHandle);
/* store handle in statically allocated shared memory */
StoreSharedMemoryHandle(dsmHandle);
return dsmHandle;
}
@ -173,144 +183,6 @@ SetupHashMapForShardInfo()
}
static void
ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
int shardSplitInfoIndex,
uint64 *sourceShardId,
uint64 *desShardId,
int32 *minValue,
int32 *maxValue,
int32 *nodeId)
{
Oid elemtypeId = ARR_ELEMTYPE(shardInfoArrayObject);
int16 elemtypeLength = 0;
bool elemtypeByValue = false;
char elemtypeAlignment = 0;
get_typlenbyvalalign(elemtypeId, &elemtypeLength, &elemtypeByValue,
&elemtypeAlignment);
int elementIndex = 0;
int indexes[] = { shardSplitInfoIndex + 1, elementIndex + 1 };
bool isNull = false;
/* Get source shard Id */
Datum sourceShardIdDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for source shardId")));
}
*sourceShardId = DatumGetUInt64(sourceShardIdDat);
/* Get destination shard Id */
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum destinationShardIdDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for destination shardId")));
}
*desShardId = DatumGetUInt64(destinationShardIdDat);
/* Get minValue for destination shard */
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum minValueDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for min value")));
}
*minValue = DatumGetInt32(minValueDat);
/* Get maxValue for destination shard */
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum maxValueDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for max value")));
}
*maxValue = DatumGetInt32(maxValueDat);
/* Get nodeId for shard placement*/
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum nodeIdDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for max value")));
}
*nodeId = DatumGetInt32(nodeIdDat);
}
/*
* CreateShardSplitInfo constructs the ShardSplitInfo data structure
* with the appropriate Oids for the source and destination relations.
@ -330,6 +202,19 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
int32 nodeId)
{
ShardInterval *shardIntervalToSplit = LoadShardInterval(sourceShardIdToSplit);
/*
* If metadata is not synced, we cannot proceed further as the split workflow
* assumes metadata to be synced on the worker node hosting the source shard to split.
*/
if (shardIntervalToSplit == NULL)
{
ereport(ERROR,
errmsg(
"Could not find metadata corresponding to source shard id: %ld. "
"Split workflow assumes metadata to be synced across "
"worker nodes hosting source shards.", sourceShardIdToSplit));
}
CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
shardIntervalToSplit->relationId);
@ -358,7 +243,7 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid citusTableOid:%u "
"sourceShardToSplitOid: %u,"
"destSplitChildShardOid :%u ",
"destSplitChildShardOid:%u ",
citusTableOid,
sourceShardToSplitOid,
destSplitChildShardOid)));
@ -416,8 +301,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
* into shared memory segment. This information is consumed by the WAL sender
* process during logical replication.
*
* shardSplitInfoArray - Shared memory pointer where information has to
* be copied
* shardSplitInfoSMHeader - Shared memory header
*
* shardInfoHashMap - Hashmap containing parsed split information
* grouped by nodeId
@ -425,7 +309,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
* dsmHandle - Shared memory segment handle
*/
static void
PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle)
{
@ -445,7 +329,8 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
ShardSplitInfo *splitShardInfo = NULL;
foreach_ptr(splitShardInfo, shardSplitInfoList)
{
ShardSplitInfo *shardInfoInSM = &shardSplitInfoArray[index];
ShardSplitInfo *shardInfoInSM =
&shardSplitInfoSMHeader->splitInfoArray[index];
shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid;
shardInfoInSM->partitionColumnIndex = splitShardInfo->partitionColumnIndex;
@ -494,3 +379,56 @@ NodeShardMappingHashCompare(const void *left, const void *right, Size keysize)
return 0;
}
}
static void
ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
uint64 *sourceShardId,
uint64 *childShardId,
int32 *minValue,
int32 *maxValue,
int32 *nodeId)
{
HeapTupleHeader dataTuple = DatumGetHeapTupleHeader(shardSplitInfoDatum);
bool isnull = false;
Datum sourceShardIdDatum = GetAttributeByName(dataTuple, "source_shard_id", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("source_shard_id for split_shard_info can't be null")));
}
*sourceShardId = DatumGetUInt64(sourceShardIdDatum);
Datum childShardIdDatum = GetAttributeByName(dataTuple, "child_shard_id", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("child_shard_id for split_shard_info can't be null")));
}
*childShardId = DatumGetUInt64(childShardIdDatum);
Datum minValueDatum = GetAttributeByName(dataTuple, "shard_min_value", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("shard_min_value for split_shard_info can't be null")));
}
*minValue = DatumGetInt32(minValueDatum);
Datum maxValueDatum = GetAttributeByName(dataTuple, "shard_max_value", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("shard_max_value for split_shard_info can't be null")));
}
*maxValue = DatumGetInt32(maxValueDatum);
Datum nodeIdDatum = GetAttributeByName(dataTuple, "node_id", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("node_id for split_shard_info can't be null")));
}
*nodeId = DatumGetInt32(nodeIdDatum);
}

View File

@ -12,7 +12,7 @@ safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib
SUBDIRS = . safeclib
SUBDIRS +=
ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS))
OBJS += pgoutput.o
OBJS += shardsplit_decoder.o
MODULE_big = decoding_plugin_for_shard_split

View File

@ -24,9 +24,8 @@ PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB;
ShardSplitInfo *shardSplitInfoArray = NULL;
int shardSplitInfoArraySize = 0;
static ShardSplitInfoSMHeader *shardSplitInfoSMHeader = NULL;
static ShardSplitInfoForReplicationSlot *shardSplitInfoForSlot = NULL;
/* Plugin callback */
static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@ -80,9 +79,10 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
Oid distributedTableOid = InvalidOid;
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = 0; i < shardSplitInfoArraySize; i++)
for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
i++)
{
shardSplitInfo = &shardSplitInfoArray[i];
shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
if (shardSplitInfo->sourceShardOid == sourceShardOid)
{
distributedTableOid = shardSplitInfo->distributedTableOid;
@ -157,19 +157,18 @@ FindTargetRelationOid(Relation sourceShardRelation,
return InvalidOid;
}
for (int i = 0; i < shardSplitInfoArraySize; i++)
for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
i++)
{
ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i];
ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
/*
* Each commit message is processed by all the configured
* replication slots. However, a replication is slot only responsible
* for new shard placements belonging to a single node. We check if the
* current slot which is processing the commit should emit
* a target relation Oid.
* Each commit message is processed by all the configured replication slots.
* A replication slot is responsible for shard placements belonging to a unique
* table owner and nodeId combination. We check whether the slot currently
* processing the commit should emit a target relation Oid.
*/
if (strcmp(shardSplitInfo->slotName, currentSlotName) == 0 &&
shardSplitInfo->sourceShardOid == sourceShardRelationOid &&
if (shardSplitInfo->sourceShardOid == sourceShardRelationOid &&
shardSplitInfo->shardMinValue <= hashValue &&
shardSplitInfo->shardMaxValue >= hashValue)
{
@ -194,10 +193,11 @@ bool
IsCommitRecursive(Relation sourceShardRelation)
{
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = 0; i < shardSplitInfoArraySize; i++)
for (int i = shardSplitInfoForSlot->startIndex; i <= shardSplitInfoForSlot->endIndex;
i++)
{
/* skip the commit when destination is equal to the source */
ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i];
ShardSplitInfo *shardSplitInfo = &shardSplitInfoSMHeader->splitInfoArray[i];
if (sourceShardOid == shardSplitInfo->splitChildShardOid)
{
return true;
@ -216,19 +216,22 @@ static void
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
/*
* Get ShardSplitInfo array from Shared Memory if not already
* initialized. This gets initialized during the replication of
* first message.
*/
if (shardSplitInfoArray == NULL)
if (!is_publishable_relation(relation))
{
shardSplitInfoArray =
GetShardSplitInfoSMArrayForSlot(ctx->slot->data.name.data,
&shardSplitInfoArraySize);
return;
}
/*
* Get ShardSplitInfoForSlot if it is not already initialized.
* It gets initialized during replication of the first message.
*/
if (shardSplitInfoForSlot == NULL)
{
shardSplitInfoForSlot = PopulateShardSplitInfoForReplicationSlot(
ctx->slot->data.name.data);
shardSplitInfoSMHeader = shardSplitInfoForSlot->shardSplitInfoHeader;
}
/* avoid applying recursive commit */
if (IsCommitRecursive(relation))
{
return;

View File

@ -15,6 +15,16 @@
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/citus_safe_lib.h"
#include "storage/ipc.h"
#include "utils/memutils.h"
const char *sharedMemoryNameForHandleManagement =
"SHARED_MEMORY_FOR_SPLIT_SHARD_HANDLE_MANAGEMENT";
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static void ShardSplitShmemInit(void);
/* Function declarations */
static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int
@ -24,8 +34,6 @@ static ShardSplitInfoSMHeader * AllocateSharedMemoryForShardSplitInfo(int
dsm_handle *
dsmHandle);
static void * ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader);
static ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle
dsmHandle);
@ -64,32 +72,24 @@ GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle)
/*
* GetShardSplitInfoSMArrayForSlot returns pointer to the array of
* 'ShardSplitInfo' struct stored in the shared memory segment.
* GetShardSplitInfoSMHeader returns a pointer to the header of the shared memory segment.
*/
ShardSplitInfo *
GetShardSplitInfoSMArrayForSlot(char *slotName, int *shardSplitInfoCount)
ShardSplitInfoSMHeader *
GetShardSplitInfoSMHeader(char *slotName)
{
if (slotName == NULL ||
shardSplitInfoCount == NULL)
if (slotName == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("Expected slot name and array size arguments")));
errmsg("Expected slot name but found NULL")));
}
dsm_handle dsmHandle;
uint32_t nodeId = 0;
decode_replication_slot(slotName, &nodeId, &dsmHandle);
dsm_handle dsmHandle = GetSharedMemoryHandle();
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
GetShardSplitInfoSMHeaderFromDSMHandle(dsmHandle);
*shardSplitInfoCount = shardSplitInfoSMHeader->shardSplitInfoCount;
ShardSplitInfo *shardSplitInfoArray =
(ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader);
return shardSplitInfoArray;
return shardSplitInfoSMHeader;
}
@ -114,7 +114,8 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn
"positive values")));
}
Size totalSize = sizeof(ShardSplitInfoSMHeader) + shardSplitInfoCount *
Size totalSize = offsetof(ShardSplitInfoSMHeader, splitInfoArray) +
shardSplitInfoCount *
shardSplitInfoSize;
dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
@ -122,7 +123,7 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn
{
ereport(ERROR,
(errmsg("could not create a dynamic shared memory segment to "
"keep shard split info")));
"store shard split info")));
}
*dsmHandle = dsm_segment_handle(dsmSegment);
@ -136,7 +137,7 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle);
shardSplitInfoSMHeader->shardSplitInfoCount = shardSplitInfoCount;
shardSplitInfoSMHeader->count = shardSplitInfoCount;
return shardSplitInfoSMHeader;
}
@ -144,43 +145,27 @@ AllocateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, Size shardSplitIn
/*
* CreateSharedMemoryForShardSplitInfo is a wrapper function which creates shared memory
* for storing shard split infomation. The function returns pointer to the first element
* within this array.
* for storing shard split information. The function returns a pointer to the header of
* the shared memory segment.
*
* shardSplitInfoCount - number of 'ShardSplitInfo ' elements to be allocated
* dsmHandle - handle of the allocated shared memory segment
*/
ShardSplitInfo *
ShardSplitInfoSMHeader *
CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle)
{
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
AllocateSharedMemoryForShardSplitInfo(shardSplitInfoCount,
sizeof(ShardSplitInfo),
dsmHandle);
ShardSplitInfo *shardSplitInfoSMArray =
(ShardSplitInfo *) ShardSplitInfoSMData(shardSplitInfoSMHeader);
return shardSplitInfoSMArray;
}
/*
* ShardSplitInfoSMData returns a pointer to the array of 'ShardSplitInfo'.
* This is simply the data right after the header, so this function is trivial.
* The main purpose of this function is to make the intent clear to readers
* of the code.
*/
static void *
ShardSplitInfoSMData(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
{
return shardSplitInfoSMHeader + 1;
return shardSplitInfoSMHeader;
}
/*
* encode_replication_slot returns an encoded replication slot name
* in the following format.
* Slot Name = citus_split_nodeId_sharedMemoryHandle_tableOwnerOid
* Slot Name = citus_split_nodeId_tableOwnerOid
* Max supported length of replication slot name is 64 bytes.
*/
char *
@ -189,51 +174,194 @@ encode_replication_slot(uint32_t nodeId,
uint32_t tableOwnerId)
{
StringInfo slotName = makeStringInfo();
appendStringInfo(slotName, "citus_split_%u_%u_%u", nodeId, dsmHandle, tableOwnerId);
appendStringInfo(slotName, "citus_split_%u_%u", nodeId, tableOwnerId);
if (slotName->len > NAMEDATALEN)
{
ereport(ERROR,
(errmsg(
"Replication Slot name:%s having length:%d is greater than maximum allowed length:%d",
slotName->data, slotName->len, NAMEDATALEN)));
}
return slotName->data;
}
/*
* decode_replication_slot decodes the replication slot name
* into node id, shared memory handle.
* InitializeShardSplitSMHandleManagement requests the necessary shared memory
* from Postgres and sets up the shared memory startup hook.
* This memory is used to store handles of other shared memory segments allocated during the split workflow.
*/
void
decode_replication_slot(char *slotName,
uint32_t *nodeId,
dsm_handle *dsmHandle)
InitializeShardSplitSMHandleManagement(void)
{
int index = 0;
char *strtokPosition = NULL;
char *dupSlotName = pstrdup(slotName);
char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition);
while (slotNameString != NULL)
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = ShardSplitShmemInit;
}
static void
ShardSplitShmemInit(void)
{
bool alreadyInitialized = false;
ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
sizeof(ShardSplitShmemData),
&alreadyInitialized);
if (!alreadyInitialized)
{
/* third part of the slot name is NodeId */
if (index == 2)
{
*nodeId = strtoul(slotNameString, NULL, 10);
}
char *trancheName = "Split_Shard_Setup_Tranche";
/* fourth part of the name is memory handle */
else if (index == 3)
{
*dsmHandle = strtoul(slotNameString, NULL, 10);
}
NamedLWLockTranche *namedLockTranche =
&smData->namedLockTranche;
slotNameString = strtok_r(NULL, "_", &strtokPosition);
index++;
/* start by zeroing out all the memory */
memset(smData, 0,
sizeof(ShardSplitShmemData));
/*Ignoring TableOwnerOid*/
namedLockTranche->trancheId = LWLockNewTrancheId();
LWLockRegisterTranche(namedLockTranche->trancheId, trancheName);
LWLockInitialize(&smData->lock,
namedLockTranche->trancheId);
smData->dsmHandle = DSM_HANDLE_INVALID;
}
/*
* Replication slot name is encoded as citus_split_nodeId_sharedMemoryHandle_tableOwnerOid.
* Hence the number of tokens would be strictly five considering "_" as delimiter.
*/
if (index != 5)
if (prev_shmem_startup_hook != NULL)
{
ereport(ERROR,
(errmsg("Invalid Replication Slot name encoding: %s", slotName)));
prev_shmem_startup_hook();
}
}
/*
* StoreSharedMemoryHandle stores the handle of the shared memory segment
* allocated and populated by the 'worker_split_shard_replication_setup' UDF.
*/
void
StoreSharedMemoryHandle(dsm_handle dsmHandle)
{
bool found = false;
ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
sizeof(ShardSplitShmemData),
&found);
if (!found)
{
ereport(ERROR,
errmsg(
"Shared memory for handle management should have been initialized during boot"));
}
LWLockAcquire(&smData->lock, LW_EXCLUSIVE);
/*
* In a normal situation, the previously stored handle should have been invalidated
* before the current function is called.
* If this handle is still valid, it means cleanup of the previous split shard
* workflow failed. Log a warning and continue the current shard split operation.
*/
if (smData->dsmHandle != DSM_HANDLE_INVALID)
{
ereport(WARNING,
errmsg(
"As a part of split shard workflow,unexpectedly found a valid"
" shared memory handle while storing a new one."));
}
/* Store the incoming handle */
smData->dsmHandle = dsmHandle;
LWLockRelease(&smData->lock);
}
/*
* GetSharedMemoryHandle returns the shared memory handle stored
* by the 'worker_split_shard_replication_setup' UDF. This handle
* is requested by WAL sender processes during the logical replication phase.
*/
dsm_handle
GetSharedMemoryHandle(void)
{
bool found = false;
ShardSplitShmemData *smData = ShmemInitStruct(sharedMemoryNameForHandleManagement,
sizeof(ShardSplitShmemData),
&found);
if (!found)
{
ereport(ERROR,
errmsg(
"Shared memory for handle management should have been initialized during boot"));
}
LWLockAcquire(&smData->lock, LW_SHARED);
dsm_handle dsmHandle = smData->dsmHandle;
LWLockRelease(&smData->lock);
return dsmHandle;
}
/*
* PopulateShardSplitInfoForReplicationSlot traverses the 'ShardSplitInfo' array
* stored within the shared memory segment and records the start and end index
* positions of the entries belonging to the given replication slot. When that slot
* processes a commit, traversal is limited to this range, which improves performance.
*/
ShardSplitInfoForReplicationSlot *
PopulateShardSplitInfoForReplicationSlot(char *slotName)
{
ShardSplitInfoSMHeader *smHeader = GetShardSplitInfoSMHeader(slotName);
MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
ShardSplitInfoForReplicationSlot *infoForReplicationSlot =
(ShardSplitInfoForReplicationSlot *) palloc(
sizeof(ShardSplitInfoForReplicationSlot));
infoForReplicationSlot->shardSplitInfoHeader = smHeader;
infoForReplicationSlot->startIndex = -1;
infoForReplicationSlot->endIndex = -1;
int index = 0;
while (index < smHeader->count)
{
if (strcmp(smHeader->splitInfoArray[index].slotName, slotName) == 0)
{
/* Found the starting index from where current slot information begins */
infoForReplicationSlot->startIndex = index;
/* Slide forward to get the end index */
index++;
while (index < smHeader->count && strcmp(
smHeader->splitInfoArray[index].slotName, slotName) == 0)
{
index++;
}
infoForReplicationSlot->endIndex = index - 1;
/*
* 'ShardSplitInfo' entries with the same slot name are stored contiguously in the
* shared memory segment. Past the current 'index' position, we should not encounter
* any 'ShardSplitInfo' with the incoming slot name; if we do, the shared memory is
* corrupted. It is worth asserting this assumption.
* TODO: Traverse further and assert
*/
}
index++;
}
if (infoForReplicationSlot->startIndex == -1)
{
ereport(ERROR,
(errmsg("Unexpectedly could not find information "
"corresponding to replication slot name:%s in shared memory.",
slotName)));
}
MemoryContextSwitchTo(oldContext);
return infoForReplicationSlot;
}

View File

@ -103,6 +103,8 @@
#include "utils/syscache.h"
#include "utils/varlena.h"
#include "distributed/shardsplit_shared_memory.h"
#include "columnar/columnar.h"
ColumnarSupportsIndexAM_type extern_ColumnarSupportsIndexAM = NULL;
@ -376,6 +378,9 @@ _PG_init(void)
InitializeSharedConnectionStats();
InitializeLocallyReservedSharedConnections();
/* initialize shard split shared memory handle management */
InitializeShardSplitSMHandleManagement();
/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)
{

View File

@ -1,7 +1,19 @@
DROP TYPE IF EXISTS citus.split_shard_info;
DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint,
child_shard_id bigint,
shard_min_value integer,
shard_max_value integer,
node_id integer);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
shardInfo bigint[][])
splitShardInfo citus.split_shard_info[])
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(shardInfo bigint[][])
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
IS 'Replication setup for splitting a shard'

View File

@ -1,7 +1,19 @@
DROP TYPE IF EXISTS citus.split_shard_info;
DROP FUNCTION IF EXISTS pg_catalog.worker_split_shard_replication_setup;
CREATE TYPE citus.split_shard_info AS (
source_shard_id bigint,
child_shard_id bigint,
shard_min_value integer,
shard_max_value integer,
node_id integer);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
shardInfo bigint[][])
splitShardInfo citus.split_shard_info[])
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(shardInfo bigint[][])
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
IS 'Replication setup for splitting a shard'

View File

@ -21,24 +21,52 @@
*/
typedef struct ShardSplitInfoSMHeader
{
int shardSplitInfoCount; /* number of elements in the shared memory */
int count; /* number of elements in the shared memory */
ShardSplitInfo splitInfoArray[FLEXIBLE_ARRAY_MEMBER];
} ShardSplitInfoSMHeader;
/*
* Shard split information is populated and stored in shared memory as a one-dimensional
* array by 'worker_split_shard_replication_setup'. Information belonging to the same
* replication slot is grouped together and stored contiguously within this array.
* 'ShardSplitInfoForReplicationSlot' stores the start and end indices for a particular
* replication slot within the shared memory segment.
* When a slot processes a commit, traversing only within this range of the shared
* memory segment improves performance.
*/
typedef struct ShardSplitInfoForReplicationSlot
{
ShardSplitInfoSMHeader *shardSplitInfoHeader; /* shared memory segment header */
int startIndex; /* starting index for a given slot */
int endIndex; /* ending index for a given slot */
} ShardSplitInfoForReplicationSlot;
/* Functions for creating and accessing shared memory segments */
extern ShardSplitInfo * CreateSharedMemoryForShardSplitInfo(int shardSplitInfoCount,
dsm_handle *dsmHandle);
typedef struct ShardSplitShmemData
{
int trancheId;
NamedLWLockTranche namedLockTranche;
LWLock lock;
extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName,
int *shardSplitInfoCount);
dsm_handle dsmHandle;
} ShardSplitShmemData;
/* Functions for creating and accessing shared memory used for dsm handle management */
void InitializeShardSplitSMHandleManagement(void);
void StoreSharedMemoryHandle(dsm_handle dsmHandle);
dsm_handle GetSharedMemoryHandle(void);
/* Functions for creating and accessing shared memory segments containing shard split information */
extern ShardSplitInfoSMHeader * CreateSharedMemoryForShardSplitInfo(int
shardSplitInfoCount,
dsm_handle *dsmHandle);
extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(char *slotName);
extern ShardSplitInfoForReplicationSlot * PopulateShardSplitInfoForReplicationSlot(
char *slotName);
/* Functions related to encoding-decoding for replication slot name */
char * encode_replication_slot(uint32_t nodeId,
dsm_handle dsmHandle,
uint32_t tableOwnerId);
void decode_replication_slot(char *slotName,
uint32_t *nodeId,
dsm_handle *dsmHandle);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */

View File

@ -5,6 +5,5 @@ test: multi_cluster_management
test: multi_test_catalog_views
test: tablespace
# Split tests go here.
test: split_shard_test_helpers
test: citus_split_shard_by_split_points_negative
test: citus_split_shard_by_split_points

View File

@ -62,44 +62,49 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
COMMIT;
BEGIN;
CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9;
COMMIT;
BEGIN;
select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node);
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info,
ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
WARNING: As a part of split shard workflow, unexpectedly found a valid shared memory handle while storing a new one.
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'decoding_plugin_for_shard_split') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'decoding_plugin_for_shard_split') \gset
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
?column?
CREATE SUBSCRIPTION sub1
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_first_owner,
copy_data=false);
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
1
(1 row)
COMMIT;
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_first_4 values(100, 'a');
INSERT into table_first_4 values(400, 'a');
INSERT into table_first_4 values(500, 'a');
select pg_sleep(2);
INSERT INTO table_first_4 VALUES(100, 'a');
INSERT INTO table_first_4 VALUES(400, 'a');
INSERT INTO table_first_4 VALUES(500, 'a');
SELECT pg_sleep(2);
pg_sleep
---------------------------------------------------------------------
@ -107,16 +112,16 @@ select pg_sleep(2);
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_second_7 values(100, 'a');
INSERT INTO table_second_7 values(400, 'a');
SELECT * from table_second_7;
INSERT INTO table_second_7 VALUES(100, 'a');
INSERT INTO table_second_7 VALUES(400, 'a');
SELECT * FROM table_second_7;
id | value
---------------------------------------------------------------------
100 | a
400 | a
(2 rows)
select pg_sleep(2);
SELECT pg_sleep(2);
pg_sleep
---------------------------------------------------------------------
@ -124,18 +129,18 @@ select pg_sleep(2);
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_first_4;
SELECT * FROM table_first_4;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_first_5;
SELECT * FROM table_first_5;
id | value
---------------------------------------------------------------------
400 | a
(1 row)
SELECT * from table_first_6;
SELECT * FROM table_first_6;
id | value
---------------------------------------------------------------------
100 | a
@ -145,17 +150,17 @@ SELECT * from table_first_6;
-- should have zero rows in all the below tables as the subscription is not yet created for admin_user
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
SELECT * FROM table_second_7;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_second_8;
SELECT * FROM table_second_8;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_second_9;
SELECT * FROM table_second_9;
id | value
---------------------------------------------------------------------
(0 rows)
@ -163,15 +168,15 @@ SELECT * from table_second_9;
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
select pg_sleep(5);
CREATE SUBSCRIPTION sub2
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'
PUBLICATION pub2
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_second_owner,
copy_data=false);
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
@ -180,18 +185,18 @@ select pg_sleep(5);
-- expect data
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
SELECT * FROM table_second_7;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_second_8;
SELECT * FROM table_second_8;
id | value
---------------------------------------------------------------------
400 | a
(1 row)
SELECT * from table_second_9;
SELECT * FROM table_second_9;
id | value
---------------------------------------------------------------------
100 | a

View File

@ -49,34 +49,26 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
COMMIT;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Create replication slot for target node worker2
BEGIN;
select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
CREATE SUBSCRIPTION sub1
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_name,
copy_data=false);
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
@ -84,17 +76,17 @@ select pg_sleep(5);
(1 row)
-- No data is present at this moment in all the below tables at worker2
SELECT * from table_to_split_1;
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_3;
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
@ -102,10 +94,10 @@ SELECT * from table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
SELECT * from table_to_split_1;
INSERT INTO table_to_split_1 values(100, 'a');
INSERT INTO table_to_split_1 values(400, 'a');
INSERT INTO table_to_split_1 values(500, 'a');
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
100 | a
@ -113,12 +105,12 @@ SELECT * from table_to_split_1;
500 | a
(3 rows)
SELECT * from table_to_split_2;
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_3;
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
@ -132,18 +124,18 @@ select pg_sleep(2);
-- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value.
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1; -- should alwasy have zero rows
SELECT * FROM table_to_split_1; -- should always have zero rows
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
400 | a
(1 row)
SELECT * from table_to_split_3;
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
100 | a
@ -153,9 +145,9 @@ SELECT * from table_to_split_3;
-- UPDATE data of table_to_split_1 from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
UPDATE table_to_split_1 SET value='b' where id = 100;
UPDATE table_to_split_1 SET value='b' where id = 400;
UPDATE table_to_split_1 SET value='b' where id = 500;
UPDATE table_to_split_1 SET value='b' WHERE id = 100;
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
UPDATE table_to_split_1 SET value='b' WHERE id = 500;
SELECT pg_sleep(2);
pg_sleep
---------------------------------------------------------------------
@ -213,259 +205,10 @@ SELECT * FROM table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
drop PUBLICATION PUB1;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB1;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DELETE FROM slotName_table;
-- Test scenario two starts from here
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Create replication slots for two target nodes worker1 and worker2.
-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
BEGIN;
select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node);
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- No data is present at this moment in all the below tables at worker2
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
UPDATE table_to_split_1 SET value='b' where id = 400;
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- expect data to present in table_to_split_2 on worker1 as its destination for value '400'
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
100 | a
500 | a
400 | b
(3 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
400 | b
(1 row)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- Expect data to be present only in table_to_split3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)
-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- rows from table_to_split_2 should be deleted
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP PUBLICATION PUB1;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB2;
DELETE FROM slotName_table;
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Worker1 is target for table_to_split_2 and table_to_split_3
BEGIN;
select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node);
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- expect data to present in table_to_split_2/3 on worker1
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
100 | a
400 | a
500 | a
(3 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
400 | a
(1 row)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- clean up
DROP PUBLICATION PUB1;
DELETE FROM slotName_table;
DROP SUBSCRIPTION SUB1;

View File

@ -0,0 +1,91 @@
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
-- Create publication at worker1
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker1 with copy_data to 'false'
BEGIN;
CREATE SUBSCRIPTION local_subscription
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:local_slot,
copy_data=false);
COMMIT;
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- expect data to be present in table_to_split_2/3 on worker1
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
100 | a
400 | a
500 | a
(3 rows)
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
400 | a
(1 row)
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- clean up
DROP SUBSCRIPTION local_subscription;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;

View File

@ -0,0 +1,154 @@
-- Test scenario two starts from here
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
WARNING: As a part of split shard workflow, unexpectedly found a valid shared memory handle while storing a new one.
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_worker1,
copy_data=false);
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2'
CREATE SUBSCRIPTION sub_worker2
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_worker2,
copy_data=false);
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
select pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- expect data to be present in table_to_split_2 on worker1 as it is the destination for value '400'
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
100 | a
500 | a
400 | b
(3 rows)
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
400 | b
(1 row)
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
-- Expect data to be present only in table_to_split_3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)
-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- rows from table_to_split_2 should be deleted
SELECT * FROM table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker2;
DELETE FROM slotName_table;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker1;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;

View File

@ -63,66 +63,82 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE PUBLICATION PUB1 for table table_first_4, table_first_5, table_first_6;
COMMIT;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
BEGIN;
CREATE PUBLICATION PUB2 for table table_second_7, table_second_8, table_second_9;
COMMIT;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(4, 6, 0 ,2147483647, :worker_2_node)::citus.split_shard_info,
ROW(7, 8, -2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(7, 9, 0, 2147483647 , :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'decoding_plugin_for_shard_split') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'decoding_plugin_for_shard_split') \gset
BEGIN;
select 1 from public.create_replication_slot_for_colocated_shards(:worker_2_node, :worker_2_node);
COMMIT;
SELECT pg_sleep(5);
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_one(:worker_2_node, 'SUB1');
COMMIT;
CREATE SUBSCRIPTION sub1
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_first_owner,
copy_data=false);
SELECT pg_sleep(5);
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_first_4 values(100, 'a');
INSERT into table_first_4 values(400, 'a');
INSERT into table_first_4 values(500, 'a');
select pg_sleep(2);
INSERT INTO table_first_4 VALUES(100, 'a');
INSERT INTO table_first_4 VALUES(400, 'a');
INSERT INTO table_first_4 VALUES(500, 'a');
SELECT pg_sleep(2);
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_second_7 values(100, 'a');
INSERT INTO table_second_7 values(400, 'a');
SELECT * from table_second_7;
select pg_sleep(2);
INSERT INTO table_second_7 VALUES(100, 'a');
INSERT INTO table_second_7 VALUES(400, 'a');
SELECT * FROM table_second_7;
SELECT pg_sleep(2);
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_first_4;
SELECT * from table_first_5;
SELECT * from table_first_6;
SELECT * FROM table_first_4;
SELECT * FROM table_first_5;
SELECT * FROM table_first_6;
-- should have zero rows in all the below tables as the subscription is not yet created for admin_user
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
SELECT * from table_second_8;
SELECT * from table_second_9;
SELECT * FROM table_second_7;
SELECT * FROM table_second_8;
SELECT * FROM table_second_9;
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
BEGIN;
SELECT 1 from public.create_subscription_for_owner_two(:worker_2_node, 'SUB2');
COMMIT;
select pg_sleep(5);
CREATE SUBSCRIPTION sub2
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub2
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_second_owner,
copy_data=false);
SELECT pg_sleep(5);
-- expect data
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_second_7;
SELECT * from table_second_8;
SELECT * from table_second_9;
SELECT * FROM table_second_7;
SELECT * FROM table_second_8;
SELECT * FROM table_second_9;
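Because the colocated shards in this test belong to two distinct owners, one replication slot and one subscription is created per owner, and each slot name combines the target node id with the owner's oid. A minimal sketch of that derivation, assuming the owner oids come from pg_class as in the \gset lines above:
-- Hypothetical recomputation of the derived slot names used above.
-- relowner is the oid of the table owner (myuser and admin_user in this test).
SELECT relname,
       FORMAT('citus_split_%s_%s', :worker_2_node, relowner) AS derived_slot_name
FROM pg_class
WHERE relname IN ('table_first', 'table_second');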

View File

@ -45,59 +45,65 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
COMMIT;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
-- Create replication slot for target node worker2
BEGIN;
select 1 from public.create_replication_slot(:worker_2_node, :worker_2_node);
COMMIT;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_2_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB1');
COMMIT;
CREATE SUBSCRIPTION sub1
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_name,
copy_data=false);
select pg_sleep(5);
-- No data is present at this moment in all the below tables at worker2
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
INSERT INTO table_to_split_1 values(100, 'a');
INSERT INTO table_to_split_1 values(400, 'a');
INSERT INTO table_to_split_1 values(500, 'a');
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
select pg_sleep(2);
-- Expect data to be present in shard 2 and shard 3 based on the hash value.
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1; -- should alwasy have zero rows
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
SELECT * FROM table_to_split_1; -- should always have zero rows
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
-- UPDATE data of table_to_split_1 from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
UPDATE table_to_split_1 SET value='b' where id = 100;
UPDATE table_to_split_1 SET value='b' where id = 400;
UPDATE table_to_split_1 SET value='b' where id = 500;
UPDATE table_to_split_1 SET value='b' WHERE id = 100;
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
UPDATE table_to_split_1 SET value='b' WHERE id = 500;
SELECT pg_sleep(2);
-- Value should be updated in table_to_split_2;
@ -110,6 +116,7 @@ SELECT * FROM table_to_split_3;
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
-- Child shard rows should be deleted
@ -122,144 +129,12 @@ SELECT * FROM table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
drop PUBLICATION PUB1;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB1;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DELETE FROM slotName_table;
-- Test scenario two starts from here
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Create replication slots for two target nodes worker1 and worker2.
-- Worker1 is target for table_to_split_2 and Worker2 is target for table_to_split_3
BEGIN;
select 1 from public.create_replication_slot(:worker_1_node, :worker_2_node);
COMMIT;
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
COMMIT;
select pg_sleep(5);
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_2_node, 'SUB2');
COMMIT;
select pg_sleep(5);
-- No data is present at this moment in all the below tables at worker2
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
UPDATE table_to_split_1 SET value='b' where id = 400;
select pg_sleep(5);
-- expect data to present in table_to_split_2 on worker1 as its destination for value '400'
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- Expect data to be present only in table_to_split3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
-- rows from table_to_split_2 should be deleted
SELECT * from table_to_split_2;
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * from table_to_split_3;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP PUBLICATION PUB1;
DROP SUBSCRIPTION SUB1;
DELETE FROM slotName_table;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
DROP SUBSCRIPTION SUB2;
DELETE FROM slotName_table;
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Worker1 is target for table_to_split_2 and table_to_split_3
BEGIN;
select 1 from public.create_replication_slot(:worker_1_node, :worker_1_node);
COMMIT;
-- Create subscription at worker1 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from public.create_subscription(:worker_1_node, 'SUB1');
COMMIT;
SELECT pg_sleep(5);
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
select pg_sleep(5);
-- expect data to present in table_to_split_2/3 on worker1
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
-- clean up
DROP PUBLICATION PUB1;
DELETE FROM slotName_table;
DROP SUBSCRIPTION SUB1;

View File

@ -0,0 +1,56 @@
-- Test scenario three starts from here (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
-- Create publication at worker1
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_1_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker1 with copy_data to 'false'
BEGIN;
CREATE SUBSCRIPTION local_subscription
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:local_slot,
copy_data=false);
COMMIT;
select pg_sleep(5);
INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');
select pg_sleep(5);
-- expect data to be present in table_to_split_2/3 on worker1
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
-- clean up
DROP SUBSCRIPTION local_subscription;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;
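The slot name citus_split_<nodeid>_10 used in this file follows the citus_split_<targetNodeId>_<tableOwnerOid> pattern that the helper functions later in this diff derive from pg_class.relowner; the literal 10 is presumably the oid of the postgres superuser that owns table_to_split_1 in this test. A minimal sketch, under that assumption, that derives the same name from the catalog instead of hardcoding it:
-- Hypothetical: derive the slot name from the catalog instead of hardcoding 10.
SELECT FORMAT('citus_split_%s_%s', :worker_1_node, relowner) AS derived_slot_name
FROM pg_class
WHERE relname = 'table_to_split_1';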

View File

@ -0,0 +1,99 @@
-- Test scenario two starts from here
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
ROW(1,2,-2147483648,-1, :worker_1_node)::citus.split_shard_info,
ROW(1,3,0,2147483647, :worker_2_node)::citus.split_shard_info
]) AS shared_memory_id \gset
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'decoding_plugin_for_shard_split') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'decoding_plugin_for_shard_split') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_worker1,
copy_data=false);
select pg_sleep(5);
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2'
CREATE SUBSCRIPTION sub_worker2
CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
PUBLICATION pub1
WITH (
create_slot=false,
enabled=true,
slot_name=:slot_for_worker2,
copy_data=false);
select pg_sleep(5);
-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
select pg_sleep(5);
-- expect data to be present in table_to_split_2 on worker1 as it is the destination for value '400'
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
-- Expect data to be present only in table_to_split_3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;
-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;
SELECT pg_sleep(5);
-- rows from table_to_split_2 should be deleted
SELECT * FROM table_to_split_2;
-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_3;
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker2;
DELETE FROM slotName_table;
-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker1;
DROP PUBLICATION pub1;
DELETE FROM slotName_table;

View File

@ -7,7 +7,9 @@ DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from worker_split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1, targetNode1], ARRAY[1,3,0,2147483647,targetNode2]]);
SELECT * into memoryId from worker_split_shard_replication_setup (
ARRAY[ROW(1,2,-2147483648,-1, targetNode1)::citus.split_shard_info,
ROW(1,3,0,2147483647, targetNode2)::citus.split_shard_info]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
@ -23,12 +25,12 @@ DECLARE
begin
SELECT * into sharedMemoryId from public.split_shard_replication_setup_helper(targetNode1, targetNode2);
SELECT FORMAT('citus_split_%s_%s_10', targetNode1, sharedMemoryId) into derivedSlotName;
SELECT FORMAT('citus_split_%s_10', targetNode1) into derivedSlotName;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
-- if new child shards are placed on different nodes, create one more replication slot
if (targetNode1 != targetNode2) then
SELECT FORMAT('citus_split_%s_%s_10', targetNode2, sharedMemoryId) into derivedSlotName;
SELECT FORMAT('citus_split_%s_10', targetNode2) into derivedSlotName;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotName, 'decoding_plugin_for_shard_split');
INSERT INTO slotName_table values(targetTwoSlotName, targetNode2, 1);
end if;
@ -47,10 +49,10 @@ DECLARE
begin
SELECT * into memoryId from worker_split_shard_replication_setup(
ARRAY[
ARRAY[4, 5, -2147483648,-1, targetNode1],
ARRAY[4, 6, 0 ,2147483647, targetNode2],
ARRAY[7, 8, -2147483648,-1, targetNode1],
ARRAY[7, 9, 0, 2147483647 , targetNode2]
ROW(4, 5, -2147483648,-1, targetNode1)::citus.split_shard_info,
ROW(4, 6, 0 ,2147483647, targetNode2)::citus.split_shard_info,
ROW(7, 8, -2147483648,-1, targetNode1)::citus.split_shard_info,
ROW(7, 9, 0, 2147483647 , targetNode2)::citus.split_shard_info
]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
@ -73,11 +75,11 @@ begin
SELECT * into sharedMemoryId from public.split_shard_replication_setup_for_colocated_shards(targetNode1, targetNode2);
SELECT relowner into tableOwnerOne from pg_class where relname='table_first';
SELECT FORMAT('citus_split_%s_%s_%s', targetNode1, sharedMemoryId, tableOwnerOne) into derivedSlotNameOne;
SELECT FORMAT('citus_split_%s_%s', targetNode1, tableOwnerOne) into derivedSlotNameOne;
SELECT slot_name into targetOneSlotName from pg_create_logical_replication_slot(derivedSlotNameOne, 'decoding_plugin_for_shard_split');
SELECT relowner into tableOwnerTwo from pg_class where relname='table_second';
SELECT FORMAT('citus_split_%s_%s_%s', targetNode2, sharedMemoryId, tableOwnerTwo) into derivedSlotNameTwo;
SELECT FORMAT('citus_split_%s_%s', targetNode2, tableOwnerTwo) into derivedSlotNameTwo;
SELECT slot_name into targetTwoSlotName from pg_create_logical_replication_slot(derivedSlotNameTwo, 'decoding_plugin_for_shard_split');