/*-------------------------------------------------------------------------
 *
 * shard_split.c
 *
 * Function definitions for the shard split.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "nodes/pg_list.h"
#include "utils/array.h"
#include "distributed/utils/array_type.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/adaptive_executor.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_split.h"
#include "distributed/reference_table_utils.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "distributed/shared_library_init.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_physical_planner.h"

/* Function declarations */
static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
											ShardInterval *shardIntervalToSplit,
											List *shardSplitPointsList,
											List *nodeIdsForPlacementList);
static void CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
										   List *sourceColocatedShardIntervalList,
										   List *shardGroupSplitIntervalListList,
										   List *workersForPlacementList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
									WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
												List *splitPointsForShard);
static void CreateSplitIntervalsForShard(ShardInterval *sourceShard,
										 List *splitPointsForShard,
										 List **shardSplitChildrenIntervalList);
static void BlockingShardSplit(SplitOperation splitOperation,
							   ShardInterval *shardIntervalToSplit,
							   List *shardSplitPointsList,
							   List *workersForPlacementList);
static void DoSplitCopy(WorkerNode *sourceShardNode,
						List *sourceColocatedShardIntervalList,
						List *shardGroupSplitIntervalListList,
						List *destinationWorkerNodesList);
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
										 List *splitChildrenShardIntervalList,
										 List *destinationWorkerNodesList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
											 List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
										List *workersForPlacementList);

/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
{
	[SHARD_SPLIT_API] = "split",
	[ISOLATE_TENANT_TO_NEW_SHARD] = "isolate",
};
static const char *const SplitTargetName[] =
{
	[SHARD_SPLIT_API] = "shard",
	[ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
};
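
/*
 * For example, with splitOperation = SHARD_SPLIT_API the error messages below
 * read "cannot split shard ...", while ISOLATE_TENANT_TO_NEW_SHARD yields
 * "cannot isolate tenant ...".
 */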


/* Function definitions */

/*
 * ErrorIfCannotSplitShard checks relation kind and invalid shards. It errors
 * out if we are not able to split the given shard.
 */
void
ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard)
{
	Oid relationId = sourceShard->relationId;
	ListCell *colocatedTableCell = NULL;
	ListCell *colocatedShardCell = NULL;

	/* checks for table ownership and foreign tables */
	List *colocatedTableList = ColocatedTableList(relationId);
	foreach(colocatedTableCell, colocatedTableList)
	{
		Oid colocatedTableId = lfirst_oid(colocatedTableCell);

		/* check that user has owner rights in all co-located tables */
		EnsureTableOwner(colocatedTableId);

		char relationKind = get_rel_relkind(colocatedTableId);
		if (relationKind == RELKIND_FOREIGN_TABLE)
		{
			char *relationName = get_rel_name(colocatedTableId);
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot %s %s because \"%s\" is a "
								   "foreign table",
								   SplitOperationName[splitOperation],
								   SplitTargetName[splitOperation],
								   relationName),
							errdetail("Splitting shards backed by foreign tables "
									  "is not supported.")));
		}

		/*
		 * At the moment, we do not support copying a shard if that shard's
		 * relation is in a colocation group with a partitioned table or partition.
		 */
		if (PartitionedTable(colocatedTableId))
		{
			char *sourceRelationName = get_rel_name(relationId);
			char *colocatedRelationName = get_rel_name(colocatedTableId);

			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot %s '%s' because it "
								   "is a partitioned table",
								   SplitOperationName[splitOperation],
								   colocatedRelationName),
							errdetail("In colocation group of '%s', a partitioned "
									  "relation exists: '%s'. Citus does not support "
									  "%s of partitioned tables.",
									  sourceRelationName,
									  colocatedRelationName,
									  SplitOperationName[splitOperation])));
		}
	}

	/* check shards with inactive placements */
	List *colocatedShardList = ColocatedShardIntervalList(sourceShard);
	foreach(colocatedShardCell, colocatedShardList)
	{
		ShardInterval *shardInterval = (ShardInterval *) lfirst(colocatedShardCell);
		uint64 shardId = shardInterval->shardId;
		ListCell *shardPlacementCell = NULL;

		List *shardPlacementList = ShardPlacementListWithoutOrphanedPlacements(shardId);
		foreach(shardPlacementCell, shardPlacementList)
		{
			ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
			if (placement->shardState != SHARD_STATE_ACTIVE)
			{
				char *relationName = get_rel_name(shardInterval->relationId);
				ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
								errmsg("cannot %s %s because relation "
									   "\"%s\" has an inactive shard placement "
									   "for the shard %lu",
									   SplitOperationName[splitOperation],
									   SplitTargetName[splitOperation],
									   relationName, shardId),
								errhint("Use master_copy_shard_placement UDF to "
										"repair the inactive shard placement.")));
			}
		}
	}
}


/*
 * Extended checks before we decide to split the shard.
 * When all consumers (e.g., ISOLATE_TENANT_TO_NEW_SHARD) directly call the
 * 'SplitShard' API, this method will be merged with 'ErrorIfCannotSplitShard'
 * above.
 */
static void
ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
								ShardInterval *shardIntervalToSplit,
								List *shardSplitPointsList,
								List *nodeIdsForPlacementList)
{
	CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry(
		shardIntervalToSplit->relationId);

	/* Perform checks common to both blocking and non-blocking Split API here. */
	if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Cannot %s %s as operation "
							   "is only supported for hash distributed tables.",
							   SplitOperationName[splitOperation],
							   SplitTargetName[splitOperation])));
	}

	if (extern_IsColumnarTableAmTable(shardIntervalToSplit->relationId))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Cannot %s %s as operation "
							   "is not supported for Columnar tables.",
							   SplitOperationName[splitOperation],
							   SplitTargetName[splitOperation])));
	}

	uint32 relationReplicationFactor = TableShardReplicationFactor(
		shardIntervalToSplit->relationId);
	if (relationReplicationFactor > 1)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Operation %s not supported for %s as replication "
							   "factor '%u' is greater than 1.",
							   SplitOperationName[splitOperation],
							   SplitTargetName[splitOperation],
							   relationReplicationFactor)));
	}

	int splitPointsCount = list_length(shardSplitPointsList);
	int nodeIdsCount = list_length(nodeIdsForPlacementList);
	if (nodeIdsCount != splitPointsCount + 1)
	{
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Number of worker node ids should be one greater than the "
						"number of split points. NodeId count is '%d' and "
						"SplitPoint count is '%d'.",
						nodeIdsCount,
						splitPointsCount)));
	}

	Assert(shardIntervalToSplit->minValueExists);
	Assert(shardIntervalToSplit->maxValueExists);

	/* We already verified the table is hash distributed, so (minValue, maxValue) are integers. */
	int32 minValue = DatumGetInt32(shardIntervalToSplit->minValue);
	int32 maxValue = DatumGetInt32(shardIntervalToSplit->maxValue);

	/* Fail if the shard interval cannot be split any further, i.e. the (min, max) range collapses to a single value. */
	if (minValue == maxValue)
	{
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Cannot split shard id \"%lu\" as min/max range are "
						"equal: ('%d', '%d').",
						shardIntervalToSplit->shardId,
						minValue,
						maxValue)));
	}

	NullableDatum lastShardSplitPoint = { 0, true /* isnull */ };
	Datum shardSplitPoint;
	foreach_int(shardSplitPoint, shardSplitPointsList)
	{
		int32 shardSplitPointValue = DatumGetInt32(shardSplitPoint);

		/* All split points should lie within the shard interval range. */
		int splitPointShardIndex = FindShardIntervalIndex(shardSplitPoint,
														  cachedTableEntry);
		if (shardIntervalToSplit->shardIndex != splitPointShardIndex)
		{
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("Split point %d is outside the min/max range (%d, %d) "
							"for shard id %lu.",
							shardSplitPointValue,
							DatumGetInt32(shardIntervalToSplit->minValue),
							DatumGetInt32(shardIntervalToSplit->maxValue),
							shardIntervalToSplit->shardId)));
		}

		/* Split points should be in strictly increasing order */
		int32 lastShardSplitPointValue = DatumGetInt32(lastShardSplitPoint.value);
		if (!lastShardSplitPoint.isnull &&
			shardSplitPointValue <= lastShardSplitPointValue)
		{
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("Invalid Split Points '%d' followed by '%d'. "
							"All split points should be strictly increasing.",
							lastShardSplitPointValue,
							shardSplitPointValue)));
		}

		/*
		 * Given that our split points are inclusive, you cannot specify the max
		 * value of a range as a split point.
		 * Example: Shard 81060002 has range (0, 1073741823). '1073741823' as a
		 * split point is invalid; '1073741822' is correct and splits the shard
		 * into (0, 1073741822) and (1073741823, 1073741823).
		 */
		if (maxValue == shardSplitPointValue)
		{
			int32 validSplitPoint = maxValue - 1;
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("Invalid split point %d, as split points should be "
							"inclusive. Please use %d instead.",
							maxValue,
							validSplitPoint)));
		}

		lastShardSplitPoint = (NullableDatum) { shardSplitPoint, false };
	}
}


/*
 * SplitShard API to split a given shard (or shard group) based on specified
 * split points to a set of destination nodes.
 * 'splitMode'               : Mode of the split operation.
 * 'splitOperation'          : Caller operation that triggered the split.
 * 'shardIdToSplit'          : Id of the source shard to be split.
 * 'shardSplitPointsList'    : Split points list for the source shard.
 * 'nodeIdsForPlacementList' : Placement list corresponding to split children.
 */
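/*
 * A sketch of a typical invocation (a hypothetical illustration; the actual
 * UDF wiring, e.g. citus_split_shard_by_split_points, lives elsewhere):
 *
 *   SELECT citus_split_shard_by_split_points(
 *       102008,          -- shardIdToSplit
 *       ARRAY['0'],      -- shardSplitPointsList
 *       ARRAY[1, 2],     -- nodeIdsForPlacementList
 *       'block_writes'); -- maps to SplitMode BLOCKING_SPLIT
 */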
void
SplitShard(SplitMode splitMode,
		   SplitOperation splitOperation,
		   uint64 shardIdToSplit,
		   List *shardSplitPointsList,
		   List *nodeIdsForPlacementList)
{
	if (XactModificationLevel > XACT_MODIFICATION_NONE)
	{
		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
						errmsg("cannot %s %s after other modifications "
							   "in the same transaction.",
							   SplitOperationName[splitOperation],
							   SplitTargetName[splitOperation])));
	}

	ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit);
	List *colocatedTableList = ColocatedTableList(shardIntervalToSplit->relationId);

	/* sort the tables to avoid deadlocks */
	colocatedTableList = SortList(colocatedTableList, CompareOids);
	Oid colocatedTableId = InvalidOid;
	foreach_oid(colocatedTableId, colocatedTableList)
	{
		/*
		 * Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
		 * block concurrent citus_move_shard_placement() / isolate_tenant_to_new_shard()
		 * on any shard of the same relation.
		 */
		LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
	}

	ErrorIfCannotSplitShard(SHARD_SPLIT_API, shardIntervalToSplit);
	ErrorIfCannotSplitShardExtended(
		SHARD_SPLIT_API,
		shardIntervalToSplit,
		shardSplitPointsList,
		nodeIdsForPlacementList);

	List *workersForPlacementList = NIL;
	Datum nodeId;
	foreach_int(nodeId, nodeIdsForPlacementList)
	{
		uint32 nodeIdValue = DatumGetUInt32(nodeId);
		WorkerNode *workerNode = LookupNodeByNodeId(nodeIdValue);

		/* Node ids in Citus are unsigned and range from [1, 4294967295]. */
		if (nodeIdValue < 1 || workerNode == NULL)
		{
			ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
							errmsg("Invalid Node Id '%u'.", nodeIdValue)));
		}

		workersForPlacementList =
			lappend(workersForPlacementList, (void *) workerNode);
	}

	if (splitMode == BLOCKING_SPLIT)
	{
		EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
		BlockingShardSplit(
			splitOperation,
			shardIntervalToSplit,
			shardSplitPointsList,
			workersForPlacementList);
	}
	else
	{
		/* we only support blocking shard split in this code path for now. */
		ereport(ERROR, (errmsg("Invalid split mode value %d.", splitMode)));
	}
}


/*
 * SplitShard API to split a given shard (or shard group) in a blocking fashion
 * based on specified split points, placing children on a set of destination nodes.
 * 'splitOperation'          : Caller operation that triggered the split.
 * 'shardIntervalToSplit'    : Source shard interval to be split.
 * 'shardSplitPointsList'    : Split points list for the source 'shardInterval'.
 * 'workersForPlacementList' : Placement list corresponding to split children.
 */
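/*
 * The blocking flow below is, in order: block writes to the source shard group,
 * compute the child shard intervals, create the child shards and copy data into
 * them, drop the source shards and their metadata, insert the child metadata,
 * and finally create foreign key constraints on the children.
 */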
static void
BlockingShardSplit(SplitOperation splitOperation,
				   ShardInterval *shardIntervalToSplit,
				   List *shardSplitPointsList,
				   List *workersForPlacementList)
{
	List *sourceColocatedShardIntervalList = ColocatedShardIntervalList(
		shardIntervalToSplit);

	BlockWritesToShardList(sourceColocatedShardIntervalList);

	/* First create shard interval metadata for split children */
	List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
		sourceColocatedShardIntervalList,
		shardSplitPointsList);

	/* Only a single placement is allowed (we already validated that the replication factor is 1) */
	List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
	Assert(sourcePlacementList->length == 1);
	ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(
		sourcePlacementList);
	WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
														   false /* missingOk */);

	/*
	 * Physically create split children, perform the split copy, and create
	 * auxiliary structures: indexes, replica identity, triggers and statistics.
	 * Foreign key constraints are created after the metadata changes (see
	 * CreateForeignKeyConstraints).
	 */
	CreateSplitShardsForShardGroup(
		sourceShardToCopyNode,
		sourceColocatedShardIntervalList,
		shardGroupSplitIntervalListList,
		workersForPlacementList);

	/*
	 * Drop old shards and delete related metadata. We have to do that before
	 * creating the new shard metadata, because there are cross-checks
	 * preventing inconsistent metadata (like overlapping shards).
	 */
	DropShardList(sourceColocatedShardIntervalList);

	/* Insert new shard and placement metadata */
	InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
									 workersForPlacementList);

	/*
	 * Create foreign key constraints, if any exist, after the metadata changes
	 * made in DropShardList() and InsertSplitChildrenShardMetadata(), because
	 * foreign key creation depends on the new metadata.
	 */
	CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList);

	CitusInvalidateRelcacheByRelid(DistShardRelationId());
}


/* Create ShardGroup split children on a list of corresponding workers. */
static void
CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
							   List *sourceColocatedShardIntervalList,
							   List *shardGroupSplitIntervalListList,
							   List *workersForPlacementList)
{
	/* Iterate over the shard interval lists of the shard group */
	List *shardIntervalList = NIL;
	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
	{
		/* Iterate over the split shard interval list and the corresponding placement workers */
		ShardInterval *shardInterval = NULL;
		WorkerNode *workerPlacementNode = NULL;
		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
					workersForPlacementList)
		{
			/* Populate the list of commands necessary to create the shard interval on the destination */
			List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
				shardInterval->relationId,
				false, /* includeSequenceDefaults */
				NULL /* auto add columnar options for cstore tables */);
			splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
				splitShardCreationCommandList,
				shardInterval->shardId);

			/* Create the new split child shard on the given placement worker */
			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
		}
	}

	/* Perform the split copy */
	DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
				shardGroupSplitIntervalListList, workersForPlacementList);

	/* Create auxiliary structures (indexes, replica identity) post copy. */
	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
	{
		/* Iterate over the split shard interval list and the corresponding placement workers */
		ShardInterval *shardInterval = NULL;
		WorkerNode *workerPlacementNode = NULL;
		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
					workersForPlacementList)
		{
			List *indexCommandList = GetPostLoadTableCreationCommands(
				shardInterval->relationId,
				true /* includeIndexes */,
				true /* includeReplicaIdentity */);
			indexCommandList = WorkerApplyShardDDLCommandList(
				indexCommandList,
				shardInterval->shardId);

			CreateObjectOnPlacement(indexCommandList, workerPlacementNode);
		}
	}
}


/*
 * Perform the split copy from the source shard(s) to the split children.
 * 'sourceShardNode'                  : Source shard worker node.
 * 'sourceColocatedShardIntervalList' : List of source shard intervals from the shard group.
 * 'shardGroupSplitIntervalListList'  : List of shard interval lists for the split children.
 * 'destinationWorkerNodesList'       : List of workers for the split children placement.
 */
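/*
 * Note: each source shard becomes a single worker_split_copy() task placed on
 * the source node; the tasks are then executed together via the adaptive
 * executor, outside of a transaction.
 */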
static void
DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
			List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList)
{
	ShardInterval *sourceShardIntervalToCopy = NULL;
	List *splitShardIntervalList = NIL;

	int taskId = 0;
	List *splitCopyTaskList = NIL;
	forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
				splitShardIntervalList, shardGroupSplitIntervalListList)
	{
		StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy,
																splitShardIntervalList,
																destinationWorkerNodesList);

		Task *splitCopyTask = CreateBasicTask(
			sourceShardIntervalToCopy->shardId, /* jobId */
			taskId,
			READ_TASK,
			splitCopyUdfCommand->data);

		ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
		SetPlacementNodeMetadata(taskPlacement, sourceShardNode);

		splitCopyTask->taskPlacementList = list_make1(taskPlacement);

		splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask);
		taskId++;
	}

	ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList,
									  MaxAdaptiveExecutorPoolSize,
									  NULL /* jobIdList (ignored by API implementation) */);
}


/*
 * Create the split copy command for a given source shard, to be copied to the
 * corresponding split children.
 * 'sourceShardSplitInterval'       : Source shard interval to be copied.
 * 'splitChildrenShardIntervalList' : List of shard intervals for the split children.
 * 'destinationWorkerNodesList'     : List of workers for the split children placement.
 */
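/*
 * For illustration, with hypothetical shard ids and node ids, the generated
 * command looks like:
 *
 *   SELECT worker_split_copy(81060000, ARRAY[
 *       ROW(81060015, -2147483648, -1, 16)::citus.split_copy_info,
 *       ROW(81060016, 0, 2147483647, 18)::citus.split_copy_info]);
 */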
static StringInfo
CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
					   List *splitChildrenShardIntervalList,
					   List *destinationWorkerNodesList)
{
	StringInfo splitCopyInfoArray = makeStringInfo();
	appendStringInfo(splitCopyInfoArray, "ARRAY[");

	ShardInterval *splitChildShardInterval = NULL;
	bool addComma = false;
	WorkerNode *destinationWorkerNode = NULL;
	forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList,
				destinationWorkerNode, destinationWorkerNodesList)
	{
		if (addComma)
		{
			appendStringInfo(splitCopyInfoArray, ",");
		}

		StringInfo splitCopyInfoRow = makeStringInfo();
		appendStringInfo(splitCopyInfoRow,
						 "ROW(%lu, %d, %d, %u)::citus.split_copy_info",
						 splitChildShardInterval->shardId,
						 DatumGetInt32(splitChildShardInterval->minValue),
						 DatumGetInt32(splitChildShardInterval->maxValue),
						 destinationWorkerNode->nodeId);
		appendStringInfo(splitCopyInfoArray, "%s", splitCopyInfoRow->data);

		addComma = true;
	}
	appendStringInfo(splitCopyInfoArray, "]");

	StringInfo splitCopyUdf = makeStringInfo();
	appendStringInfo(splitCopyUdf, "SELECT worker_split_copy(%lu, %s);",
					 sourceShardSplitInterval->shardId,
					 splitCopyInfoArray->data);

	return splitCopyUdf;
}


/*
 * Create an object (e.g., a split child shard) on a worker node by running the
 * given DDL command list as the current user.
 */
static void
CreateObjectOnPlacement(List *objectCreationCommandList,
						WorkerNode *workerPlacementNode)
{
	char *currentUser = CurrentUserName();
	SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
											  workerPlacementNode->workerPort,
											  currentUser,
											  objectCreationCommandList);
}


/*
 * Create split children intervals for a shard group, given a list of split points.
 */
static List *
CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
								  List *splitPointsForShard)
{
	List *shardGroupSplitIntervalListList = NIL;

	ShardInterval *shardToSplitInterval = NULL;
	foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList)
	{
		List *shardSplitIntervalList = NIL;
		CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard,
									 &shardSplitIntervalList);

		shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList,
												  shardSplitIntervalList);
	}

	return shardGroupSplitIntervalListList;
}


/*
 * Create split children intervals, given a source shard and a list of split points.
 * Example: a source shard with range [0, 100] and split points (15, 30) gives us:
 * [(0, 15), (16, 30), (31, 100)]
 */
static void
CreateSplitIntervalsForShard(ShardInterval *sourceShard,
							 List *splitPointsForShard,
							 List **shardSplitChildrenIntervalList)
{
	/* For 'N' split points, we will have N+1 shard intervals created. */
	int shardIntervalCount = list_length(splitPointsForShard) + 1;
	ListCell *splitPointCell = list_head(splitPointsForShard);
	int32 splitParentMaxValue = DatumGetInt32(sourceShard->maxValue);

	int32 currentSplitChildMinValue = DatumGetInt32(sourceShard->minValue);
	for (int index = 0; index < shardIntervalCount; index++)
	{
		ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard);
		splitChildShardInterval->shardIndex = -1;
		splitChildShardInterval->shardId = GetNextShardId();

		splitChildShardInterval->minValueExists = true;
		splitChildShardInterval->minValue = currentSplitChildMinValue;
		splitChildShardInterval->maxValueExists = true;

		/*
		 * The length of 'splitPointsForShard' is one less than 'shardIntervalCount',
		 * so we need to account for 'splitPointCell' being NULL in the last iteration.
		 */
		if (splitPointCell)
		{
			splitChildShardInterval->maxValue = DatumGetInt32((Datum) lfirst(
																  splitPointCell));
			splitPointCell = lnext(splitPointsForShard, splitPointCell);
		}
		else
		{
			splitChildShardInterval->maxValue = splitParentMaxValue;
		}

		currentSplitChildMinValue = splitChildShardInterval->maxValue + 1;
		*shardSplitChildrenIntervalList = lappend(*shardSplitChildrenIntervalList,
												  splitChildShardInterval);
	}
}


/*
 * Insert new shard and placement metadata.
 * Also sync the metadata to the metadata-synced nodes, if enabled.
 */
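/*
 * A sketch of the sync path: ShardListInsertCommand() generates the metadata
 * insert commands (citus_internal_add_shard_metadata() style calls) that are
 * then replayed on each metadata-synced worker below.
 */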
static void
InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
								 List *workersForPlacementList)
{
	/* Iterate over the shard interval lists of the shard group */
	List *shardIntervalList = NIL;
	List *syncedShardList = NIL;
	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
	{
		/* Iterate over the split children shards along with the respective placement workers */
		ShardInterval *shardInterval = NULL;
		WorkerNode *workerPlacementNode = NULL;
		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
					workersForPlacementList)
		{
			InsertShardRow(
				shardInterval->relationId,
				shardInterval->shardId,
				shardInterval->storageType,
				IntegerToText(DatumGetInt32(shardInterval->minValue)),
				IntegerToText(DatumGetInt32(shardInterval->maxValue)));

			InsertShardPlacementRow(
				shardInterval->shardId,
				INVALID_PLACEMENT_ID, /* triggers generation of a new id */
				SHARD_STATE_ACTIVE,
				0, /* shard length (zero for hash-distributed tables) */
				workerPlacementNode->groupId);

			/* collect every split child of metadata-synced relations, not just the last one */
			if (ShouldSyncTableMetadata(shardInterval->relationId))
			{
				syncedShardList = lappend(syncedShardList, shardInterval);
			}
		}
	}

	/* send the commands to the metadata-synced nodes one by one */
	List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
	char *command = NULL;
	foreach_ptr(command, splitOffShardMetadataCommandList)
	{
		SendCommandToWorkersWithMetadata(command);
	}
}


/*
 * Create foreign key constraints on the split children shards.
 */
static void
CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
							List *workersForPlacementList)
{
	/* Create constraints between the split children shards */
	List *shardIntervalList = NIL;
	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
	{
		/* Iterate over the split children shards along with the respective placement workers */
		ShardInterval *shardInterval = NULL;
		WorkerNode *workerPlacementNode = NULL;
		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
					workersForPlacementList)
		{
			List *shardForeignConstraintCommandList = NIL;
			List *referenceTableForeignConstraintList = NIL;

			CopyShardForeignConstraintCommandListGrouped(
				shardInterval,
				&shardForeignConstraintCommandList,
				&referenceTableForeignConstraintList);

			List *commandList = NIL;
			commandList = list_concat(commandList, shardForeignConstraintCommandList);
			commandList = list_concat(commandList, referenceTableForeignConstraintList);

			SendCommandListToWorkerOutsideTransaction(
				workerPlacementNode->workerName,
				workerPlacementNode->workerPort,
				TableOwner(shardInterval->relationId),
				commandList);
		}
	}
}


/*
 * DropShardList drops shards and their metadata from both the coordinator and
 * the MX nodes.
 */
void
DropShardList(List *shardIntervalList)
{
	ListCell *shardIntervalCell = NULL;

	foreach(shardIntervalCell, shardIntervalList)
	{
		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
		ListCell *shardPlacementCell = NULL;
		Oid relationId = shardInterval->relationId;
		uint64 oldShardId = shardInterval->shardId;

		/* delete metadata from synced nodes */
		if (ShouldSyncTableMetadata(relationId))
		{
			ListCell *commandCell = NULL;

			/* send the commands one by one (calls citus_internal_delete_shard_metadata internally) */
			List *shardMetadataDeleteCommandList = ShardDeleteCommandList(shardInterval);
			foreach(commandCell, shardMetadataDeleteCommandList)
			{
				char *command = (char *) lfirst(commandCell);
				SendCommandToWorkersWithMetadata(command);
			}
		}

		/* delete shard placements and drop shards */
		List *shardPlacementList = ActiveShardPlacementList(oldShardId);
		foreach(shardPlacementCell, shardPlacementList)
		{
			ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
			char *workerName = placement->nodeName;
			uint32 workerPort = placement->nodePort;
			StringInfo dropQuery = makeStringInfo();

			DeleteShardPlacementRow(placement->placementId);

			/* get the shard name */
			char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);

			char storageType = shardInterval->storageType;
			if (storageType == SHARD_STORAGE_TABLE)
			{
				appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND,
								 qualifiedShardName);
			}
			else if (storageType == SHARD_STORAGE_FOREIGN)
			{
				appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND,
								 qualifiedShardName);
			}

			/* drop the old shard */
			SendCommandToWorker(workerName, workerPort, dropQuery->data);
		}

		/* delete the shard row */
		DeleteShardRow(oldShardId);
	}
}