mirror of https://github.com/citusdata/citus.git
Blocking split setup
parent d19d876b5f
commit b5b14ed0bd
@@ -0,0 +1,89 @@
/*-------------------------------------------------------------------------
 *
 * citus_split_shard_by_split_points.c
 *
 * This file contains functions to split a shard.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "nodes/pg_list.h"
#include "distributed/utils/array_type.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_split.h"

/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points);

static SplitMode LookupSplitMode(Oid shardSplitModeOid);

/*
 * citus_split_shard_by_split_points(shard_id bigint, split_points integer[],
 *                                   node_ids integer[], split_mode citus.split_mode)
 * splits the source shard into multiple shards using the given split points.
 * 'shard_id' is the id of the source shard to split.
 * 'split_points' is an array of integers that represents the split points.
 * 'node_ids' is an array of integers that represents the placement node ids of the new shards.
 * 'split_mode' is the mode of the split; currently only 'blocking' is supported.
 */
Datum
citus_split_shard_by_split_points(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	uint64 shardIdToSplit = DatumGetUInt64(PG_GETARG_DATUM(0));

	ArrayType *splitPointsArrayObject = PG_GETARG_ARRAYTYPE_P(1);
	List *shardSplitPointsList = IntegerArrayTypeToList(splitPointsArrayObject);

	ArrayType *nodeIdsArrayObject = PG_GETARG_ARRAYTYPE_P(2);
	List *nodeIdsForPlacementList = IntegerArrayTypeToList(nodeIdsArrayObject);

	Oid shardSplitModeOid = PG_GETARG_OID(3);
	SplitMode shardSplitMode = LookupSplitMode(shardSplitModeOid);

	SplitShard(
		shardSplitMode,
		SHARD_SPLIT_API,
		shardIdToSplit,
		shardSplitPointsList,
		nodeIdsForPlacementList);

	PG_RETURN_VOID();
}


/*
 * LookupSplitMode maps the oid of a citus.split_mode enum
 * value to the corresponding SplitMode enum value.
 */
SplitMode
LookupSplitMode(Oid shardSplitModeOid)
{
	SplitMode shardSplitMode = BLOCKING_SPLIT;

	Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardSplitModeOid);
	char *enumLabel = DatumGetCString(enumLabelDatum);

	if (strncmp(enumLabel, "blocking", NAMEDATALEN) == 0)
	{
		shardSplitMode = BLOCKING_SPLIT;
	}
	/* Extend with other modes as we support them */
	else
	{
		ereport(ERROR, (errmsg("Invalid label for enum: %s", enumLabel)));
	}

	return shardSplitMode;
}
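For context, here is a minimal usage sketch of the UDF defined above. The shard id, split point, and node ids are illustrative; they depend on your cluster and on the shard's hash range.

-- Split shard 102008 at hash value 0 into two children and place them
-- on the worker nodes with node ids 1 and 2 (all values are illustrative).
SELECT citus_split_shard_by_split_points(
    102008,
    ARRAY[0],
    ARRAY[1, 2],
    'blocking');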
@@ -0,0 +1,516 @@
/*-------------------------------------------------------------------------
 *
 * shard_split.c
 *
 * Function definitions for the shard split.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "nodes/pg_list.h"
#include "utils/array.h"
#include "distributed/utils/array_type.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_split.h"
#include "distributed/reference_table_utils.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/worker_transaction.h"
#include "distributed/shared_library_init.h"

/* Function declarations */
static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
											ShardInterval *shardIntervalToSplit,
											List *shardSplitPointsList,
											List *nodeIdsForPlacementList);

/* TODO (niupre) : This will be public function when citus-enterprise is merged */
static void ErrorIfCannotSplitShard(SplitOperation splitOperation,
									ShardInterval *sourceShard);
static void CreateSplitShardsForShardGroup(List *splitChildrenShardIntervalListList,
										   List *workersForPlacementList);
static void CreateShardOnPlacement(List *splitShardCreationCommandList,
								   WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
												List *splitPointsForShard);
static void CreateSplitIntervalsForShard(ShardInterval *sourceShard,
										 List *splitPointsForShard,
										 List **shardSplitChildrenIntervalList);
static void BlockingShardSplit(SplitOperation splitOperation,
							   ShardInterval *shardIntervalToSplit,
							   List *shardSplitPointsList,
							   List *workersForPlacementList);

/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
{
	[SHARD_SPLIT_API] = "split"
};
static const char *const SplitTargetName[] =
{
	[SHARD_SPLIT_API] = "shard"
};
static const char *const SplitOperationType[] =
{
	[BLOCKING_SPLIT] = "blocking"
};

/* Function definitions */

/*
 * ErrorIfCannotSplitShard checks relation kind and invalid shards. It errors
 * out if we are not able to split the given shard.
 */
void
ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard)
{
	Oid relationId = sourceShard->relationId;
	ListCell *colocatedTableCell = NULL;
	ListCell *colocatedShardCell = NULL;

	/* checks for table ownership and foreign tables */
	List *colocatedTableList = ColocatedTableList(relationId);
	foreach(colocatedTableCell, colocatedTableList)
	{
		Oid colocatedTableId = lfirst_oid(colocatedTableCell);

		/* check that user has owner rights in all co-located tables */
		EnsureTableOwner(colocatedTableId);

		char relationKind = get_rel_relkind(colocatedTableId);
		if (relationKind == RELKIND_FOREIGN_TABLE)
		{
			char *relationName = get_rel_name(colocatedTableId);
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot %s %s because \"%s\" is a "
								   "foreign table",
								   SplitOperationName[splitOperation],
								   SplitTargetName[splitOperation],
								   relationName),
							errdetail("Splitting shards backed by foreign tables "
									  "is not supported.")));
		}

		/*
		 * At the moment, we do not support copying a shard if that shard's
		 * relation is in a colocation group with a partitioned table or partition.
		 */
		if (PartitionedTable(colocatedTableId))
		{
			char *sourceRelationName = get_rel_name(relationId);
			char *colocatedRelationName = get_rel_name(colocatedTableId);

			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot %s '%s' because it "
								   "is a partitioned table",
								   SplitOperationName[splitOperation],
								   colocatedRelationName),
							errdetail("In colocation group of '%s', a partitioned "
									  "relation exists: '%s'. Citus does not support "
									  "%s of partitioned tables.",
									  sourceRelationName,
									  colocatedRelationName,
									  SplitOperationName[splitOperation])));
		}
	}

	/* check shards with inactive placements */
	List *colocatedShardList = ColocatedShardIntervalList(sourceShard);
	foreach(colocatedShardCell, colocatedShardList)
	{
		ShardInterval *shardInterval = (ShardInterval *) lfirst(colocatedShardCell);
		uint64 shardId = shardInterval->shardId;
		ListCell *shardPlacementCell = NULL;

		List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
		foreach(shardPlacementCell, shardPlacementList)
		{
			ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
			if (placement->shardState != SHARD_STATE_ACTIVE)
			{
				char *relationName = get_rel_name(shardInterval->relationId);
				ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
								errmsg("cannot %s %s because relation "
									   "\"%s\" has an inactive shard placement "
									   "for the shard %lu",
									   SplitOperationName[splitOperation],
									   SplitTargetName[splitOperation],
									   relationName, shardId),
								errhint("Use master_copy_shard_placement UDF to "
										"repair the inactive shard placement.")));
			}
		}
	}
}


/*
 * ErrorIfCannotSplitShardExtended performs extended checks before we decide to split the shard.
 * When all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call the 'SplitShard' API,
 * this method will be merged with 'ErrorIfCannotSplitShard' above.
 */
static void
ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
								ShardInterval *shardIntervalToSplit,
								List *shardSplitPointsList,
								List *nodeIdsForPlacementList)
{
	CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
		shardIntervalToSplit->relationId);

	/* Perform checks common to both blocking and non-blocking split APIs here. */
	if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Cannot %s %s as operation "
							   "is only supported for hash distributed tables.",
							   SplitOperationName[splitOperation],
							   SplitTargetName[splitOperation])));
	}

	if (extern_IsColumnarTableAmTable(shardIntervalToSplit->relationId))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Cannot %s %s as operation "
							   "is not supported for Columnar tables.",
							   SplitOperationName[splitOperation],
							   SplitTargetName[splitOperation])));
	}

	uint32 relationReplicationFactor = TableShardReplicationFactor(
		shardIntervalToSplit->relationId);
	if (relationReplicationFactor > 1)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg(
							"Operation %s not supported for %s as replication factor '%u' "
							"is greater than 1.",
							SplitOperationName[splitOperation],
							SplitTargetName[splitOperation],
							relationReplicationFactor)));
	}

	int splitPointsCount = list_length(shardSplitPointsList);
	int nodeIdsCount = list_length(nodeIdsForPlacementList);
	if (nodeIdsCount != splitPointsCount + 1)
	{
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg(
					 "Number of worker node ids should be one greater than the number of split points. "
					 "NodeId count is '%d' and SplitPoint count is '%d'.",
					 nodeIdsCount,
					 splitPointsCount)));
	}

	Assert(shardIntervalToSplit->minValueExists);
	Assert(shardIntervalToSplit->maxValueExists);

	/* We already verified the table is hash distributed, so (minValue, maxValue) are integers. */
	int32 minValue = DatumGetInt32(shardIntervalToSplit->minValue);
	int32 maxValue = DatumGetInt32(shardIntervalToSplit->maxValue);

	/* Fail if the shard interval cannot be split any further, i.e. its (min, max) range holds a single value. */
	if (minValue == maxValue)
	{
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg(
					 "Cannot split shard id \"%lu\" as min/max range are equal: ('%d', '%d').",
					 shardIntervalToSplit->shardId,
					 minValue,
					 maxValue)));
	}

	NullableDatum lastShardSplitPoint = { 0, true /*isnull*/ };
	Datum shardSplitPoint;
	foreach_int(shardSplitPoint, shardSplitPointsList)
	{
		int32 shardSplitPointValue = DatumGetInt32(shardSplitPoint);

		/* All split points should lie within the shard interval range. */
		int splitPointShardIndex = FindShardIntervalIndex(shardSplitPoint,
														  cachedTableEntry);
		if (shardIntervalToSplit->shardIndex != splitPointShardIndex)
		{
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg(
						 "Split point %d is outside the min/max range for shard id %lu.",
						 shardSplitPointValue,
						 shardIntervalToSplit->shardId)));
		}

		/* Split points should be in strictly increasing order. */
		int32 lastShardSplitPointValue = DatumGetInt32(lastShardSplitPoint.value);
		if (!lastShardSplitPoint.isnull &&
			shardSplitPointValue <= lastShardSplitPointValue)
		{
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg(
						 "Invalid Split Points '%d' followed by '%d'. "
						 "All split points should be strictly increasing.",
						 lastShardSplitPointValue,
						 shardSplitPointValue)));
		}

		/*
		 * Given that our split points are inclusive, you cannot specify the max value of a range as a split point.
		 * Example: Shard 81060002's range is (0, 1073741823). '1073741823' as a split point is invalid;
		 * '1073741822' is correct and splits the shard into (0, 1073741822) and (1073741823, 1073741823).
		 */
		if (maxValue == shardSplitPointValue)
		{
			int32 validSplitPoint = shardIntervalToSplit->maxValue - 1;
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg(
						 "Invalid split point %d, as split points should be inclusive. Please use %d instead.",
						 maxValue,
						 validSplitPoint)));
		}

		lastShardSplitPoint = (NullableDatum) {
			shardSplitPoint, false
		};
	}
}

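To make the validation rules above concrete, here are example calls that each check would reject. The shard id and its assumed hash range (0, 1073741823) are illustrative, not part of this commit.

-- Assuming shard 102008 covers the hash range (0, 1073741823):
SELECT citus_split_shard_by_split_points(102008, ARRAY[30, 15], ARRAY[1, 2, 3], 'blocking');
-- rejected: split points must be strictly increasing
SELECT citus_split_shard_by_split_points(102008, ARRAY[1073741823], ARRAY[1, 2], 'blocking');
-- rejected: the range's max value cannot be a split point; 1073741822 would be suggested instead
SELECT citus_split_shard_by_split_points(102008, ARRAY[0], ARRAY[1], 'blocking');
-- rejected: the node id list must have exactly one more element than the split point list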
/*
 * SplitShard API to split a given shard (or shard group) in a blocking / non-blocking fashion
 * based on specified split points to a set of destination nodes.
 * 'splitMode' : Mode of the split; only blocking is supported for now.
 * 'splitOperation' : Customer operation that triggered the split.
 * 'shardIdToSplit' : Id of the source shard to be split.
 * 'shardSplitPointsList' : Split points list for the source shard.
 * 'nodeIdsForPlacementList' : Node ids on which the split children are placed.
 */
void
SplitShard(SplitMode splitMode,
		   SplitOperation splitOperation,
		   uint64 shardIdToSplit,
		   List *shardSplitPointsList,
		   List *nodeIdsForPlacementList)
{
	if (XactModificationLevel > XACT_MODIFICATION_NONE)
	{
		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
						errmsg("cannot %s %s after other modifications "
							   "in the same transaction.",
							   SplitOperationName[splitOperation],
							   SplitTargetName[splitOperation])));
	}

	ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit);
	List *colocatedTableList = ColocatedTableList(shardIntervalToSplit->relationId);

	/* sort the tables to avoid deadlocks */
	colocatedTableList = SortList(colocatedTableList, CompareOids);
	Oid colocatedTableId = InvalidOid;
	foreach_oid(colocatedTableId, colocatedTableList)
	{
		/*
		 * Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
		 * block concurrent citus_move_shard_placement() / isolate_tenant_to_new_shard()
		 * on any shard of the same relation.
		 */
		LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
	}

	/*
	 * TODO(niupre): When all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API,
	 * these two methods will be merged.
	 */
	ErrorIfCannotSplitShard(SHARD_SPLIT_API, shardIntervalToSplit);
	ErrorIfCannotSplitShardExtended(
		SHARD_SPLIT_API,
		shardIntervalToSplit,
		shardSplitPointsList,
		nodeIdsForPlacementList);

	List *workersForPlacementList = NULL;
	Datum nodeId;
	foreach_int(nodeId, nodeIdsForPlacementList)
	{
		uint32 nodeIdValue = DatumGetUInt32(nodeId);
		WorkerNode *workerNode = LookupNodeByNodeId(nodeIdValue);

		/* Node ids in Citus are unsigned and range over [1, 4294967295]. */
		if (nodeIdValue < 1 || workerNode == NULL)
		{
			ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
							errmsg("Invalid Node Id '%u'.", nodeIdValue)));
		}

		workersForPlacementList =
			lappend(workersForPlacementList, (void *) workerNode);
	}

	if (splitMode == BLOCKING_SPLIT)
	{
		EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
		BlockingShardSplit(
			splitOperation,
			shardIntervalToSplit,
			shardSplitPointsList,
			workersForPlacementList);
	}
	else
	{
		/* we only support blocking shard split in this code path for now. */
		ereport(ERROR, (errmsg("Invalid split mode %s.", SplitOperationType[splitMode])));
	}
}

/*
 * BlockingShardSplit splits a given shard (or shard group) in a blocking fashion
 * based on specified split points to a set of destination nodes.
 * 'splitOperation' : Customer operation that triggered the split.
 * 'shardIntervalToSplit' : Source shard interval to be split.
 * 'shardSplitPointsList' : Split points list for the source shard interval.
 * 'workersForPlacementList' : Placement list corresponding to the split children.
 */
static void
BlockingShardSplit(SplitOperation splitOperation,
				   ShardInterval *shardIntervalToSplit,
				   List *shardSplitPointsList,
				   List *workersForPlacementList)
{
	List *sourceColocatedShardIntervalList = ColocatedShardIntervalList(
		shardIntervalToSplit);

	/* First create shard interval metadata for split children */
	List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
		sourceColocatedShardIntervalList,
		shardSplitPointsList);

	/* Physically create split children on local/remote nodes */
	CreateSplitShardsForShardGroup(
		shardGroupSplitIntervalListList,
		workersForPlacementList);
}

/* Create ShardGroup split children on a list of corresponding workers. */
static void
CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
							   List *workersForPlacementList)
{
	/* Iterate on shard intervals for shard group */
	List *shardIntervalList = NULL;
	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
	{
		ShardInterval *shardInterval = NULL;
		WorkerNode *workerPlacementNode = NULL;
		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
					workersForPlacementList)
		{
			/* Populate list of commands necessary to create shard interval on destination */
			List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
				shardInterval->relationId,
				false, /* includeSequenceDefaults */
				NULL /* auto add columnar options for cstore tables */);
			splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
				splitShardCreationCommandList,
				shardInterval->shardId);

			/* Create new split child shard on the specified placement list */
			CreateShardOnPlacement(splitShardCreationCommandList, workerPlacementNode);
		}
	}
}

/*
 * Create a shard (using DDL list) on a worker node.
 */
static void
CreateShardOnPlacement(List *splitShardCreationCommandList,
					   WorkerNode *workerPlacementNode)
{
	char *currentUser = CurrentUserName();
	SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
											  workerPlacementNode->workerPort,
											  currentUser,
											  splitShardCreationCommandList);
}

/*
 * Create split children intervals for a shardgroup given a list of split points.
 */
static List *
CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
								  List *splitPointsForShard)
{
	List *shardGroupSplitIntervalListList = NULL;

	ShardInterval *shardToSplitInterval = NULL;
	foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList)
	{
		List *shardSplitIntervalList = NULL;
		CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard,
									 &shardSplitIntervalList);

		shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList,
												  shardSplitIntervalList);
	}

	return shardGroupSplitIntervalListList;
}

/*
 * Create split children intervals given a source shard and a list of split points.
 * Example: a source shard with range [0, 100] and split points (15, 30) gives us:
 * [(0, 15), (16, 30), (31, 100)]
 */
static void
CreateSplitIntervalsForShard(ShardInterval *sourceShard,
							 List *splitPointsForShard,
							 List **shardSplitChildrenIntervalList)
{
	/* For 'N' split points, we will have N+1 shard intervals created. */
	int shardIntervalCount = list_length(splitPointsForShard) + 1;
	ListCell *splitPointCell = list_head(splitPointsForShard);
	int32 splitParentMaxValue = DatumGetInt32(sourceShard->maxValue);

	int32 currentSplitChildMinValue = DatumGetInt32(sourceShard->minValue);
	for (int index = 0; index < shardIntervalCount; index++)
	{
		ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard);
		splitChildShardInterval->shardIndex = -1;
		splitChildShardInterval->shardId = GetNextShardId();

		splitChildShardInterval->minValueExists = true;
		splitChildShardInterval->minValue = currentSplitChildMinValue;
		splitChildShardInterval->maxValueExists = true;

		/*
		 * The length of 'splitPointsForShard' is one less than 'shardIntervalCount',
		 * so we need to account for 'splitPointCell' being NULL on the last iteration.
		 */
		if (splitPointCell)
		{
			splitChildShardInterval->maxValue = DatumGetInt32((Datum) lfirst(
																  splitPointCell));
			splitPointCell = lnext(splitPointsForShard, splitPointCell);
		}
		else
		{
			splitChildShardInterval->maxValue = splitParentMaxValue;
		}

		currentSplitChildMinValue = splitChildShardInterval->maxValue + 1;
		*shardSplitChildrenIntervalList = lappend(*shardSplitChildrenIntervalList,
												  splitChildShardInterval);
	}
}
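For illustration, this is the shard metadata one would expect after the split described in the comment above. The relation name dist_table and the shard ids are hypothetical; only the min/max values follow from the interval arithmetic.

-- Hypothetical metadata after splitting a [0, 100] shard at points 15 and 30:
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'dist_table'::regclass
ORDER BY shardminvalue::int;
--  shardid | shardminvalue | shardmaxvalue
-- ---------+---------------+---------------
--   102011 | 0             | 15
--   102012 | 16            | 30
--   102013 | 31            | 100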
@@ -8,3 +8,4 @@ DROP FUNCTION pg_catalog.worker_range_partition_table(bigint, integer, text, tex
DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint);

#include "../../columnar/sql/columnar--11.0-2--11.1-1.sql"
#include "udfs/citus_split_shard_by_split_points/11.0-2.sql"
14  src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql  (generated, new file)
14  src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql  (generated, new file)
@@ -0,0 +1,14 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points(
    shard_id bigint,
    split_points integer[],
    -- A 'nodeId' is a uint32 in Citus [1, 4294967295], but Postgres does not support unsigned types.
    -- Use integer (consistent with other previously defined UDFs that take a nodeId as integer), as for all practical purposes it is big enough.
    node_ids integer[],
    -- Three modes to be implemented: blocking, non_blocking and auto.
    -- Currently, the default / only supported mode is blocking.
    split_mode citus.split_mode default 'blocking')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$;
COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[], citus.split_mode)
    IS 'split a shard using split mode.';
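As a quick sanity check of the split_mode parameter, the labels of the citus.split_mode enum (created elsewhere in this release; not part of this hunk) can be listed with standard PostgreSQL enum functions. With this commit only 'blocking' is expected.

-- List the labels of the citus.split_mode enum type:
SELECT unnest(enum_range(NULL::citus.split_mode)) AS split_mode;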
@@ -0,0 +1,14 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points(
    shard_id bigint,
    split_points integer[],
    -- A 'nodeId' is a uint32 in Citus [1, 4294967295], but Postgres does not support unsigned types.
    -- Use integer (consistent with other previously defined UDFs that take a nodeId as integer), as for all practical purposes it is big enough.
    node_ids integer[],
    -- Three modes to be implemented: blocking, non_blocking and auto.
    -- Currently, the default / only supported mode is blocking.
    split_mode citus.split_mode default 'blocking')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_split_shard_by_split_points$$;
COMMENT ON FUNCTION pg_catalog.citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[], citus.split_mode)
    IS 'split a shard using split mode.';
@@ -12,6 +12,7 @@
#include "postgres.h"
#include "miscadmin.h"

#include "nodes/pg_list.h"
#include "distributed/utils/array_type.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
@@ -96,3 +97,21 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)

	return arrayObject;
}


/*
 * IntegerArrayTypeToList converts the given integer ArrayType to a List of its elements.
 */
List *
IntegerArrayTypeToList(ArrayType *arrayObject)
{
	List *list = NULL;
	Datum *datumObjectArray = DeconstructArrayObject(arrayObject);
	int arrayObjectCount = ArrayObjectCount(arrayObject);

	for (int index = 0; index < arrayObjectCount; index++)
	{
		list = lappend_int(list, datumObjectArray[index]);
	}

	return list;
}
@@ -282,6 +282,9 @@ extern int MasterDropAllShards(Oid relationId, char *schemaName, char *relationN
extern Datum master_create_worker_shards(PG_FUNCTION_ARGS);
extern Datum isolate_tenant_to_new_shard(PG_FUNCTION_ARGS);

/* function declarations for shard split functionality */
extern Datum citus_split_shard_by_split_points(PG_FUNCTION_ARGS);

/* function declarations for shard repair functionality */
extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS);
@@ -0,0 +1,40 @@
/*-------------------------------------------------------------------------
 *
 * shard_split.h
 *
 * API for shard splits.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef SHARDSPLIT_UTILS_H_
#define SHARDSPLIT_UTILS_H_

/* Split Modes supported by Shard Split API */
typedef enum SplitMode
{
	BLOCKING_SPLIT = 0,
} SplitMode;

/*
 * User Scenario calling Split Shard API.
 * The 'SplitOperation' type is used to customize info/error messages based on user scenario.
 */
typedef enum SplitOperation
{
	SHARD_SPLIT_API = 0
} SplitOperation;

/*
 * SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion
 * based on specified split points to a set of destination nodes.
 */
extern void SplitShard(SplitMode splitMode,
					   SplitOperation splitOperation,
					   uint64 shardIdToSplit,
					   List *shardSplitPointsList,
					   List *nodeIdsForPlacementList);

#endif /* SHARDSPLIT_UTILS_H_ */
@@ -20,6 +20,6 @@ extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
extern int32 ArrayObjectCount(ArrayType *arrayObject);
extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
										 Oid datumTypeId);

extern List * IntegerArrayTypeToList(ArrayType *arrayObject);

#endif /* CITUS_ARRAY_TYPE_H */