diff --git a/Makefile b/Makefile
index 12b26cbbe..ce8c6bc0f 100644
--- a/Makefile
+++ b/Makefile
@@ -17,12 +17,16 @@ all: extension pg_send_cancellation
 # build columnar only
 columnar:
 	$(MAKE) -C src/backend/columnar all
+logical_decoding_plugin:
+	$(MAKE) -C src/backend/distributed/shardsplit all
 # build extension
 extension: $(citus_top_builddir)/src/include/citus_version.h columnar
 	$(MAKE) -C src/backend/distributed/ all
 install-columnar: columnar
 	$(MAKE) -C src/backend/columnar install
-install-extension: extension install-columnar
+install-logical_decoding_plugin: logical_decoding_plugin
+	$(MAKE) -C src/backend/distributed/shardsplit install
+install-extension: extension install-columnar install-logical_decoding_plugin
 	$(MAKE) -C src/backend/distributed/ install
 install-headers: extension
 	$(MKDIR_P) '$(DESTDIR)$(includedir_server)/distributed/'
diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index 69dbc0cfd..608f60c0b 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
 DATA_built = $(generated_sql_files)
 
 # directories with source files
-SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib test transaction utils worker
+SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker
 
 # enterprise modules
 SUBDIRS += replication
diff --git a/src/backend/distributed/operations/shard_split_replication.c b/src/backend/distributed/operations/shard_split_replication.c
new file mode 100644
index 000000000..504ac1874
--- /dev/null
+++ b/src/backend/distributed/operations/shard_split_replication.c
@@ -0,0 +1,466 @@
+/*-------------------------------------------------------------------------
+ *
+ * shard_split_replication.c
+ *    This file contains functions to set up information about the list of
+ *    shards that are being split.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "nodes/pg_list.h"
+#include "utils/array.h"
+#include "distributed/utils/array_type.h"
+#include "lib/stringinfo.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "distributed/colocation_utils.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/shardinterval_utils.h"
+#include "distributed/coordinator_protocol.h"
+#include "distributed/connection_management.h"
+#include "distributed/remote_commands.h"
+#include "distributed/shard_split.h"
+#include "distributed/shard_utils.h"
+#include "distributed/shardsplit_shared_memory.h"
+#include "common/hashfn.h"
+#include "safe_str_lib.h"
+#include "distributed/citus_safe_lib.h"
+
+/* declarations for dynamic loading */
+PG_FUNCTION_INFO_V1(split_shard_replication_setup);
+
+static HTAB *ShardInfoHashMap = NULL;
+
+/* Entry for hash map */
+typedef struct NodeShardMappingEntry
+{
+	uint64_t key;
+	List *shardSplitInfoList;
+} NodeShardMappingEntry;
+
+/* Function declarations */
+static void ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
+								int shardSplitInfoIndex,
+								uint64 *sourceShardId,
+								uint64 *desShardId,
+								int32 *minValue,
+								int32 *maxValue,
+								int32 *nodeId);
+static ShardSplitInfo * GetShardSplitInfo(uint64 sourceShardIdToSplit,
+										  uint64 desSplitChildShardId,
+										  int32 minValue,
+										  int32 maxValue,
+										  int32 nodeId);
+static void AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo);
+static void * CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
+									 HTAB *shardInfoHashMap,
+									 dsm_handle dsmHandle,
+									 int shardSplitInfoCount);
+static HTAB * SetupHashMapForShardInfo(void);
+
+/*
+ * split_shard_replication_setup UDF creates in-memory data structures
+ * to store meta information about the new shards and their placements,
+ * required during the catch-up phase of logical replication.
+ * This meta information is stored in a shared memory segment and accessed
+ * by the logical decoding plugin.
+ *
+ * Split information is given by the user as an array in the format below:
+ * [{sourceShardId, childShardId, minValue, maxValue, destinationNodeId}]
+ *
+ * sourceShardId     - id of the shard that is undergoing a split
+ * childShardId      - id of the shard that stores a specific range of values
+ *                     belonging to sourceShardId (the parent)
+ * minValue          - lower bound of the hash range that the child shard stores
+ * maxValue          - upper bound of the hash range that the child shard stores
+ * destinationNodeId - node where childShardId is to be placed
+ *
+ * The function parses the data and builds a routing map per destination node id.
+ * Multiple shards can be placed on the same destination node, and the source and
+ * destination nodes can be the same as well.
+ *
+ * There is a 1-1 mapping between a node and a replication slot, as one replication
+ * slot takes care of replicating the changes for one node.
+ * The 'logical_decoding_plugin' consumes this information and routes each tuple
+ * from the source shard to the destination shard whose hash range contains it.
+ */ +Datum +split_shard_replication_setup(PG_FUNCTION_ARGS) +{ + ArrayType *shardInfoArrayObject = PG_GETARG_ARRAYTYPE_P(0); + int shardInfoArrayLength = ARR_DIMS(shardInfoArrayObject)[0]; + int insideCount = ARR_DIMS(shardInfoArrayObject)[1]; + + /* SetupMap */ + SetupHashMapForShardInfo(); + + int shardSplitInfoCount = 0; + for (int index = 0; index < shardInfoArrayLength; index++) + { + uint64 sourceShardId = 0; + uint64 desShardId = 0; + int32 minValue = 0; + int32 maxValue = 0; + int32 nodeId = 0; + + ParseShardSplitInfo( + shardInfoArrayObject, + index, + &sourceShardId, + &desShardId, + &minValue, + &maxValue, + &nodeId); + + ShardSplitInfo *shardSplitInfo = GetShardSplitInfo( + sourceShardId, + desShardId, + minValue, + maxValue, + nodeId); + + AddShardSplitInfoEntryForNode(shardSplitInfo); + shardSplitInfoCount++; + } + + dsm_handle dsmHandle; + ShardSplitInfo *splitShardInfoSMArray = + GetSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle); + + CopyShardSplitInfoToSM(splitShardInfoSMArray, + ShardInfoHashMap, + dsmHandle, + shardSplitInfoCount); + + return dsmHandle; +} + + +/* + * SetupHashMapForShardInfo initializes a hash map to store shard split + * information by grouping them node id wise. The key of the hash table + * is 'nodeId' and value is a list of ShardSplitInfo that are placed on + * this particular node. + */ +static HTAB * +SetupHashMapForShardInfo() +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(uint64_t); + info.entrysize = sizeof(NodeShardMappingEntry); + info.hash = uint32_hash; + info.hcxt = CurrentMemoryContext; + int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); + + ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags); + return ShardInfoHashMap; +} + + +static void +ParseShardSplitInfo(ArrayType *shardInfoArrayObject, + int shardSplitInfoIndex, + uint64 *sourceShardId, + uint64 *desShardId, + int32 *minValue, + int32 *maxValue, + int32 *nodeId) +{ + Oid elemtypeId = ARR_ELEMTYPE(shardInfoArrayObject); + int16 elemtypeLength = 0; + bool elemtypeByValue = false; + char elemtypeAlignment = 0; + get_typlenbyvalalign(elemtypeId, &elemtypeLength, &elemtypeByValue, + &elemtypeAlignment); + + int elementIndex = 0; + int indexes[] = { shardSplitInfoIndex + 1, elementIndex + 1 }; + bool isNull = false; + + /* Get source shard Id */ + Datum sourceShardIdDat = array_ref( + shardInfoArrayObject, + 2, + indexes, + -1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */ + elemtypeLength, + elemtypeByValue, + elemtypeAlignment, + &isNull); + + if (isNull) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("null entry found for source shardId"))); + } + + *sourceShardId = DatumGetUInt64(sourceShardIdDat); + + /* Get destination shard Id */ + elementIndex++; + isNull = false; + indexes[0] = shardSplitInfoIndex + 1; + indexes[1] = elementIndex + 1; + Datum destinationShardIdDat = array_ref( + shardInfoArrayObject, + 2, + indexes, + -1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */ + elemtypeLength, + elemtypeByValue, + elemtypeAlignment, + &isNull); + + if (isNull) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("null entry found for destination shardId"))); + } + + *desShardId = DatumGetUInt64(destinationShardIdDat); + + /* Get minValue for destination shard */ + elementIndex++; + isNull = false; + indexes[0] = shardSplitInfoIndex + 1; + indexes[1] = elementIndex + 1; + Datum minValueDat = array_ref( + 
shardInfoArrayObject,
+		2,
+		indexes,
+		-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
+		elemtypeLength,
+		elemtypeByValue,
+		elemtypeAlignment,
+		&isNull);
+
+	if (isNull)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("null entry found for min value")));
+	}
+
+	*minValue = DatumGetInt32(minValueDat);
+
+	/* Get maxValue for destination shard */
+	elementIndex++;
+	isNull = false;
+	indexes[0] = shardSplitInfoIndex + 1;
+	indexes[1] = elementIndex + 1;
+	Datum maxValueDat = array_ref(
+		shardInfoArrayObject,
+		2,
+		indexes,
+		-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
+		elemtypeLength,
+		elemtypeByValue,
+		elemtypeAlignment,
+		&isNull);
+
+	if (isNull)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("null entry found for max value")));
+	}
+
+	*maxValue = DatumGetInt32(maxValueDat);
+
+	/* Get nodeId for shard placement */
+	elementIndex++;
+	isNull = false;
+	indexes[0] = shardSplitInfoIndex + 1;
+	indexes[1] = elementIndex + 1;
+	Datum nodeIdDat = array_ref(
+		shardInfoArrayObject,
+		2,
+		indexes,
+		-1, /* (> 0 is for fixed-length arrays -- these are assumed to be 1-d, 0-based) */
+		elemtypeLength,
+		elemtypeByValue,
+		elemtypeAlignment,
+		&isNull);
+
+	if (isNull)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("null entry found for node id")));
+	}
+
+	*nodeId = DatumGetInt32(nodeIdDat);
+}
+
+
+/*
+ * GetShardSplitInfo constructs a ShardSplitInfo data structure with the
+ * appropriate Oids for the source and destination relations.
+ *
+ * sourceShardIdToSplit - Existing shardId which has a valid entry in the cache and catalog
+ * desSplitChildShardId - New split child shard which doesn't have an entry in the metadata
+ *                        cache yet; its qualified shard name can still be built from the shard id.
+ * minValue - Minimum hash value for desSplitChildShardId
+ * maxValue - Maximum hash value for desSplitChildShardId
+ * nodeId - Node where desSplitChildShardId is to be placed
+ */
+ShardSplitInfo *
+GetShardSplitInfo(uint64 sourceShardIdToSplit,
+				  uint64 desSplitChildShardId,
+				  int32 minValue,
+				  int32 maxValue,
+				  int32 nodeId)
+{
+	ShardInterval *shardIntervalToSplit = LoadShardInterval(sourceShardIdToSplit);
+	CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
+		shardIntervalToSplit->relationId);
+
+	/* TODO(sameer): also check if this is a non-distributed table */
+	if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("shard split is only supported for hash-distributed tables")));
+	}
+
+	Assert(shardIntervalToSplit->minValueExists);
+	Assert(shardIntervalToSplit->maxValueExists);
+
+	/* Oid of the distributed table */
+	Oid citusTableOid = InvalidOid;
+	citusTableOid = shardIntervalToSplit->relationId;
+	Oid sourceShardToSplitOid = InvalidOid;
+	sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid,
+												  sourceShardIdToSplit);
+
+	/* Oid of the dummy table at the source */
+	Oid desSplitChildShardOid = InvalidOid;
+	desSplitChildShardOid = GetTableLocalShardOid(citusTableOid,
+												  desSplitChildShardId);
+
+	if (citusTableOid == InvalidOid ||
+		sourceShardToSplitOid == InvalidOid ||
+		desSplitChildShardOid == InvalidOid)
+	{
+		ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("invalid citusTableOid: %u, "
+							   "sourceShardToSplitOid: %u, "
+							   "desSplitChildShardOid: %u",
+							   citusTableOid,
+							   sourceShardToSplitOid,
+							   desSplitChildShardOid)));
+	}
+
+	/* get the partition column index for citusTableOid */
+	int partitionColumnIndex = -1;
+
+	/* determine the partition column in the tuple descriptor */
+	Var *partitionColumn = cachedTableEntry->partitionColumn;
+	if (partitionColumn == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("invalid partition column")));
+	}
+	partitionColumnIndex = partitionColumn->varattno - 1;
+
+	ShardSplitInfo *shardSplitInfo = palloc0(sizeof(ShardSplitInfo));
+	shardSplitInfo->distributedTableOid = citusTableOid;
+	shardSplitInfo->partitionColumnIndex = partitionColumnIndex;
+	shardSplitInfo->sourceShardOid = sourceShardToSplitOid;
+	shardSplitInfo->splitChildShardOid = desSplitChildShardOid;
+	shardSplitInfo->shardMinValue = minValue;
+	shardSplitInfo->shardMaxValue = maxValue;
+	shardSplitInfo->nodeId = nodeId;
+
+	return shardSplitInfo;
+}
+
+
+/*
+ * AddShardSplitInfoEntryForNode adds a ShardSplitInfo entry to the hash map.
+ * The key is the nodeId on which the new shard is to be placed.
+ */
+void
+AddShardSplitInfoEntryForNode(ShardSplitInfo *shardSplitInfo)
+{
+	uint64_t keyNodeId = shardSplitInfo->nodeId;
+	bool found = false;
+	NodeShardMappingEntry *nodeMappingEntry =
+		(NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &keyNodeId, HASH_ENTER,
+											  &found);
+
+	if (!found)
+	{
+		nodeMappingEntry->shardSplitInfoList = NULL;
+		nodeMappingEntry->key = keyNodeId;
+	}
+
+	nodeMappingEntry->shardSplitInfoList =
+		lappend(nodeMappingEntry->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo);
+}
+
+
+/*
+ * CopyShardSplitInfoToSM copies information from the hash map into the
+ * shared memory segment. This information is consumed by the WAL sender
+ * process during logical replication.
+ *
+ * shardSplitInfoArray - Shared memory pointer where the information has to
+ *                       be copied
+ *
+ * shardInfoHashMap    - Hash map containing the parsed split information,
+ *                       keyed by node id
+ *
+ * dsmHandle           - Shared memory segment handle
+ */
+void *
+CopyShardSplitInfoToSM(ShardSplitInfo *shardSplitInfoArray,
+					   HTAB *shardInfoHashMap,
+					   dsm_handle dsmHandle,
+					   int shardSplitInfoCount)
+{
+	HASH_SEQ_STATUS status;
+	hash_seq_init(&status, shardInfoHashMap);
+
+	NodeShardMappingEntry *entry = NULL;
+	int index = 0;
+	while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
+	{
+		uint64_t nodeId = entry->key;
+		char *derivedSlotName =
+			encode_replication_slot(nodeId,
+									SLOT_HANDLING_INSERT_AND_DELETE,
+									dsmHandle);
+
+		List *shardSplitInfoList = entry->shardSplitInfoList;
+		ListCell *listCell = NULL;
+		foreach(listCell, shardSplitInfoList)
+		{
+			ShardSplitInfo *splitShardInfo = (ShardSplitInfo *) lfirst(listCell);
+			ShardSplitInfo *shardInfoInSM = &shardSplitInfoArray[index];
+
+			shardInfoInSM->distributedTableOid = splitShardInfo->distributedTableOid;
+			shardInfoInSM->partitionColumnIndex = splitShardInfo->partitionColumnIndex;
+			shardInfoInSM->sourceShardOid = splitShardInfo->sourceShardOid;
+			shardInfoInSM->splitChildShardOid = splitShardInfo->splitChildShardOid;
+			shardInfoInSM->shardMinValue = splitShardInfo->shardMinValue;
+			shardInfoInSM->shardMaxValue = splitShardInfo->shardMaxValue;
+			shardInfoInSM->nodeId = splitShardInfo->nodeId;
+			strcpy_s(shardInfoInSM->slotName, NAMEDATALEN, derivedSlotName);
+			index++;
+		}
+	}
+
+	return NULL;
+}
diff --git a/src/backend/distributed/shardsplit/Makefile b/src/backend/distributed/shardsplit/Makefile
new file mode 100644
index 000000000..942125ca4
--- /dev/null
+++ b/src/backend/distributed/shardsplit/Makefile
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for src/backend/distributed/shardsplit
+#
+# IDENTIFICATION
+#    src/backend/distributed/shardsplit
+#
+#-------------------------------------------------------------------------
+citus_subdir = src/backend/distributed/shardsplit
+citus_top_builddir = ../../../..
+safestringlib_srcdir = $(citus_abs_top_srcdir)/vendor/safestringlib
+SUBDIRS = . 
safeclib +SUBDIRS += +ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS)) +#OBJS += \ + $(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c)))) +OBJS += pgoutput.o + +MODULE_big = logical_decoding_plugin + +PG_CPPFLAGS += -I$(libpq_srcdir) -I$(safestringlib_srcdir)/include + +include $(citus_top_builddir)/Makefile.global + +.PHONY: install-all +install-all: install diff --git a/src/backend/distributed/shardsplit/pgoutput.c b/src/backend/distributed/shardsplit/pgoutput.c new file mode 100644 index 000000000..b8c6cb2df --- /dev/null +++ b/src/backend/distributed/shardsplit/pgoutput.c @@ -0,0 +1,353 @@ +/*------------------------------------------------------------------------- + * + * pgoutput.c + * Logical Replication output plugin + * + * Copyright (c) 2012-2017, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/distributed/shardsplit/pgoutput.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "access/htup_details.h" +#include "access/nbtree.h" +#include "catalog/pg_am.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_publication.h" +#include "catalog/pg_type.h" +#include "distributed/multi_progress.h" +#include "distributed/worker_protocol.h" +#include "replication/logical.h" +#include "replication/logicalproto.h" +#include "replication/origin.h" +#include "replication/pgoutput.h" +#include "utils/inval.h" +#include "utils/int8.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" +#include "utils/typcache.h" +#include "utils/varlena.h" +#include "distributed/shard_split.h" +#include "distributed/shardsplit_shared_memory.h" +#include "citus_version.h" + +PG_MODULE_MAGIC; + +extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); +static LogicalDecodeChangeCB pgoutputChangeCB; +ShardSplitInfo *shardSplitInfoArray = NULL; +int shardSplitInfoArraySize = 0; + + +/* Plugin callback */ +static void split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); + +/* Helper methods */ +static bool ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change); +static bool ShouldCommitBeApplied(Relation sourceShardRelation); +static int GetHashValueForIncomingTuple(Relation sourceShardRelation, + HeapTuple tuple, + bool *shouldHandleUpdate); + +void +_PG_output_plugin_init(OutputPluginCallbacks *cb) +{ + char *plugin = "pgoutput"; + + LogicalOutputPluginInit plugin_init = + (LogicalOutputPluginInit) load_external_function(plugin, + "_PG_output_plugin_init", + false, NULL); + + if (plugin_init == NULL) + { + elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol"); + } + + /* ask the output plugin to fill the callback struct */ + plugin_init(cb); + + pgoutputChangeCB = cb->change_cb; + + cb->change_cb = split_change; +} + + +/* + * GetHashValueForIncomingTuple returns the hash value of the partition + * column for the incoming tuple. It also checks if the change should be + * handled as the incoming committed change would belong to a relation + * that is not under going split. 
+ */ +static int +GetHashValueForIncomingTuple(Relation sourceShardRelation, + HeapTuple tuple, + bool *shouldHandleChange) +{ + ShardSplitInfo *shardSplitInfo = NULL; + int partitionColumnIndex = -1; + Oid distributedTableOid = InvalidOid; + + Oid sourceShardOid = sourceShardRelation->rd_id; + for (int i = 0; i < shardSplitInfoArraySize; i++) + { + shardSplitInfo = &shardSplitInfoArray[i]; + if (shardSplitInfo->sourceShardOid == sourceShardOid) + { + distributedTableOid = shardSplitInfo->distributedTableOid; + partitionColumnIndex = shardSplitInfo->partitionColumnIndex; + break; + } + } + + /* + * The commit can belong to any other table that is not going + * under split. Ignore such commit's. + */ + if (partitionColumnIndex == -1 || + distributedTableOid == InvalidOid) + { + /* + * TODO(saawasek): Change below warning to DEBUG once more test case + * are added. + */ + ereport(WARNING, errmsg("Skipping Commit as " + "Relation: %s isn't splitting", + RelationGetRelationName(sourceShardRelation))); + *shouldHandleChange = false; + return 0; + } + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid); + if (cacheEntry == NULL) + { + ereport(ERROR, errmsg("null entry found for cacheEntry")); + } + + TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation); + bool isNull = false; + Datum partitionColumnValue = heap_getattr(tuple, + partitionColumnIndex + 1, + relationTupleDes, + &isNull); + + FmgrInfo *hashFunction = cacheEntry->hashFunction; + + /* get hashed value of the distribution value */ + Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue); + int hashedValue = DatumGetInt32(hashedValueDatum); + + *shouldHandleChange = true; + + return hashedValue; +} + + +/* + * FindTargetRelationOid returns the destination relation Oid for the incoming + * tuple. + * sourceShardRelation - Relation on which a commit has happened. + * tuple - changed tuple. + * currentSlotName - Name of replication slot that is processing this update. + */ +Oid +FindTargetRelationOid(Relation sourceShardRelation, + HeapTuple tuple, + char *currentSlotName) +{ + Oid targetRelationOid = InvalidOid; + Oid sourceShardRelationOid = sourceShardRelation->rd_id; + + bool bShouldHandleUpdate = false; + int hashValue = GetHashValueForIncomingTuple(sourceShardRelation, tuple, + &bShouldHandleUpdate); + if (bShouldHandleUpdate == false) + { + return InvalidOid; + } + + for (int i = 0; i < shardSplitInfoArraySize; i++) + { + ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i]; + + /* + * Each commit message is processed by all the configured + * replication slots. However, a replication is slot only responsible + * for new shard placements belonging to a single node. We check if the + * current slot which is processing the commit should emit + * a target relation Oid. + */ + if (strcmp(shardSplitInfo->slotName, currentSlotName) == 0 && + shardSplitInfo->sourceShardOid == sourceShardRelationOid && + shardSplitInfo->shardMinValue <= hashValue && + shardSplitInfo->shardMaxValue >= hashValue) + { + targetRelationOid = shardSplitInfo->splitChildShardOid; + break; + } + } + + return targetRelationOid; +} + + +/* + * ShouldCommitBeApplied avoids recursive commit case when source shard and + * new split child shards are placed on the same node. When the source shard + * recives a commit(1), the WAL sender processes this commit message. This + * commit is applied to a child shard which is placed on the same node as a + * part of replication. 
This in turn creates one more commit(2). + * Commit 2 should be skipped as the source shard and destination for commit 2 + * are same and the commit has already been applied. + */ +bool +ShouldCommitBeApplied(Relation sourceShardRelation) +{ + ShardSplitInfo *shardSplitInfo = NULL; + int partitionColumnIndex = -1; + Oid distributedTableOid = InvalidOid; + + Oid sourceShardOid = sourceShardRelation->rd_id; + for (int i = 0; i < shardSplitInfoArraySize; i++) + { + /* skip the commit when destination is equal to the source */ + shardSplitInfo = &shardSplitInfoArray[i]; + if (shardSplitInfo->splitChildShardOid == sourceShardOid) + { + return false; + } + } + + return true; +} + + +bool +ShouldSlotHandleChange(char *slotName, ReorderBufferChange *change) +{ + if (slotName == NULL) + { + ereport(ERROR, errmsg("Invalid null replication slot name.")); + } + + uint64_t nodeId = 0; + uint32_t slotType = 0; + dsm_handle dsmHandle; + + /* change this to enum */ + decode_replication_slot(slotName, &nodeId, &slotType, &dsmHandle); + if (slotType != SLOT_HANDLING_INSERT_AND_DELETE && + slotType != SLOT_HANDLING_DELETE_OF_UPDATE) + { + ereport(ERROR, errmsg("Invalid replication slot type.")); + } + + if (slotType == SLOT_HANDLING_DELETE_OF_UPDATE && + change->action != REORDER_BUFFER_CHANGE_UPDATE) + { + return false; + } + + return true; +} + +/* +* split_change function emits the incoming tuple change +* to the appropriate destination shard. +*/ +static void +split_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + /* + * Get ShardSplitInfo array from Shared Memory if not already + * initialized. This gets initialized during the replication of + * first message. + */ + int arraySize = 0; + if (shardSplitInfoArray == NULL) + { + shardSplitInfoArray = + GetShardSplitInfoSMArrayForSlot(ctx->slot->data.name.data, + &arraySize); + shardSplitInfoArraySize = arraySize; + } + + char *replicationSlotName = ctx->slot->data.name.data; + bool shouldHandleChanges = false; + if (!ShouldSlotHandleChange(replicationSlotName, change)) + { + return; + } + + if (!ShouldCommitBeApplied(relation)) + { + return; + } + + uint64_t nodeId = 0; + uint32 slotType = 0; + dsm_handle dsmHandle = 0; + decode_replication_slot(replicationSlotName, &nodeId, &slotType, &dsmHandle); + Oid targetRelationOid = InvalidOid; + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple newTuple = &(change->data.tp.newtuple->tuple); + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + + case REORDER_BUFFER_CHANGE_UPDATE: + { + switch (slotType) + { + case SLOT_HANDLING_INSERT_AND_DELETE: + { + HeapTuple newTuple = &(change->data.tp.newtuple->tuple); + Oid destinationForInsert = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + targetRelationOid = destinationForInsert; + change->action = REORDER_BUFFER_CHANGE_INSERT; + break; + } + + case SLOT_HANDLING_DELETE_OF_UPDATE: + { + char *modifiedSlotName = encode_replication_slot(nodeId, 0, + dsmHandle); + HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple); + Oid destinationForDelete = FindTargetRelationOid(relation, oldTuple, + modifiedSlotName); + targetRelationOid = destinationForDelete; + change->action = REORDER_BUFFER_CHANGE_DELETE; + break; + } + } + + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple oldTuple = &(change->data.tp.oldtuple->tuple); + targetRelationOid = FindTargetRelationOid(relation, oldTuple, + 
replicationSlotName); + + break; + } + } + + if (targetRelationOid != InvalidOid) + { + Relation targetRelation = RelationIdGetRelation(targetRelationOid); + pgoutputChangeCB(ctx, txn, targetRelation, change); + RelationClose(targetRelation); + } +} diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c new file mode 100644 index 000000000..973c007d1 --- /dev/null +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -0,0 +1,246 @@ +/*------------------------------------------------------------------------- + * + * shardsplit_shared_memory.c + * API's for creating and accessing shared memory segments to store + * shard split information. 'setup_shard_replication' UDF creates the + * shared memory, populates the contents and WAL sender processes are + * the consumers. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "lib/stringinfo.h" +#include "distributed/colocation_utils.h" +#include "distributed/shardsplit_shared_memory.h" +#include "distributed/citus_safe_lib.h" + +/* + * GetShardSplitInfoSMHeaderFromDSMHandle returns the header of the shared memory + * segment beloing to 'dsmHandle'. It pins the shared memory segment mapping till + * lifetime of the backend process accessing it. + */ +ShardSplitInfoSMHeader * +GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle dsmHandle, + dsm_segment **attachedSegment) +{ + dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); + + if (dsmSegment == NULL) + { + dsmSegment = dsm_attach(dsmHandle); + } + + if (dsmSegment == NULL) + { + ereport(ERROR, + (errmsg("could not attach to dynamic shared memory segment " + "corresponding to handle:%u", dsmHandle))); + } + + /* Remain attached until end of backend or DetachSession(). */ + dsm_pin_mapping(dsmSegment); + + ShardSplitInfoSMHeader *header = (ShardSplitInfoSMHeader *) dsm_segment_address( + dsmSegment); + + *attachedSegment = dsmSegment; + + return header; +} + + +/* + * GetShardSplitInfoSMArrayForSlot returns pointer to the array of + * 'ShardSplitInfo' struct stored in the shared memory segment. + */ +ShardSplitInfo * +GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize) +{ + if (slotName == NULL || + arraySize == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("Expected slot name and array size arguments"))); + } + + dsm_handle dsmHandle = GetSMHandleFromSlotName(slotName); + dsm_segment *dsmSegment = NULL; + ShardSplitInfoSMHeader *shardSplitInfoSMHeader = + GetShardSplitInfoSMHeaderFromDSMHandle(dsmHandle, + &dsmSegment); + *arraySize = shardSplitInfoSMHeader->stepCount; + + ShardSplitInfo *shardSplitInfoArray = + (ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader); + + return shardSplitInfoArray; +} + + +/* + * GetSMHandleFromSlotName function returns the shared memory handle + * from the replication slot name. Replication slot name is encoded as + * "NODEID_SlotType_SharedMemoryHANDLE". 
+ */ +dsm_handle +GetSMHandleFromSlotName(char *slotName) +{ + if (slotName == NULL) + { + ereport(ERROR, + errmsg("Invalid NULL replication slot name.")); + } + + uint64_t nodeId = 0; + uint32_t type = 0; + dsm_handle handle = 0; + decode_replication_slot(slotName, &nodeId, &type, &handle); + + return handle; +} + + +/* + * CreateShardSplitInfoSharedMemory is used to create a place to store + * information about the shard undergoing a split. The function creates dynamic + * shared memory segment consisting of a header regarding the processId and an + * array of "steps" which store ShardSplitInfo. The contents of this shared + * memory segment are consumed by WAL sender process during catch up phase of + * replication through logical decoding plugin. + * + * The shared memory segment exists till the catch up phase completes or the + * postmaster shutsdown. + */ +ShardSplitInfoSMHeader * +CreateShardSplitInfoSharedMemory(int stepCount, Size stepSize, dsm_handle *dsmHandle) +{ + if (stepSize <= 0 || stepCount <= 0) + { + ereport(ERROR, + (errmsg("number of steps and size of each step should be " + "positive values"))); + } + + Size totalSize = sizeof(ShardSplitInfoSMHeader) + stepSize * stepCount; + dsm_segment *dsmSegment = dsm_create(totalSize, DSM_CREATE_NULL_IF_MAXSEGMENTS); + + if (dsmSegment == NULL) + { + ereport(ERROR, + (errmsg("could not create a dynamic shared memory segment to " + "keep shard split info"))); + } + + *dsmHandle = dsm_segment_handle(dsmSegment); + + /* + * Pin the segment till Postmaster shutsdown since we need this + * segment even after the session ends for replication catchup phase. + */ + dsm_pin_segment(dsmSegment); + + ShardSplitInfoSMHeader *shardSplitInfoSMHeader = + GetShardSplitInfoSMHeaderFromDSMHandle(*dsmHandle, &dsmSegment); + + shardSplitInfoSMHeader->stepCount = stepCount; + shardSplitInfoSMHeader->processId = MyProcPid; + + return shardSplitInfoSMHeader; +} + + +/* + * GetSharedMemoryForShardSplitInfo is a wrapper function which creates shared memory + * for storing shard split infomation. The function returns pointer the first element + * within this array. + * + * shardSplitInfoCount - number of 'ShardSplitInfo ' elements to be allocated + * dsmHandle - handle of the allocated shared memory segment + */ +ShardSplitInfo * +GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount, dsm_handle *dsmHandle) +{ + ShardSplitInfoSMHeader *shardSplitInfoSMHeader = + CreateShardSplitInfoSharedMemory(shardSplitInfoCount, + sizeof(ShardSplitInfo), + dsmHandle); + ShardSplitInfo *shardSplitInfoSMArray = + (ShardSplitInfo *) ShardSplitInfoSMSteps(shardSplitInfoSMHeader); + + return shardSplitInfoSMArray; +} + + +/* + * ShardSplitInfoSMSteps returns a pointer to the array of 'ShardSplitInfo' + * steps that are stored in shared memory segment. This is simply the data + * right after the header, so this function is trivial. The main purpose of + * this function is to make the intent clear to readers of the code. + */ +void * +ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) +{ + return shardSplitInfoSMHeader + 1; +} + + +/* + * encode_replication_slot returns an encoded replication slot name + * in the following format. 
+ * Slot Name = NodeId_ReplicationSlotType_SharedMemoryHandle + */ +char * +encode_replication_slot(uint64_t nodeId, + uint32 slotType, + dsm_handle dsmHandle) +{ + StringInfo slotName = makeStringInfo(); + appendStringInfo(slotName, "%ld_%u_%u", nodeId, slotType, dsmHandle); + return slotName->data; +} + + +/* + * decode_replication_slot decodes the replication slot name + * into node id, slotType, shared memory handle. + */ +void +decode_replication_slot(char *slotName, + uint64_t *nodeId, + uint32_t *slotType, + dsm_handle *dsmHandle) +{ + if (slotName == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Invalid null replication slot name."))); + } + + int index = 0; + char *strtokPosition = NULL; + char *dupSlotName = pstrdup(slotName); + char *slotNameString = strtok_r(dupSlotName, "_", &strtokPosition); + while (slotNameString != NULL) + { + if (index == 0) + { + *nodeId = SafeStringToUint64(slotNameString); + } + else if (index == 1) + { + *slotType = strtoul(slotNameString, NULL, 10); + } + else if (index == 2) + { + *dsmHandle = strtoul(slotNameString, NULL, 10); + } + slotNameString = strtok_r(NULL, "_", &strtokPosition); + index++; + } +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ad479fe32..f6033b824 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -105,9 +105,6 @@ #include "columnar/columnar.h" -/* marks shared object as one loadable by the postgres version compiled against */ -PG_MODULE_MAGIC; - ColumnarSupportsIndexAM_type extern_ColumnarSupportsIndexAM = NULL; CompressionTypeStr_type extern_CompressionTypeStr = NULL; IsColumnarTableAmTable_type extern_IsColumnarTableAmTable = NULL; diff --git a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql index e7d2c67ff..e3667acf4 100644 --- a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql @@ -10,3 +10,4 @@ DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint); #include "../../columnar/sql/columnar--11.0-2--11.1-1.sql" #include "udfs/citus_split_shard_by_split_points/11.0-2.sql" #include "udfs/worker_split_copy/11.0-2.sql" +#include "udfs/split_shard_replication_setup/11.0-2.sql" diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql new file mode 100644 index 000000000..3ae0fbb2a --- /dev/null +++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/11.0-2.sql @@ -0,0 +1 @@ +CREATE OR REPLACE FUNCTION pg_catalog.split_shard_replication_setup(shardInfo integer[][]) RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$; \ No newline at end of file diff --git a/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql new file mode 100644 index 000000000..3ae0fbb2a --- /dev/null +++ b/src/backend/distributed/sql/udfs/split_shard_replication_setup/latest.sql @@ -0,0 +1 @@ +CREATE OR REPLACE FUNCTION pg_catalog.split_shard_replication_setup(shardInfo integer[][]) RETURNS bigint LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$split_shard_replication_setup$$; \ No newline at end of file diff --git a/src/include/distributed/pgoutput.h b/src/include/distributed/pgoutput.h new file mode 100644 index 000000000..e69de29bb 
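Reviewer note: a minimal usage sketch of the new UDF and decoding plugin added by this patch, mirroring
the regression test further down. The shard ids, the destination node id 18, and the returned
shared-memory handle (123 below) are illustrative values, not fixed constants of the API.

    -- run on the worker that hosts the shard being split; each inner array is
    -- {sourceShardId, childShardId, minHashValue, maxHashValue, destinationNodeId}
    SELECT split_shard_replication_setup(ARRAY[
        ARRAY[1, 2, -2147483648, -1, 18],
        ARRAY[1, 3, 0, 2147483647, 18]]);  -- returns a shared memory (dsm) handle, e.g. 123

    -- the slot name is encoded as <destinationNodeId>_<slotType>_<dsmHandle>
    SELECT slot_name FROM pg_create_logical_replication_slot('18_0_123', 'logical_decoding_plugin');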
diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h
index 790e3d612..3e6f9acc6 100644
--- a/src/include/distributed/shard_split.h
+++ b/src/include/distributed/shard_split.h
@@ -29,8 +29,23 @@ typedef enum SplitOperation
 } SplitOperation;
 
 /*
- * SplitShard API to split a given shard (or shard group) using split mode and
- * specified split points to a set of destination nodes.
+ * In-memory representation of a split child shard.
+ */
+typedef struct ShardSplitInfo
+{
+	Oid distributedTableOid;     /* citus distributed table Oid */
+	int partitionColumnIndex;
+	Oid sourceShardOid;          /* parent shard Oid */
+	Oid splitChildShardOid;      /* child shard Oid */
+	int32 shardMinValue;
+	int32 shardMaxValue;
+	uint64 nodeId;               /* node where the child shard is to be placed */
+	char slotName[NAMEDATALEN];  /* replication slot name belonging to this node */
+} ShardSplitInfo;
+
+/*
+ * SplitShard API to split a given shard (or shard group) in a blocking or non-blocking
+ * fashion, based on the specified split points, to a set of destination nodes.
  */
 extern void SplitShard(SplitMode splitMode,
 					   SplitOperation splitOperation,
diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h
new file mode 100644
index 000000000..3ca41ffe7
--- /dev/null
+++ b/src/include/distributed/shardsplit_shared_memory.h
@@ -0,0 +1,84 @@
+/*-------------------------------------------------------------------------
+ *
+ * shardsplit_shared_memory.h
+ *    APIs for creating and accessing shared memory segments that store
+ *    shard split information. The 'split_shard_replication_setup' UDF creates
+ *    the shared memory and populates its contents; WAL sender processes are
+ *    the consumers.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef SHARDSPLIT_SHARED_MEMORY_H
+#define SHARDSPLIT_SHARED_MEMORY_H
+
+#include "postgres.h"
+#include "c.h"
+#include "fmgr.h"
+#include "distributed/shard_split.h"
+
+/*
+ * Header of the shared memory segment where shard split information is stored.
+ */
+typedef struct ShardSplitInfoSMHeader
+{
+	uint64 processId; /* process id creating the shared memory segment */
+	int stepCount;    /* number of elements in the shared memory */
+} ShardSplitInfoSMHeader;
+
+
+/* Functions for creating and accessing shared memory segments */
+extern ShardSplitInfoSMHeader * CreateShardSplitInfoSharedMemory(int stepCount,
+																 Size stepSize,
+																 dsm_handle *dsmHandle);
+
+extern ShardSplitInfo * GetSharedMemoryForShardSplitInfo(int shardSplitInfoCount,
+														 dsm_handle *dsmHandle);
+
+extern ShardSplitInfo * GetShardSplitInfoSMArrayForSlot(char *slotName, int *arraySize);
+
+extern dsm_handle GetSMHandleFromSlotName(char *slotName);
+
+/*
+ * ShardSplitInfoSMSteps returns a pointer to the array of shard split info
+ * steps that are stored in shared memory.
+ */
+extern void * ShardSplitInfoSMSteps(ShardSplitInfoSMHeader *shardSplitInfoSMHeader);
+
+extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeaderFromDSMHandle(dsm_handle
+																	   dsmHandle,
+																	   dsm_segment **
+																	   attachedSegment);
+
+/*
+ * An UPDATE on the partition key is realized as a 'DELETE' on the old shard
+ * and an 'INSERT' on the new shard, so a committed UPDATE has to be
+ * segregated into two replication messages. A WAL sender belonging to a
+ * replication slot can send only one message, hence to handle UPDATE we
+ * have to create one extra replication slot per node that handles the deletion
+ * part of an UPDATE.
+ * + * SLOT_HANDING_INSERT_AND_DELETE - Responsible for handling INSERT and DELETE + * operations. + * SLOT_HANDLING_DELETE_OF_UPDATE - Responsible for only handling DELETE on old shard + * for an UPDATE. Its a no-op for INSERT and DELETE + * operations. + */ +enum ReplicationSlotType +{ + SLOT_HANDLING_INSERT_AND_DELETE, + SLOT_HANDLING_DELETE_OF_UPDATE +}; + +/* Functions related to encoding-decoding for replication slot name */ +char * encode_replication_slot(uint64_t nodeId, + uint32 slotType, + dsm_handle dsmHandle); +void decode_replication_slot(char *slotName, + uint64_t *nodeId, + uint32_t *slotType, + dsm_handle *dsmHandle); + +#endif /* SHARDSPLIT_SHARED_MEMORY_H */ diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out new file mode 100644 index 000000000..ab9bfaf0c --- /dev/null +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -0,0 +1,201 @@ +CREATE SCHEMA citus_split_shard_by_split_points; +SET search_path TO citus_split_shard_by_split_points; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 1; +-- Add two additional nodes to cluster. +SELECT 1 FROM citus_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- Create distributed table (non co-located) +CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_to_split','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- slotName_table is used to persist replication slot name. +-- It is only used for testing as the worker2 needs to create subscription over the same replication slot. +CREATE TABLE slotName_table (name text, id int primary key); +SELECT create_distributed_table('slotName_table','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Shard with id '1' of table table_to_split is undergoing a split into two new shards +-- with id '2' and '3' respectively. table_to_split_1 is placed on worker1(NodeId 16) and +-- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18). 
+-- TODO(saawasek): make it parameterized +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]); + SELECT FORMAT('%s', memoryId) into memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; +-- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId +-- TODO(saawasek): make it parameterized +CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$ +DECLARE + replicationSlotName text; + createdSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + SELECT * into sharedMemoryId from SplitShardReplicationSetup(); + -- '18' is nodeId of worker2 + SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName; + SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); + INSERT INTO slotName_table values(replicationSlotName, 1); + return replicationSlotName; +end +$$ LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal int; + subname text; +begin + SELECT name into replicationSlotName from slotName_table; + EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=xxxxx user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName); + return 'a'; +end +$$ LANGUAGE plpgsql; +-- Test scenario starts from here +-- 1. table_to_split is a citus distributed table +-- 2. Shard table_to_split_1 is located on worker1. +-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- table_to_split_2/3 are located on worker2 +-- 4. execute UDF split_shard_replication_setup on worker1 with below +-- params: +-- split_shard_replication_setup +-- ( +-- ARRAY[ +-- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ], +-- ARRAY[1, 3 , 0 , 2147483647, 18 ] +-- ] +-- ); +-- 5. Create Replication slot with 'logical_decoding_plugin' +-- 6. Setup Pub/Sub +-- 7. Insert into table_to_split_1 at source worker1 +-- 8. Expect the results in either table_to_split_2 or table_to_split_2 at worker2 +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); +-- Create dummy shard tables(table_to_split_2/3) at worker1 +-- This is needed for Pub/Sub framework to work. +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +BEGIN; + CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); + CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); +COMMIT; +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; +COMMIT; +-- Create replication slot and setup shard split information at worker1 +BEGIN; +select 1 from CreateReplicationSlot(); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from CreateSubscription(); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +COMMIT; +-- No data is present at this moment in all the below tables at worker2 +SELECT * from table_to_split_1; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- +(0 rows) + +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(400, 'a'); +INSERT into table_to_split_1 values(500, 'a'); +SELECT * from table_to_split_1; + id | value +--------------------------------------------------------------------- + 100 | a + 400 | a + 500 | a +(3 rows) + +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- Expect data to be present in shard xxxxx and shard xxxxx based on the hash value. +\c - - - :worker_2_port +select pg_sleep(10); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SET search_path TO citus_split_shard_by_split_points; +SELECT * from table_to_split_1; -- should alwasy have zero rows + id | value +--------------------------------------------------------------------- +(0 rows) + +SELECT * from table_to_split_2; + id | value +--------------------------------------------------------------------- + 400 | a +(1 row) + +SELECT * from table_to_split_3; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql new file mode 100644 index 000000000..eb5ada6cf --- /dev/null +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -0,0 +1,138 @@ +CREATE SCHEMA citus_split_shard_by_split_points; +SET search_path TO citus_split_shard_by_split_points; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 1; + +-- Add two additional nodes to cluster. +SELECT 1 FROM citus_add_node('localhost', :worker_1_port); +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + +-- Create distributed table (non co-located) +CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_to_split','id'); + +-- slotName_table is used to persist replication slot name. +-- It is only used for testing as the worker2 needs to create subscription over the same replication slot. +CREATE TABLE slotName_table (name text, id int primary key); +SELECT create_distributed_table('slotName_table','id'); + +-- Shard with id '1' of table table_to_split is undergoing a split into two new shards +-- with id '2' and '3' respectively. 
table_to_split_1 is placed on worker1(NodeId 16) and +-- new child shards, table_to_split_2 and table_to_split_3 are placed on worker2(NodeId 18). +-- TODO(saawasek): make it parameterized +CREATE OR REPLACE FUNCTION SplitShardReplicationSetup() RETURNS text AS $$ +DECLARE + memoryId bigint := 0; + memoryIdText text; +begin + SELECT * into memoryId from split_shard_replication_setup(ARRAY[ARRAY[1,2,-2147483648,-1,18], ARRAY[1,3,0,2147483647,18]]); + SELECT FORMAT('%s', memoryId) into memoryIdText; + return memoryIdText; +end +$$ LANGUAGE plpgsql; + +-- Sets up split shard information and returns Slot Name in format : DestinationNodeId_SlotType_SharedMemoryId +-- TODO(saawasek): make it parameterized +CREATE OR REPLACE FUNCTION CreateReplicationSlot() RETURNS text AS $$ +DECLARE + replicationSlotName text; + createdSlotName text; + sharedMemoryId text; + derivedSlotName text; +begin + SELECT * into sharedMemoryId from SplitShardReplicationSetup(); + -- '18' is nodeId of worker2 + SELECT FORMAT('18_0_%s', sharedMemoryId) into derivedSlotName; + SELECT slot_name into replicationSlotName from pg_create_logical_replication_slot(derivedSlotName, 'logical_decoding_plugin'); + INSERT INTO slotName_table values(replicationSlotName, 1); + return replicationSlotName; +end +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION CreateSubscription() RETURNS text AS $$ +DECLARE + replicationSlotName text; + nodeportLocal int; + subname text; +begin + SELECT name into replicationSlotName from slotName_table; + EXECUTE FORMAT($sub$create subscription subforID1 connection 'host=localhost port=57637 user=postgres dbname=regression' publication PUB1 with(create_slot=false, enabled=true, slot_name='%s', copy_data=false)$sub$, replicationSlotName); + return 'a'; +end +$$ LANGUAGE plpgsql; + +-- Test scenario starts from here +-- 1. table_to_split is a citus distributed table +-- 2. Shard table_to_split_1 is located on worker1. +-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3. +-- table_to_split_2/3 are located on worker2 +-- 4. execute UDF split_shard_replication_setup on worker1 with below +-- params: +-- split_shard_replication_setup +-- ( +-- ARRAY[ +-- ARRAY[1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHasValue */ , 18 /* nodeId where new shard is placed */ ], +-- ARRAY[1, 3 , 0 , 2147483647, 18 ] +-- ] +-- ); +-- 5. Create Replication slot with 'logical_decoding_plugin' +-- 6. Setup Pub/Sub +-- 7. Insert into table_to_split_1 at source worker1 +-- 8. Expect the results in either table_to_split_2 or table_to_split_2 at worker2 + +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; +CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); +CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); + +-- Create dummy shard tables(table_to_split_2/3) at worker1 +-- This is needed for Pub/Sub framework to work. 
+\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +BEGIN; + CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); + CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); +COMMIT; + +-- Create publication at worker1 +BEGIN; + CREATE PUBLICATION PUB1 for table table_to_split_1, table_to_split_2, table_to_split_3; +COMMIT; + +-- Create replication slot and setup shard split information at worker1 +BEGIN; +select 1 from CreateReplicationSlot(); +COMMIT; + +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points; + +-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name +BEGIN; +SELECT 1 from CreateSubscription(); +COMMIT; + +-- No data is present at this moment in all the below tables at worker2 +SELECT * from table_to_split_1; +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; +select pg_sleep(10); + +-- Insert data in table_to_split_1 at worker1 +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points; +INSERT into table_to_split_1 values(100, 'a'); +INSERT into table_to_split_1 values(400, 'a'); +INSERT into table_to_split_1 values(500, 'a'); +SELECT * from table_to_split_1; +select pg_sleep(10); + +-- Expect data to be present in shard 2 and shard 3 based on the hash value. +\c - - - :worker_2_port +select pg_sleep(10); +SET search_path TO citus_split_shard_by_split_points; +SELECT * from table_to_split_1; -- should alwasy have zero rows +SELECT * from table_to_split_2; +SELECT * from table_to_split_3; \ No newline at end of file
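Reviewer note: the expected results above follow from the hash ranges passed to
split_shard_replication_setup: rows whose hashed distribution key falls in [-2147483648, -1]
are routed to table_to_split_2 and rows in [0, 2147483647] to table_to_split_3. A quick way to
check the routing by hand (assuming Citus's worker_hash() helper hashes the bigint key the same
way the decoding plugin does) is:

    -- the sign of the hash value decides which child shard receives the row
    SELECT id, worker_hash(id) AS hash_value,
           CASE WHEN worker_hash(id) < 0 THEN 'table_to_split_2'
                ELSE 'table_to_split_3' END AS expected_child_shard
    FROM (VALUES (100::bigint), (400::bigint), (500::bigint)) AS t(id);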