From b5b14ed0bd70d0030fe897340d93b69ecffc1b26 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Mon, 13 Jun 2022 16:32:26 -0700 Subject: [PATCH] Blocking split setup --- .../citus_split_shard_by_split_points.c | 89 +++ .../{split_shards.c => isolate_shards.c} | 0 .../distributed/operations/shard_split.c | 516 ++++++++++++++++++ .../distributed/sql/citus--11.0-2--11.1-1.sql | 1 + .../11.0-2.sql | 14 + .../latest.sql | 14 + src/backend/distributed/utils/array_type.c | 19 + .../distributed/coordinator_protocol.h | 3 + src/include/distributed/shard_split.h | 40 ++ src/include/distributed/utils/array_type.h | 2 +- 10 files changed, 697 insertions(+), 1 deletion(-) create mode 100644 src/backend/distributed/operations/citus_split_shard_by_split_points.c rename src/backend/distributed/operations/{split_shards.c => isolate_shards.c} (100%) create mode 100644 src/backend/distributed/operations/shard_split.c create mode 100644 src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql create mode 100644 src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql create mode 100644 src/include/distributed/shard_split.h diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c new file mode 100644 index 000000000..5503d6c2b --- /dev/null +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -0,0 +1,89 @@ +/*------------------------------------------------------------------------- + * + * citus_split_shard_by_split_points.c + * + * This file contains functions to split a shard. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "nodes/pg_list.h" +#include "distributed/utils/array_type.h" +#include "lib/stringinfo.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "distributed/colocation_utils.h" +#include "distributed/metadata_cache.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/coordinator_protocol.h" +#include "distributed/connection_management.h" +#include "distributed/remote_commands.h" +#include "distributed/shard_split.h" + +/* declarations for dynamic loading */ +PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points); + +static SplitMode LookupSplitMode(Oid shardSplitModeOid); + +/* + * citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[]) + * Split source shard into multiple shards using the given split points. + * 'shard_id' is the id of source shard to split. + * 'split_points' is an array of integers that represents the split points. + * 'node_ids' is an array of integers that represents the placement node ids of the new shards. + * 'split_mode citus.split_mode' is the mode of split. 
+ */ +Datum +citus_split_shard_by_split_points(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + uint64 shardIdToSplit = DatumGetUInt64(PG_GETARG_DATUM(0)); + + ArrayType *splitPointsArrayObject = PG_GETARG_ARRAYTYPE_P(1); + List *shardSplitPointsList = IntegerArrayTypeToList(splitPointsArrayObject); + + ArrayType *nodeIdsArrayObject = PG_GETARG_ARRAYTYPE_P(2); + List *nodeIdsForPlacementList = IntegerArrayTypeToList(nodeIdsArrayObject); + + Oid shardSplitModeOid = PG_GETARG_OID(3); + SplitMode shardSplitMode = LookupSplitMode(shardSplitModeOid); + + SplitShard( + shardSplitMode, + SHARD_SPLIT_API, + shardIdToSplit, + shardSplitPointsList, + nodeIdsForPlacementList); + + PG_RETURN_VOID(); +} + +/* + * LookupSplitMode maps the oids of citus.shard_split_mode enum + * values to a char. + */ +SplitMode +LookupSplitMode(Oid shardSplitModeOid) +{ + SplitMode shardSplitMode = BLOCKING_SPLIT; + + Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardSplitModeOid); + char *enumLabel = DatumGetCString(enumLabelDatum); + + if (strncmp(enumLabel, "blocking", NAMEDATALEN) == 0) + { + shardSplitMode = BLOCKING_SPLIT; + } + /* Extend with other modes as we support them */ + else + { + ereport(ERROR, (errmsg("Invalid label for enum: %s", enumLabel))); + } + + return shardSplitMode; +} diff --git a/src/backend/distributed/operations/split_shards.c b/src/backend/distributed/operations/isolate_shards.c similarity index 100% rename from src/backend/distributed/operations/split_shards.c rename to src/backend/distributed/operations/isolate_shards.c diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c new file mode 100644 index 000000000..a2e4f8bf8 --- /dev/null +++ b/src/backend/distributed/operations/shard_split.c @@ -0,0 +1,516 @@ +/*------------------------------------------------------------------------- + * + * shard_split.c + * + * Function definitions for the shard split. 
+ * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "nodes/pg_list.h" +#include "utils/array.h" +#include "distributed/utils/array_type.h" +#include "lib/stringinfo.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "distributed/colocation_utils.h" +#include "distributed/metadata_cache.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/coordinator_protocol.h" +#include "distributed/connection_management.h" +#include "distributed/remote_commands.h" +#include "distributed/shard_split.h" +#include "distributed/reference_table_utils.h" +#include "distributed/multi_partitioning_utils.h" +#include "distributed/worker_transaction.h" +#include "distributed/shared_library_init.h" + +/* Function declarations */ +static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, + ShardInterval *shardIntervalToSplit, + List *shardSplitPointsList, + List *nodeIdsForPlacementList); +/* TODO (niupre) : This will be public function when citus-enterprise is merged */ +static void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); +static void CreateSplitShardsForShardGroup(List *splitChildrenShardIntervalListList, + List *workersForPlacementList); +static void CreateShardOnPlacement(List *splitShardCreationCommandList, + WorkerNode *workerNode); +static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, + List *splitPointsForShard); +static void CreateSplitIntervalsForShard(ShardInterval *sourceShard, + List *splitPointsForShard, + List **shardSplitChildrenIntervalList); +static void BlockingShardSplit(SplitOperation splitOperation, + ShardInterval *shardIntervalToSplit, + List *shardSplitPointsList, + List *workersForPlacementList); + +/* Customize error message strings based on operation type */ +static const char *const SplitOperationName[] = +{ + [SHARD_SPLIT_API] = 
"split" +}; +static const char *const SplitTargetName[] = +{ + [SHARD_SPLIT_API] = "shard" +}; +static const char *const SplitOperationType[] = +{ + [BLOCKING_SPLIT] = "blocking" +}; + +/* Function definitions */ + +/* + * ErrorIfCannotSplitShard checks relation kind and invalid shards. It errors + * out if we are not able to split the given shard. + */ +void +ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard) +{ + Oid relationId = sourceShard->relationId; + ListCell *colocatedTableCell = NULL; + ListCell *colocatedShardCell = NULL; + + /* checks for table ownership and foreign tables */ + List *colocatedTableList = ColocatedTableList(relationId); + foreach(colocatedTableCell, colocatedTableList) + { + Oid colocatedTableId = lfirst_oid(colocatedTableCell); + + /* check that user has owner rights in all co-located tables */ + EnsureTableOwner(colocatedTableId); + + char relationKind = get_rel_relkind(colocatedTableId); + if (relationKind == RELKIND_FOREIGN_TABLE) + { + char *relationName = get_rel_name(colocatedTableId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot %s %s because \"%s\" is a " + "foreign table", + SplitOperationName[splitOperation], + SplitTargetName[splitOperation], + relationName), + errdetail("Splitting shards backed by foreign tables " + "is not supported."))); + } + + /* + * At the moment, we do not support copying a shard if that shard's + * relation is in a colocation group with a partitioned table or partition. + */ + if (PartitionedTable(colocatedTableId)) + { + char *sourceRelationName = get_rel_name(relationId); + char *colocatedRelationName = get_rel_name(colocatedTableId); + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot %s of '%s', because it " + "is a partitioned table", + SplitOperationName[splitOperation], + colocatedRelationName), + errdetail("In colocation group of '%s', a partitioned " + "relation exists: '%s'. 
Citus does not support " + "%s of partitioned tables.", + sourceRelationName, + colocatedRelationName, + SplitOperationName[splitOperation]))); + } + } + + /* check shards with inactive placements */ + List *colocatedShardList = ColocatedShardIntervalList(sourceShard); + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(colocatedShardCell); + uint64 shardId = shardInterval->shardId; + ListCell *shardPlacementCell = NULL; + + List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId); + foreach(shardPlacementCell, shardPlacementList) + { + ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); + if (placement->shardState != SHARD_STATE_ACTIVE) + { + char *relationName = get_rel_name(shardInterval->relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot %s %s because relation " + "\"%s\" has an inactive shard placement " + "for the shard %lu", + SplitOperationName[splitOperation], + SplitTargetName[splitOperation], + relationName, shardId), + errhint("Use master_copy_shard_placement UDF to " + "repair the inactive shard placement."))); + } + } + } +} + + +/* + * Exteded checks before we decide to split the shard. + * When all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API, + * this method will be merged with 'ErrorIfCannotSplitShard' above. + */ +static void +ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, + ShardInterval *shardIntervalToSplit, + List *shardSplitPointsList, + List *nodeIdsForPlacementList) +{ + CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry( + shardIntervalToSplit->relationId); + + /* Perform checks common to both blocking and non-blocking Split API here. 
*/ + if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Cannot %s %s as operation " + "is only supported for hash distributed tables.", + SplitOperationName[splitOperation], + SplitTargetName[splitOperation]))); + } + + if (extern_IsColumnarTableAmTable(shardIntervalToSplit->relationId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Cannot %s %s as operation " + "is not supported for Columnar tables.", + SplitOperationName[splitOperation], + SplitTargetName[splitOperation]))); + } + + uint32 relationReplicationFactor = TableShardReplicationFactor( + shardIntervalToSplit->relationId); + if (relationReplicationFactor > 1) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "Operation %s not supported for %s as replication factor '%u' " + "is greater than 1.", + SplitOperationName[splitOperation], + SplitTargetName[splitOperation], + relationReplicationFactor))); + } + + int splitPointsCount = list_length(shardSplitPointsList); + int nodeIdsCount = list_length(nodeIdsForPlacementList); + if (nodeIdsCount != splitPointsCount + 1) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Number of worker node ids should be one greater split points. " + "NodeId count is '%d' and SplitPoint count is '%d'.", + nodeIdsCount, + splitPointsCount))); + } + + + Assert(shardIntervalToSplit->minValueExists); + Assert(shardIntervalToSplit->maxValueExists); + + /* We already verified table is Hash Distributed. We know (minValue, maxValue) are integers. */ + int32 minValue = DatumGetInt32(shardIntervalToSplit->minValue); + int32 maxValue = DatumGetInt32(shardIntervalToSplit->maxValue); + + /* Fail if Shard Interval cannot be split anymore i.e (min, max) range overlap. 
*/ + if (minValue == maxValue) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Cannot split shard id \"%lu\" as min/max range are equal: ('%d', '%d').", + shardIntervalToSplit->shardId, + minValue, + maxValue))); + } + + NullableDatum lastShardSplitPoint = { 0, true /*isnull*/ }; + Datum shardSplitPoint; + foreach_int(shardSplitPoint, shardSplitPointsList) + { + int32 shardSplitPointValue = DatumGetInt32(shardSplitPoint); + + /* All Split points should lie within the shard interval range. */ + int splitPointShardIndex = FindShardIntervalIndex(shardSplitPoint, + cachedTableEntry); + if (shardIntervalToSplit->shardIndex != splitPointShardIndex) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Split point %d is outside the min/max range for shard id %lu.", + shardSplitPointValue, + shardIntervalToSplit->shardId))); + } + + /* Split points should be in strictly increasing order */ + int32 lastShardSplitPointValue = DatumGetInt32(lastShardSplitPoint.value); + if (!lastShardSplitPoint.isnull && shardSplitPointValue <= + lastShardSplitPointValue) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Invalid Split Points '%d' followed by '%d'. " + "All split points should be strictly increasing.", + lastShardSplitPointValue, + shardSplitPointValue))); + } + + /* + * Given our split points inclusive, you cannot specify the max value in a range as a split point. + * Example: Shard 81060002 range is from (0,1073741823). '1073741823' as split point is invalid. + * '1073741822' is correct and will split shard to: (0, 1073741822) and (1073741823, 1073741823). + */ + if (maxValue == shardSplitPointValue) + { + int32 validSplitPoint = shardIntervalToSplit->maxValue - 1; + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Invalid split point %d, as split points should be inclusive. 
Please use %d instead.", + maxValue, + validSplitPoint))); + } + + lastShardSplitPoint = (NullableDatum) { + shardSplitPoint, false + }; + } +} + + +/* + * SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion + * based on specified split points to a set of destination nodes. + * 'splitOperation' : Customer operation that triggered split. + * 'shardInterval' : Source shard interval to be split. + * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'. + * 'workersForPlacementList' : Placement list corresponding to split children. + */ +void +SplitShard(SplitMode splitMode, + SplitOperation splitOperation, + uint64 shardIdToSplit, + List *shardSplitPointsList, + List *nodeIdsForPlacementList) +{ + if (XactModificationLevel > XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot %s %s after other modifications " + "in the same transaction.", + SplitOperationName[splitOperation], + SplitTargetName[splitOperation]))); + } + + ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit); + List *colocatedTableList = ColocatedTableList(shardIntervalToSplit->relationId); + + /* sort the tables to avoid deadlocks */ + colocatedTableList = SortList(colocatedTableList, CompareOids); + Oid colocatedTableId = InvalidOid; + foreach_oid(colocatedTableId, colocatedTableList) + { + /* + * Block concurrent DDL / TRUNCATE commands on the relation. Similarly, + * block concurrent citus_move_shard_placement() / isolate_tenant_to_new_shard() + * on any shard of the same relation. + */ + LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); + } + + /* + * TODO(niupre): When all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API, + * these two methods will be merged. 
+ */ + ErrorIfCannotSplitShard(SHARD_SPLIT_API, shardIntervalToSplit); + ErrorIfCannotSplitShardExtended( + SHARD_SPLIT_API, + shardIntervalToSplit, + shardSplitPointsList, + nodeIdsForPlacementList); + + List *workersForPlacementList = NULL; + Datum nodeId; + foreach_int(nodeId, nodeIdsForPlacementList) + { + uint32 nodeIdValue = DatumGetUInt32(nodeId); + WorkerNode *workerNode = LookupNodeByNodeId(nodeIdValue); + + /* NodeId in Citus are unsigned and range from [1, 4294967296]. */ + if (nodeIdValue < 1 || workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("Invalid Node Id '%u'.", nodeIdValue))); + } + + workersForPlacementList = + lappend(workersForPlacementList, (void *) workerNode); + } + + if (splitMode == BLOCKING_SPLIT) + { + EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES); + BlockingShardSplit( + splitOperation, + shardIntervalToSplit, + shardSplitPointsList, + workersForPlacementList); + } + else + { + /* we only support blocking shard split in this code path for now. */ + ereport(ERROR, (errmsg("Invalid split mode %s.", SplitOperationType[splitMode]))); + } +} + + +/* + * SplitShard API to split a given shard (or shard group) in blocking fashion + * based on specified split points to a set of destination nodes. + * 'splitOperation' : Customer operation that triggered split. + * 'shardInterval' : Source shard interval to be split. + * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'. + * 'workersForPlacementList' : Placement list corresponding to split children. 
+ */ +static void +BlockingShardSplit(SplitOperation splitOperation, + ShardInterval *shardIntervalToSplit, + List *shardSplitPointsList, + List *workersForPlacementList) +{ + List *sourceColocatedShardIntervalList = ColocatedShardIntervalList( + shardIntervalToSplit); + + /* First create shard interval metadata for split children */ + List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( + sourceColocatedShardIntervalList, + shardSplitPointsList); + + /* Physically create split children on local/remote nodes */ + CreateSplitShardsForShardGroup( + shardGroupSplitIntervalListList, + workersForPlacementList); +} + +/* Create ShardGroup split children on a list of corresponding workers. */ +static void +CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, + List *workersForPlacementList) +{ + /* Iterate on shard intervals for shard group */ + List *shardIntervalList = NULL; + foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) + { + ShardInterval *shardInterval = NULL; + WorkerNode *workerPlacementNode = NULL; + forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, + workersForPlacementList) + { + /* Populate list of commands necessary to create shard interval on destination */ + List *splitShardCreationCommandList = GetPreLoadTableCreationCommands( + shardInterval->relationId, + false, /* includeSequenceDefaults */ + NULL /* auto add columnar options for cstore tables */); + splitShardCreationCommandList = WorkerApplyShardDDLCommandList( + splitShardCreationCommandList, + shardInterval->shardId); + + /* Create new split child shard on the specified placement list */ + CreateShardOnPlacement(splitShardCreationCommandList, workerPlacementNode); + } + } +} + + +/* + * Create a shard (using DDL list) on a worker node. 
+ */ +static void +CreateShardOnPlacement(List *splitShardCreationCommandList, + WorkerNode *workerPlacementNode) +{ + char *currentUser = CurrentUserName(); + SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName, + workerPlacementNode->workerPort, + currentUser, + splitShardCreationCommandList); +} + + +/* + * Create split children intervals for a shardgroup given list of split points. + */ +static List * +CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, + List *splitPointsForShard) +{ + List *shardGroupSplitIntervalListList = NULL; + + ShardInterval *shardToSplitInterval = NULL; + foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList) + { + List *shardSplitIntervalList = NULL; + CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard, + &shardSplitIntervalList); + + shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList, + shardSplitIntervalList); + } + + return shardGroupSplitIntervalListList; +} + + +/* + * Create split children intervals given a sourceshard and a list of split points. + * Example: SourceShard is range [0, 100] and SplitPoints are (15, 30) will give us: + * [(0, 15) (16, 30) (31, 100)] + */ +static void +CreateSplitIntervalsForShard(ShardInterval *sourceShard, + List *splitPointsForShard, + List **shardSplitChildrenIntervalList) +{ + /* For 'N' split points, we will have N+1 shard intervals created. 
*/ + int shardIntervalCount = list_length(splitPointsForShard) + 1; + ListCell *splitPointCell = list_head(splitPointsForShard); + int32 splitParentMaxValue = DatumGetInt32(sourceShard->maxValue); + + int32 currentSplitChildMinValue = DatumGetInt32(sourceShard->minValue); + for (int index = 0; index < shardIntervalCount; index++) + { + ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard); + splitChildShardInterval->shardIndex = -1; + splitChildShardInterval->shardId = GetNextShardId(); + + splitChildShardInterval->minValueExists = true; + splitChildShardInterval->minValue = currentSplitChildMinValue; + splitChildShardInterval->maxValueExists = true; + + /* Length of splitPointsForShard is one less than 'shardIntervalCount' and we need to account */ + /* for 'splitPointCell' being NULL for last iteration. */ + if (splitPointCell) + { + splitChildShardInterval->maxValue = DatumGetInt32((Datum) lfirst( + splitPointCell)); + splitPointCell = lnext(splitPointsForShard, splitPointCell); + } + else + { + splitChildShardInterval->maxValue = splitParentMaxValue; + } + + currentSplitChildMinValue = splitChildShardInterval->maxValue + 1; + *shardSplitChildrenIntervalList = lappend(*shardSplitChildrenIntervalList, + splitChildShardInterval); + } +} diff --git a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql index ff58922f8..458cef095 100644 --- a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql @@ -8,3 +8,4 @@ DROP FUNCTION pg_catalog.worker_range_partition_table(bigint, integer, text, tex DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint); #include "../../columnar/sql/columnar--11.0-2--11.1-1.sql" +#include "udfs/citus_split_shard_by_split_points/11.0-2.sql" diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql 
CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points(
    shard_id bigint,
    split_points integer[],
    -- A 'nodeId' is a uint32 in Citus [1, 4294967295] but postgres does not have unsigned type support.
    -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough.
    node_ids integer[],
    -- Three modes to be implemented: blocking, non_blocking and auto.
    -- Currently, the default / only supported mode is blocking.
    split_mode citus.split_mode default 'blocking')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$;
-- Parameter name below kept consistent with the declaration above (node_ids, not nodeIds).
COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[], citus.split_mode)
    IS 'split a shard using split mode.';
+ split_mode citus.split_mode default 'blocking') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$; +COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points integer[], nodeIds integer[], citus.split_mode) + IS 'split a shard using split mode.'; diff --git a/src/backend/distributed/utils/array_type.c b/src/backend/distributed/utils/array_type.c index eed2e3fdf..70f88e740 100644 --- a/src/backend/distributed/utils/array_type.c +++ b/src/backend/distributed/utils/array_type.c @@ -12,6 +12,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "nodes/pg_list.h" #include "distributed/utils/array_type.h" #include "utils/array.h" #include "utils/lsyscache.h" @@ -96,3 +97,21 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId) return arrayObject; } + +/* + * Converts ArrayType to List. + */ +List * +IntegerArrayTypeToList(ArrayType *arrayObject) +{ + List *list = NULL; + Datum *datumObjectArray = DeconstructArrayObject(arrayObject); + int arrayObjectCount = ArrayObjectCount(arrayObject); + + for (int index = 0; index < arrayObjectCount; index++) + { + list = lappend_int(list, datumObjectArray[index]); + } + + return list; +} diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index bda318a25..8d079d0a5 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -282,6 +282,9 @@ extern int MasterDropAllShards(Oid relationId, char *schemaName, char *relationN extern Datum master_create_worker_shards(PG_FUNCTION_ARGS); extern Datum isolate_tenant_to_new_shard(PG_FUNCTION_ARGS); +/* function declarations for shard split functionality */ +extern Datum citus_split_shard_by_split_points(PG_FUNCTION_ARGS); + /* function declarations for shard repair functionality */ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS); diff --git 
a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h new file mode 100644 index 000000000..27d1ade5a --- /dev/null +++ b/src/include/distributed/shard_split.h @@ -0,0 +1,40 @@ +/*------------------------------------------------------------------------- + * + * shard_split.h + * + * API for shard splits. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef SHARDSPLIT_UTILS_H_ +#define SHARDSPLIT_UTILS_H_ + +/* Split Modes supported by Shard Split API */ +typedef enum SplitMode +{ + BLOCKING_SPLIT = 0, +} SplitMode; + +/* + * User Scenario calling Split Shard API. + * The 'SplitOperation' type is used to customize info/error messages based on user scenario. + */ +typedef enum SplitOperation +{ + SHARD_SPLIT_API = 0 +} SplitOperation; + +/* + * SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion + * based on specified split points to a set of destination nodes. + */ +extern void SplitShard(SplitMode splitMode, + SplitOperation splitOperation, + uint64 shardIdToSplit, + List *shardSplitPointsList, + List *nodeIdsForPlacementList); + +#endif /* SHARDSPLIT_UTILS_H_ */ diff --git a/src/include/distributed/utils/array_type.h b/src/include/distributed/utils/array_type.h index cb03aafed..6b605cbee 100644 --- a/src/include/distributed/utils/array_type.h +++ b/src/include/distributed/utils/array_type.h @@ -20,6 +20,6 @@ extern Datum * DeconstructArrayObject(ArrayType *arrayObject); extern int32 ArrayObjectCount(ArrayType *arrayObject); extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId); - +extern List * IntegerArrayTypeToList(ArrayType *arrayObject); #endif /* CITUS_ARRAY_TYPE_H */