Introducing 'split_shard_replication_setup' UDF to handle the catchup phase of citus_split_shard.
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-05-15 17:31:36 +05:30
parent e29222458c
commit c9844abea9
15 changed files with 1541 additions and 7 deletions

View File

@ -17,12 +17,16 @@ all: extension pg_send_cancellation
# build columnar only
columnar:
$(MAKE) -C src/backend/columnar all
logical_decoding_plugin:
$(MAKE) -C src/backend/distributed/shardsplit all
# build extension
extension: $(citus_top_builddir)/src/include/citus_version.h columnar
$(MAKE) -C src/backend/distributed/ all
install-columnar: columnar
$(MAKE) -C src/backend/columnar install
install-extension: extension install-columnar
install-logical_decoding_plugin: logical_decoding_plugin
$(MAKE) -C src/backend/distributed/shardsplit install
install-extension: extension install-columnar install-logical_decoding_plugin
$(MAKE) -C src/backend/distributed/ install
install-headers: extension
$(MKDIR_P) '$(DESTDIR)$(includedir_server)/distributed/'

View File

@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
DATA_built = $(generated_sql_files)
# directories with source files
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib test transaction utils worker
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker
# enterprise modules
SUBDIRS += replication

View File

@ -0,0 +1,466 @@
/*-------------------------------------------------------------------------
*
* shard_split_replication.c
* This file contains functions to set up information about the list of
* shards that are being split.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "nodes/pg_list.h"
#include "utils/array.h"
#include "distributed/utils/array_type.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_split.h"
#include "distributed/shard_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "common/hashfn.h"
#include "safe_str_lib.h"
#include "distributed/citus_safe_lib.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(split_shard_replication_setup);
static HTAB *ShardInfoHashMap = NULL;
/* Entry for hash map */
typedef struct NodeShardMappingEntry
{
uint64_t key;
List *shardSplitInfoList;
} NodeShardMappingEntry;
/* Function declarations */
static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
int shardSplitInfoIndex,
uint64 *sourceShardId,
uint64 *desShardId,
int32 *minValue,
int32 *maxValue,
int32 *nodeId);
static ShardSplitInfo * GetShardSplitInfo(uint64 sourceShardIdToSplit,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId);
static void AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo);
static void * CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount);
static HTAB * SetupHashMapForShardInfo();
/*
* split_shard_replication_setup UDF creates in-memory data structures
* to store the meta information about the new shards and their placements
* required during the catch-up phase of logical replication.
* This meta information is stored in a shared memory segment and accessed
* by the logical decoding plugin.
*
* Split information is passed by the user as a two-dimensional array in the
* format below:
* [{sourceShardId, childShardId, minValue, maxValue, destinationNodeId}]
*
* sourceShardId - id of the shard that is undergoing a split
* childShardId - id of the child shard that stores a specific range of values
* belonging to sourceShardId (the parent)
* minValue - lower bound of the hash range that the child shard stores
*
* maxValue - upper bound of the hash range that the child shard stores
*
* nodeId - node on which the child shard is placed
*
* The function parses the data and builds a routing map per destination node id.
* Multiple child shards can be placed on the same destination node, and the
* source and destination nodes can be the same as well.
*
* There is a 1-1 mapping between a node and a replication slot, as one
* replication slot takes care of replicating the changes for one node.
* The 'logical_decoding_plugin' consumes this information and routes each
* tuple from the source shard to the destination shard whose hash range
* covers it.
*/
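/*
 * Illustrative invocation, taken from the regression test added in this
 * commit: splitting shard 1 into child shards 2 and 3, both placed on the
 * node with id 18, is expressed as
 *
 *   SELECT split_shard_replication_setup(
 *            ARRAY[ARRAY[1, 2, -2147483648, -1, 18],
 *                  ARRAY[1, 3, 0, 2147483647, 18]]);
 *
 * The UDF returns the handle of the dynamic shared memory segment, from
 * which the replication slot names are later derived.
 */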
Datum
split_shard_replication_setup(PG_FUNCTION_ARGS)
{
ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0);
int shardInfoArrayLength = ARR_DIMS(shardInfoArrayObject)[0];
int insideCount = ARR_DIMS(shardInfoArrayObject)[1];
/* SetupMap */
SetupHashMapForShardInfo();
int shardSplitInfoCount = 0;
for (int index = 0; index < shardInfoArrayLength; index++)
{
uint64 sourceShardId = 0;
uint64 desShardId = 0;
int32 minValue = 0;
int32 maxValue = 0;
int32 nodeId = 0;
ParseShardSplitInfo(
shardInfoArrayObject,
index,
&sourceShardId,
&desShardId,
&minValue,
&maxValue,
&nodeId);
ShardSplitInfo *shardSplitInfo = GetShardSplitInfo(
sourceShardId,
desShardId,
minValue,
maxValue,
nodeId);
AddShardSplitInfoEntryForNode(shardSplitInfo);
shardSplitInfoCount++;
}
dsm_handle dsmHandle;
ShardSplitInfo *splitShardInfoSMArray =
GetSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
CopyShardSplitInfoToSM(splitShardInfoSMArray,
ShardInfoHashMap,
dsmHandle,
shardSplitInfoCount);
return dsmHandle;
}
/*
* SetupHashMapForShardInfo initializes a hash map to store shard split
* information grouped by node id. The key of the hash table is 'nodeId' and
* the value is the list of ShardSplitInfo for shards placed on that node.
*/
static HTAB *
SetupHashMapForShardInfo()
{
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(uint64_t);
info.entrysize = sizeof(NodeShardMappingEntry);
info.hash = uint32_hash;
info.hcxt = CurrentMemoryContext;
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
return ShardInfoHashMap;
}
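/*
 * ParseShardSplitInfo reads row 'shardSplitInfoIndex' of the two-dimensional
 * input array and extracts its five fields: source shard id, destination
 * (child) shard id, min hash value, max hash value and destination node id.
 * An error is raised if any of the entries is NULL.
 */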
static void
ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
int shardSplitInfoIndex,
uint64 *sourceShardId,
uint64 *desShardId,
int32 *minValue,
int32 *maxValue,
int32 *nodeId)
{
Oid elemtypeId = ARR_ELEMTYPE(shardInfoArrayObject);
int16 elemtypeLength = 0;
bool elemtypeByValue = false;
char elemtypeAlignment = 0;
get_typlenbyvalalign(elemtypeId, &elemtypeLength, &elemtypeByValue,
&elemtypeAlignment);
int elementIndex = 0;
int indexes[] = { shardSplitInfoIndex + 1, elementIndex + 1 };
bool isNull = false;
/* Get source shard Id */
Datum sourceShardIdDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for source shardId")));
}
*sourceShardId = DatumGetUInt64(sourceShardIdDat);
/* Get destination shard Id */
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum destinationShardIdDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for destination shardId")));
}
*desShardId = DatumGetUInt64(destinationShardIdDat);
/* Get minValue for destination shard */
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum minValueDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for min value")));
}
*minValue = DatumGetInt32(minValueDat);
/* Get maxValue for destination shard */
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum maxValueDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for max value")));
}
*maxValue = DatumGetInt32(maxValueDat);
/* Get nodeId for shard placement*/
elementIndex++;
isNull = false;
indexes[0] = shardSplitInfoIndex + 1;
indexes[1] = elementIndex + 1;
Datum nodeIdDat = array_ref(
shardInfoArrayObject,
2,
indexes,
-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
elemtypeLength,
elemtypeByValue,
elemtypeAlignment,
&isNull);
if (isNull)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("null entry found for max value")));
}
*nodeId = DatumGetInt32(nodeIdDat);
PG_RETURN_VOID();
}
/*
* GetShardSplitInfo function constructs the ShardSplitInfo data structure
* with the appropriate Oids for the source and destination relations.
*
* sourceShardIdToSplit - existing shardId which has a valid entry in the cache and catalog
* desSplitChildShardId - new split child shard which doesn't have an entry in the metadata
* cache yet; its shard id is used to construct the qualified shard name
* minValue - minimum hash value for desSplitChildShardId
* maxValue - maximum hash value for desSplitChildShardId
* nodeId - node on which desSplitChildShardId is placed
*/
ShardSplitInfo *
GetShardSplitInfo(uint64 sourceShardIdToSplit,
uint64 desSplitChildShardId,
int32 minValue,
int32 maxValue,
int32 nodeId)
{
ShardInterval *shardIntervalToSplit = LoadShardInterval(sourceShardIdToSplit);
CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
shardIntervalToSplit->relationId);
/* TODO(sameer): also check whether the table is non-distributed */
if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot split shard as only hash distributed "
"tables are supported")));
}
Assert(shardIntervalToSplit->minValueExists);
Assert(shardIntervalToSplit->maxValueExists);
/* Oid of distributed table */
Oid citusTableOid = InvalidOid;
citusTableOid = shardIntervalToSplit->relationId;
Oid sourceShardToSplitOid = InvalidOid;
sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid,
sourceShardIdToSplit);
/* Oid of dummy table at the source */
Oid desSplitChildShardOid = InvalidOid;
desSplitChildShardOid = GetTableLocalShardOid(citusTableOid,
desSplitChildShardId);
if (citusTableOid == InvalidOid ||
sourceShardToSplitOid == InvalidOid ||
desSplitChildShardOid == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid citusTableOid:%u "
"sourceShardToSplitOid: %u,"
"desSplitChildShardOid :%u ",
citusTableOid,
sourceShardToSplitOid,
desSplitChildShardOid)));
}
/* Get PartitionColumnIndex for citusTableOid */
int partitionColumnIndex = -1;
/* determine the partition column in the tuple descriptor */
Var *partitionColumn = cachedTableEntry->partitionColumn;
if (partitionColumn == NULL)
{
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid Partition Column")));
}
partitionColumnIndex = partitionColumn->varattno - 1;
ShardSplitInfo *shardSplitInfo = palloc0(sizeof(ShardSplitInfo));
shardSplitInfo->distributedTableOid = citusTableOid;
shardSplitInfo->partitionColumnIndex = partitionColumnIndex;
shardSplitInfo->sourceShardOid = sourceShardToSplitOid;
shardSplitInfo->splitChildShardOid = desSplitChildShardOid;
shardSplitInfo->shardMinValue = minValue;
shardSplitInfo->shardMaxValue = maxValue;
shardSplitInfo->nodeId = nodeId;
return shardSplitInfo;
}
/*
* AddShardSplitInfoEntryForNode function adds a ShardSplitInfo entry
* to the hash map. The key is the nodeId on which the new shard is to be placed.
*/
void
AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
{
uint64_t keyNodeId = shardSplitInfo->nodeId;
bool found = false;
NodeShardMappingEntry *nodeMappingEntry =
(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &keyNodeId, HASH_ENTER,
&found);
if (!found)
{
nodeMappingEntry->shardSplitInfoList = NULL;
nodeMappingEntry->key = keyNodeId;
}
nodeMappingEntry->shardSplitInfoList =
lappend(nodeMappingEntry->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo);
PG_RETURN_VOID();
}
/*
* CopyShardSplitInfoToSM function copies information from the hash map
* into the shared memory segment. This information is consumed by the WAL
* sender process during logical replication.
*
* shardSplitInfoArray - shared memory pointer where the information has to
* be copied
*
* shardInfoHashMap - hash map containing parsed split information per node id
*
* dsmHandle - Shared memory segment handle
*/
void *
CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
HTAB *shardInfoHashMap,
dsm_handle dsmHandle,
int shardSplitInfoCount)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMap);
NodeShardMappingEntry *entry = NULL;
int index = 0;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint64_t nodeId = entry->key;
char *derivedSlotName =
encode_replication_slot(nodeId,
SLOT_HANDLING_INSERT_AND_DELETE,
dsmHandle);
List *shardSplitInfoList = entry->shardSplitInfoList;
ListCell *listCell = NULL;
foreach(listCell, shardSplitInfoList)
{
ShardSplitInfo *splitShardInfo = (ShardSplitInfo *) lfirst(listCell);
ShardSplitInfo *shardInfoInSM = &shardSplitInfoArray[index];
shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid;
shardInfoInSM->partitionColumnIndex = splitShardInfo->partitionColumnIndex;
shardInfoInSM->sourceShardOid = splitShardInfo->sourceShardOid;
shardInfoInSM->splitChildShardOid = splitShardInfo->splitChildShardOid;
shardInfoInSM->shardMinValue = splitShardInfo->shardMinValue;
shardInfoInSM->shardMaxValue = splitShardInfo->shardMaxValue;
shardInfoInSM->nodeId = splitShardInfo->nodeId;
strcpy_s(shardInfoInSM->slotName, NAMEDATALEN, derivedSlotName);
index++;
}
}
PG_RETURN_VOID();
}

View File

@ -0,0 +1,27 @@
#-------------------------------------------------------------------------
#
# Makefile--
# Makefile for src/backend/distributed/shardsplit
#
# IDENTIFICATION
# src/backend/distributed/shardsplit
#
#-------------------------------------------------------------------------
citus_subdir = src/backend/distributed/shardsplit
citus_top_builddir = ../../../..
safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib
SUBDIRS = . safeclib
SUBDIRS +=
ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS))
#OBJS += \
$(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c))))
OBJS += pgoutput.o
MODULE_big = logical_decoding_plugin
PG_CPPFLAGS += -I$(libpq_srcdir) -I$(safestringlib_srcdir)/include
include $(citus_top_builddir)/Makefile.global
.PHONY: install-all
install-all: install

View File

@ -0,0 +1,353 @@
/*-------------------------------------------------------------------------
*
* pgoutput.c
* Logical Replication output plugin
*
* Copyright (c) 2012-2017, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/distributed/shardsplit/pgoutput.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_type.h"
#include "distributed/multi_progress.h"
#include "distributed/worker_protocol.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/inval.h"
#include "utils/int8.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "utils/varlena.h"
#include "distributed/shard_split.h"
#include "distributed/shardsplit_shared_memory.h"
#include "citus_version.h"
PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB;
ShardSplitInfo *shardSplitInfoArray = NULL;
int shardSplitInfoArraySize = 0;
/* Plugin callback */
static void split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
/* Helper methods */
static bool ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change);
static bool ShouldCommitBeApplied(Relation sourceShardRelation);
static int GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
bool *shouldHandleUpdate);
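/*
 * _PG_output_plugin_init loads the stock 'pgoutput' output plugin, lets it
 * fill in the callback table and then replaces the change callback with
 * split_change, which routes each change to the matching split child shard.
 */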
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
char *plugin = "pgoutput";
LogicalOutputPluginInit plugin_init =
(LogicalOutputPluginInit) load_external_function(plugin,
"_PG_output_plugin_init",
false, NULL);
if (plugin_init == NULL)
{
elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
}
/* ask the output plugin to fill the callback struct */
plugin_init(cb);
pgoutputChangeCB = cb->change_cb;
cb->change_cb = split_change;
}
/*
* GetHashValueForIncomingTuple returns the hash value of the partition
* column for the incoming tuple. It also checks whether the change should
* be handled at all, since the incoming committed change may belong to a
* relation that is not undergoing a split.
*/
static int
GetHashValueForIncomingTuple(Relation sourceShardRelation,
HeapTuple tuple,
bool *shouldHandleChange)
{
ShardSplitInfo *shardSplitInfo = NULL;
int partitionColumnIndex = -1;
Oid distributedTableOid = InvalidOid;
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = 0; i < shardSplitInfoArraySize; i++)
{
shardSplitInfo = &shardSplitInfoArray[i];
if (shardSplitInfo->sourceShardOid == sourceShardOid)
{
distributedTableOid = shardSplitInfo->distributedTableOid;
partitionColumnIndex = shardSplitInfo->partitionColumnIndex;
break;
}
}
/*
* The commit can belong to some other table that is not undergoing a
* split. Ignore such commits.
*/
if (partitionColumnIndex == -1 ||
distributedTableOid == InvalidOid)
{
/*
* TODO(saawasek): Change below warning to DEBUG once more test cases
* are added.
*/
ereport(WARNING, errmsg("Skipping Commit as "
"Relation: %s isn't splitting",
RelationGetRelationName(sourceShardRelation)));
*shouldHandleChange = false;
return 0;
}
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg("null entry found for cacheEntry"));
}
TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation);
bool isNull = false;
Datum partitionColumnValue = heap_getattr(tuple,
partitionColumnIndex + 1,
relationTupleDes,
&isNull);
FmgrInfo *hashFunction = cacheEntry->hashFunction;
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue);
int hashedValue = DatumGetInt32(hashedValueDatum);
*shouldHandleChange = true;
return hashedValue;
}
/*
* FindTargetRelationOid returns the destination relation Oid for the incoming
* tuple.
* sourceShardRelation - Relation on which a commit has happened.
* tuple - changed tuple.
* currentSlotName - Name of replication slot that is processing this update.
*/
Oid
FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple,
char *currentSlotName)
{
Oid targetRelationOid = InvalidOid;
Oid sourceShardRelationOid = sourceShardRelation->rd_id;
bool bShouldHandleUpdate = false;
int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple,
&bShouldHandleUpdate);
if (bShouldHandleUpdate == false)
{
return InvalidOid;
}
for (int i = 0; i < shardSplitInfoArraySize; i++)
{
ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i];
/*
* Each commit message is processed by all the configured
* replication slots. However, a replication slot is only responsible
* for new shard placements belonging to a single node. We check if the
* current slot which is processing the commit should emit
* a target relation Oid.
*/
if (strcmp(shardSplitInfo->slotName, currentSlotName) == 0 &&
shardSplitInfo->sourceShardOid == sourceShardRelationOid &&
shardSplitInfo->shardMinValue <= hashValue &&
shardSplitInfo->shardMaxValue >= hashValue)
{
targetRelationOid = shardSplitInfo->splitChildShardOid;
break;
}
}
return targetRelationOid;
}
/*
* ShouldCommitBeApplied avoids the recursive commit case when the source shard
* and the new split child shards are placed on the same node. When the source
* shard receives a commit (1), the WAL sender processes this commit message.
* As part of replication, the commit is applied to a child shard placed on the
* same node, which in turn creates one more commit (2).
* Commit 2 should be skipped as the source shard and the destination of
* commit 2 are the same and the commit has already been applied.
*/
bool
ShouldCommitBeApplied(Relation sourceShardRelation)
{
ShardSplitInfo *shardSplitInfo = NULL;
int partitionColumnIndex = -1;
Oid distributedTableOid = InvalidOid;
Oid sourceShardOid = sourceShardRelation->rd_id;
for (int i = 0; i < shardSplitInfoArraySize; i++)
{
/* skip the commit when destination is equal to the source */
shardSplitInfo = &shardSplitInfoArray[i];
if (shardSplitInfo->splitChildShardOid == sourceShardOid)
{
return false;
}
}
return true;
}
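/*
 * ShouldSlotHandleChange decides whether the given replication slot should
 * process the incoming change. A slot of type SLOT_HANDLING_DELETE_OF_UPDATE
 * only handles UPDATE changes; all other slot type / change combinations are
 * handled.
 */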
bool
ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change)
{
if (slotName == NULL)
{
ereport(ERROR, errmsg("Invalid null replication slot name."));
}
uint64_t nodeId = 0;
uint32_t slotType = 0;
dsm_handle dsmHandle;
/* change this to enum */
decode_replication_slot(slotName, &nodeId, &slotType, &dsmHandle);
if (slotType != SLOT_HANDLING_INSERT_AND_DELETE &&
slotType != SLOT_HANDLING_DELETE_OF_UPDATE)
{
ereport(ERROR, errmsg("Invalid replication slot type."));
}
if (slotType == SLOT_HANDLING_DELETE_OF_UPDATE &&
change->action != REORDER_BUFFER_CHANGE_UPDATE)
{
return false;
}
return true;
}
/*
* split_change function emits the incoming tuple change
* to the appropriate destination shard.
*/
static void
split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
/*
* Get the ShardSplitInfo array from shared memory if it is not already
* initialized. This gets initialized during the replication of the
* first message.
*/
int arraySize = 0;
if (shardSplitInfoArray == NULL)
{
shardSplitInfoArray =
GetShardSplitInfoSMArrayForSlot(ctx->slot->data.name.data,
&arraySize);
shardSplitInfoArraySize = arraySize;
}
char *replicationSlotName = ctx->slot->data.name.data;
bool shouldHandleChanges = false;
if (!ShouldSlotHandleChange(replicationSlotName, change))
{
return;
}
if (!ShouldCommitBeApplied(relation))
{
return;
}
uint64_t nodeId = 0;
uint32 slotType = 0;
dsm_handle dsmHandle = 0;
decode_replication_slot(replicationSlotName, &nodeId, &slotType, &dsmHandle);
Oid targetRelationOid = InvalidOid;
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
{
HeapTuple newTuple = &(change->data.tp.newtuple->tuple);
targetRelationOid = FindTargetRelationOid(relation, newTuple,
replicationSlotName);
break;
}
case REORDER_BUFFER_CHANGE_UPDATE:
{
switch (slotType)
{
case SLOT_HANDLING_INSERT_AND_DELETE:
{
HeapTuple newTuple = &(change->data.tp.newtuple->tuple);
Oid destinationForInsert = FindTargetRelationOid(relation, newTuple,
replicationSlotName);
targetRelationOid = destinationForInsert;
change->action = REORDER_BUFFER_CHANGE_INSERT;
break;
}
case SLOT_HANDLING_DELETE_OF_UPDATE:
{
char *modifiedSlotName = encode_replication_slot(nodeId, 0,
dsmHandle);
HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple);
Oid destinationForDelete = FindTargetRelationOid(relation, oldTuple,
modifiedSlotName);
targetRelationOid = destinationForDelete;
change->action = REORDER_BUFFER_CHANGE_DELETE;
break;
}
}
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
{
HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple);
targetRelationOid = FindTargetRelationOid(relation, oldTuple,
replicationSlotName);
break;
}
}
if (targetRelationOid != InvalidOid)
{
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
pgoutputChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
}
}

View File

@ -0,0 +1,246 @@
/*-------------------------------------------------------------------------
*
* shardsplit_shared_memory.c
* APIs for creating and accessing shared memory segments to store
* shard split information. The 'split_shard_replication_setup' UDF creates
* the shared memory and populates its contents; WAL sender processes are
* the consumers.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "lib/stringinfo.h"
#include "distributed/colocation_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/citus_safe_lib.h"
/*
* GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory
* segment belonging to 'dsmHandle'. It pins the shared memory segment mapping for
* the lifetime of the backend process accessing it.
*/
ShardSplitInfoSMHeader *
GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle,
dsm_segment **attachedSegment)
{
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
if (dsmSegment == NULL)
{
dsmSegment = dsm_attach(dsmHandle);
}
if (dsmSegment == NULL)
{
ereport(ERROR,
(errmsg("could not attach to dynamic shared memory segment "
"corresponding to handle:%u", dsmHandle)));
}
/* Remain attached until end of backend or DetachSession(). */
dsm_pin_mapping(dsmSegment);
ShardSplitInfoSMHeader *header = (ShardSplitInfoSMHeader *) dsm_segment_address(
dsmSegment);
*attachedSegment = dsmSegment;
return header;
}
/*
* GetShardSplitInfoSMArrayForSlot returns pointer to the array of
* 'ShardSplitInfo' struct stored in the shared memory segment.
*/
ShardSplitInfo *
GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize)
{
if (slotName == NULL ||
arraySize == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("Expected slot name and array size arguments")));
}
dsm_handle dsmHandle = GetSMHandleFromSlotName(slotName);
dsm_segment *dsmSegment = NULL;
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
GetShardSplitInfoSMHeaderFromDSMHandle(dsmHandle,
&dsmSegment);
*arraySize = shardSplitInfoSMHeader->stepCount;
ShardSplitInfo *shardSplitInfoArray =
(ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader);
return shardSplitInfoArray;
}
/*
* GetSMHandleFromSlotName function returns the shared memory handle
* from the replication slot name. Replication slot name is encoded as
* "NODEID_SlotType_SharedMemoryHANDLE".
*/
dsm_handle
GetSMHandleFromSlotName(char *slotName)
{
if (slotName == NULL)
{
ereport(ERROR,
errmsg("Invalid NULL replication slot name."));
}
uint64_t nodeId = 0;
uint32_t type = 0;
dsm_handle handle = 0;
decode_replication_slot(slotName, &nodeId, &type, &handle);
return handle;
}
/*
* CreateShardSplitInfoSharedMemory is used to create a place to store
* information about the shards undergoing a split. The function creates a
* dynamic shared memory segment consisting of a header holding the processId
* and an array of "steps" which store ShardSplitInfo. The contents of this
* shared memory segment are consumed by the WAL sender process during the
* catch-up phase of replication through the logical decoding plugin.
*
* The shared memory segment exists until the catch-up phase completes or the
* postmaster shuts down.
*/
ShardSplitInfoSMHeader *
CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHandle)
{
if (stepSize <= 0 || stepCount <= 0)
{
ereport(ERROR,
(errmsg("number of steps and size of each step should be "
"positive values")));
}
Size totalSize = sizeof(ShardSplitInfoSMHeader) + stepSize * stepCount;
dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
if (dsmSegment == NULL)
{
ereport(ERROR,
(errmsg("could not create a dynamic shared memory segment to "
"keep shard split info")));
}
*dsmHandle = dsm_segment_handle(dsmSegment);
/*
* Pin the segment till the postmaster shuts down, since we need this
* segment even after the session ends, for the replication catch-up phase.
*/
dsm_pin_segment(dsmSegment);
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle, &dsmSegment);
shardSplitInfoSMHeader->stepCount = stepCount;
shardSplitInfoSMHeader->processId = MyProcPid;
return shardSplitInfoSMHeader;
}
/*
* GetSharedMemoryForShardSplitInfo is a wrapper function which creates shared
* memory for storing shard split information. The function returns a pointer
* to the first element of the ShardSplitInfo array in that segment.
*
* shardSplitInfoCount - number of 'ShardSplitInfo ' elements to be allocated
* dsmHandle - handle of the allocated shared memory segment
*/
ShardSplitInfo *
GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle)
{
ShardSplitInfoSMHeader *shardSplitInfoSMHeader =
CreateShardSplitInfoSharedMemory(shardSplitInfoCount,
sizeof(ShardSplitInfo),
dsmHandle);
ShardSplitInfo *shardSplitInfoSMArray =
(ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader);
return shardSplitInfoSMArray;
}
/*
* ShardSplitInfoSMSteps returns a pointer to the array of 'ShardSplitInfo'
* steps that are stored in shared memory segment. This is simply the data
* right after the header, so this function is trivial. The main purpose of
* this function is to make the intent clear to readers of the code.
*/
void *
ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
{
return shardSplitInfoSMHeader + 1;
}
/*
* encode_replication_slot returns an encoded replication slot name
* in the following format.
* Slot Name = NodeId_ReplicationSlotType_SharedMemoryHandle
*/
char *
encode_replication_slot(uint64_t nodeId,
uint32 slotType,
dsm_handle dsmHandle)
{
StringInfo slotName = makeStringInfo();
appendStringInfo(slotName, "%ld_%u_%u", nodeId, slotType, dsmHandle);
return slotName->data;
}
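/*
 * Illustrative example (the handle value is made up): with nodeId 18,
 * slotType SLOT_HANDLING_INSERT_AND_DELETE (0) and dsm handle 4013, the
 * encoded slot name is "18_0_4013". decode_replication_slot below splits
 * such a name back into its three components.
 */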
/*
* decode_replication_slot decodes the replication slot name
* into node id, slotType, shared memory handle.
*/
void
decode_replication_slot(char *slotName,
uint64_t *nodeId,
uint32_t *slotType,
dsm_handle *dsmHandle)
{
if (slotName == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Invalid null replication slot name.")));
}
int index = 0;
char *strtokPosition = NULL;
char *dupSlotName = pstrdup(slotName);
char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition);
while (slotNameString != NULL)
{
if (index == 0)
{
*nodeId = SafeStringToUint64(slotNameString);
}
else if (index == 1)
{
*slotType = strtoul(slotNameString, NULL, 10);
}
else if (index == 2)
{
*dsmHandle = strtoul(slotNameString, NULL, 10);
}
slotNameString = strtok_r(NULL, "_", &strtokPosition);
index++;
}
}

View File

@ -105,9 +105,6 @@
#include "columnar/columnar.h"
/* marks shared object as one loadable by the postgres version compiled against */
PG_MODULE_MAGIC;
ColumnarSupportsIndexAM_type extern_ColumnarSupportsIndexAM = NULL;
CompressionTypeStr_type extern_CompressionTypeStr = NULL;
IsColumnarTableAmTable_type extern_IsColumnarTableAmTable = NULL;

View File

@ -10,3 +10,4 @@ DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint);
#include "../../columnar/sql/columnar--11.0-2--11.1-1.sql"
#include "udfs/citus_split_shard_by_split_points/11.0-2.sql"
#include "udfs/worker_split_copy/11.0-2.sql"
#include "udfs/split_shard_replication_setup/11.0-2.sql"

View File

@ -0,0 +1 @@
CREATE OR REPLACE FUNCTION pg_catalog.split_shard_replication_setup(shardInfo integer[][]) RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$;

View File

@ -0,0 +1 @@
CREATE OR REPLACE FUNCTION pg_catalog.split_shard_replication_setup(shardInfo integer[][]) RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$;

View File

View File

@ -29,8 +29,23 @@ typedef enum SplitOperation
} SplitOperation;
/*
* SplitShard API to split a given shard (or shard group) using split mode and
* specified split points to a set of destination nodes.
* In-memory representation of a split child shard.
*/
typedef struct ShardSplitInfo
{
Oid distributedTableOid; /* citus distributed table Oid */
int partitionColumnIndex; /* 0-based index of the distribution column */
Oid sourceShardOid; /* parent shard Oid */
Oid splitChildShardOid; /* child shard Oid */
int32 shardMinValue; /* lower bound of the child shard's hash range */
int32 shardMaxValue; /* upper bound of the child shard's hash range */
uint64 nodeId; /* node where child shard is to be placed */
char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */
} ShardSplitInfo;
/*
* SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion
* based on specified split points to a set of destination nodes.
*/
extern void SplitShard(SplitMode splitMode,
SplitOperation splitOperation,

View File

@ -0,0 +1,84 @@
/*-------------------------------------------------------------------------
*
* shardsplit_shared_memory.h
* APIs for creating and accessing shared memory segments to store
* shard split information. The 'split_shard_replication_setup' UDF creates
* the shared memory and populates its contents; WAL sender processes are
* the consumers.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef SHARDSPLIT_SHARED_MEMORY_H
#define SHARDSPLIT_SHARED_MEMORY_H
#include "postgres.h"
#include "c.h"
#include "fmgr.h"
#include "distributed/shard_split.h"
/*
* Header of the shared memory segment where shard split information is stored.
*/
typedef struct ShardSplitInfoSMHeader
{
uint64 processId; /* process id creating the shared memory segment */
int stepCount; /* number of elements in the shared memory */
} ShardSplitInfoSMHeader;
/* Functions for creating and accessing shared memory segments */
extern ShardSplitInfoSMHeader * CreateShardSplitInfoSharedMemory(int stepCount,
Size stepSize,
dsm_handle *dsmHandle);
extern ShardSplitInfo * GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount,
dsm_handle *dsmHandle);
extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize);
extern dsm_handle GetSMHandleFromSlotName(char *slotName);
/*
* ShardSplitInfoSMSteps returns a pointer to the array of shard split info
* steps that are stored in shared memory.
*/
extern void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader);
extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle
dsmHandle,
dsm_segment **
attachedSegment);
/*
* An UPDATE of the partition key is realized as a 'DELETE' on the old shard
* and an 'INSERT' on the new shard. So a commit of an UPDATE has to be
* segregated into two replication messages. The WAL sender belonging to a
* replication slot can send only one message, hence to handle UPDATE we
* have to create one extra replication slot per node that handles the
* deletion part of an UPDATE.
*
* SLOT_HANDLING_INSERT_AND_DELETE - responsible for handling INSERT and DELETE
* operations.
* SLOT_HANDLING_DELETE_OF_UPDATE - responsible for only handling the DELETE on
* the old shard for an UPDATE. It is a no-op for INSERT and DELETE operations.
*/
enum ReplicationSlotType
{
SLOT_HANDLING_INSERT_AND_DELETE,
SLOT_HANDLING_DELETE_OF_UPDATE
};
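/*
 * For example, with the encoding used by encode_replication_slot(), a node
 * with id 18 would get the slot names "18_0_<handle>" and "18_1_<handle>"
 * for the two slot types above, where <handle> is the shared memory handle
 * returned by split_shard_replication_setup(). (The handle is left as a
 * placeholder here; the concrete value is only known at runtime.)
 */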
/* Functions related to encoding-decoding for replication slot name */
char * encode_replication_slot(uint64_t nodeId,
uint32 slotType,
dsm_handle dsmHandle);
void decode_replication_slot(char *slotName,
uint64_t *nodeId,
uint32_t *slotType,
dsm_handle *dsmHandle);
#endif /* SHARDSPLIT_SHARED_MEMORY_H */

View File

@ -0,0 +1,201 @@
CREATE SCHEMA citus_split_shard_by_split_points;
SET search_path TO citus_split_shard_by_split_points;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1;
-- Add two additional nodes to cluster.
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- Create distributed table (non co-located)
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- slotName_table is used to persist replication slot name.
-- It is only used for testing, as worker2 needs to create a subscription over the same replication slot.
CREATE TABLE slotName_table (name text, id int primary key);
SELECT create_distributed_table('slotName_table','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Shard with id '1' of table table_to_split is undergoing a split into two new shards
-- with id '2' and '3' respectively. table_to_split_1 is placed on worker1(NodeId 16) and
-- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18).
-- TODO(saawasek): make it parameterized
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
-- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId
-- TODO(saawasek): make it parameterized
CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$
DECLARE
replicationSlotName text;
createdSlotName text;
sharedMemoryId text;
derivedSlotName text;
begin
SELECT * into sharedMemoryId from SplitShardReplicationSetup();
-- '18' is nodeId of worker2
SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName;
SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
INSERT INTO slotName_table values(replicationSlotName, 1);
return replicationSlotName;
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table;
EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;
-- Test scenario starts from here
-- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1.
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- table_to_split_2/3 are located on worker2
-- 4. execute UDF split_shard_replication_setup on worker1 with below
-- params:
-- split_shard_replication_setup
-- (
-- ARRAY[
-- ARRAY[1 /* source shardId */, 2 /* new shardId */, -2147483648 /* minHashValue */, -1 /* maxHashValue */, 18 /* nodeId where new shard is placed */ ],
-- ARRAY[1, 3 , 0 , 2147483647, 18 ]
-- ]
-- );
-- 5. Create Replication slot with 'logical_decoding_plugin'
-- 6. Setup Pub/Sub
-- 7. Insert into table_to_split_1 at source worker1
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create dummy shard tables(table_to_split_2/3) at worker1
-- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
COMMIT;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Create replication slot and setup shard split information at worker1
BEGIN;
select 1 from CreateReplicationSlot();
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from CreateSubscription();
?column?
---------------------------------------------------------------------
1
(1 row)
COMMIT;
-- No data is present at this moment in all the below tables at worker2
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
(0 rows)
select pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
SELECT * from table_to_split_1;
id | value
---------------------------------------------------------------------
100 | a
400 | a
500 | a
(3 rows)
select pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value.
\c - - - :worker_2_port
select pg_sleep(10);
pg_sleep
---------------------------------------------------------------------
(1 row)
SET search_path TO citus_split_shard_by_split_points;
SELECT * from table_to_split_1; -- should always have zero rows
id | value
---------------------------------------------------------------------
(0 rows)
SELECT * from table_to_split_2;
id | value
---------------------------------------------------------------------
400 | a
(1 row)
SELECT * from table_to_split_3;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)

View File

@ -0,0 +1,138 @@
CREATE SCHEMA citus_split_shard_by_split_points;
SET search_path TO citus_split_shard_by_split_points;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1;
-- Add two additional nodes to cluster.
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-- Create distributed table (non co-located)
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split','id');
-- slotName_table is used to persist replication slot name.
-- It is only used for testing, as worker2 needs to create a subscription over the same replication slot.
CREATE TABLE slotName_table (name text, id int primary key);
SELECT create_distributed_table('slotName_table','id');
-- Shard with id '1' of table table_to_split is undergoing a split into two new shards
-- with id '2' and '3' respectively. table_to_split_1 is placed on worker1(NodeId 16) and
-- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18).
-- TODO(saawasek): make it parameterized
CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$
DECLARE
memoryId bigint := 0;
memoryIdText text;
begin
SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]);
SELECT FORMAT('%s', memoryId) into memoryIdText;
return memoryIdText;
end
$$ LANGUAGE plpgsql;
-- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId
-- TODO(saawasek): make it parameterized
CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$
DECLARE
replicationSlotName text;
createdSlotName text;
sharedMemoryId text;
derivedSlotName text;
begin
SELECT * into sharedMemoryId from SplitShardReplicationSetup();
-- '18' is nodeId of worker2
SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName;
SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin');
INSERT INTO slotName_table values(replicationSlotName, 1);
return replicationSlotName;
end
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$
DECLARE
replicationSlotName text;
nodeportLocal int;
subname text;
begin
SELECT name into replicationSlotName from slotName_table;
EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName);
return 'a';
end
$$ LANGUAGE plpgsql;
-- Test scenario starts from here
-- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1.
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- table_to_split_2/3 are located on worker2
-- 4. execute UDF split_shard_replication_setup on worker1 with below
-- params:
-- split_shard_replication_setup
-- (
-- ARRAY[
-- ARRAY[1 /* source shardId */, 2 /* new shardId */, -2147483648 /* minHashValue */, -1 /* maxHashValue */, 18 /* nodeId where new shard is placed */ ],
-- ARRAY[1, 3 , 0 , 2147483647, 18 ]
-- ]
-- );
-- 5. Create Replication slot with 'logical_decoding_plugin'
-- 6. Setup Pub/Sub
-- 7. Insert into table_to_split_1 at source worker1
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create dummy shard tables(table_to_split_2/3) at worker1
-- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
BEGIN;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
COMMIT;
-- Create publication at worker1
BEGIN;
CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3;
COMMIT;
-- Create replication slot and setup shard split information at worker1
BEGIN;
select 1 from CreateReplicationSlot();
COMMIT;
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points;
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
BEGIN;
SELECT 1 from CreateSubscription();
COMMIT;
-- No data is present at this moment in all the below tables at worker2
SELECT * from table_to_split_1;
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;
select pg_sleep(10);
-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points;
INSERT into table_to_split_1 values(100, 'a');
INSERT into table_to_split_1 values(400, 'a');
INSERT into table_to_split_1 values(500, 'a');
SELECT * from table_to_split_1;
select pg_sleep(10);
-- Expect data to be present in shard 2 and shard 3 based on the hash value.
\c - - - :worker_2_port
select pg_sleep(10);
SET search_path TO citus_split_shard_by_split_points;
SELECT * from table_to_split_1; -- should always have zero rows
SELECT * from table_to_split_2;
SELECT * from table_to_split_3;