Mostly working splits, concurrent distribute

feature/shardgroup
Nils Dijk 2023-11-21 14:01:07 +00:00
parent d30e252c8e
commit 2cc7f62de9
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
6 changed files with 111 additions and 21 deletions

View File

@ -135,6 +135,7 @@ static char DecideDistTableReplicationModel(char distributionMethod,
static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * ShardgroupIdsForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
@ -581,8 +582,9 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
errhint("Add more worker nodes.")));
}
List *workersForPlacementList;
List *shardSplitPointsList;
List *workersForPlacementList = NIL;
List *shardSplitPointsList = NIL;
List *shardgroupIdsList = NIL;
if (colocatedTableId != InvalidOid)
{
@ -597,6 +599,11 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
* Find the node IDs of the shard placements.
*/
workersForPlacementList = WorkerNodesForShardList(colocatedShardList);
/*
* Find the shardgroupIds of the colocated shards.
*/
shardgroupIdsList = ShardgroupIdsForShardList(colocatedShardList);
}
else
{
@ -609,6 +616,13 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
* Place shards in a round-robin fashion across all data nodes.
*/
workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
/*
* We don't have any opinion on the shardgroupIds of the new shard placements;
* let SplitShard create them for us.
*/
shardgroupIdsList = NIL;
}
/*
@ -647,6 +661,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
shardToSplit->shardId,
shardSplitPointsList,
workersForPlacementList,
shardgroupIdsList,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
colocationId
@ -899,6 +914,20 @@ WorkerNodesForShardList(List *shardList)
}
static List *
ShardgroupIdsForShardList(List *shardList)
{
List *shardgroupIdList = NIL;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardList)
{
shardgroupIdList = lappend(shardgroupIdList, &(shardInterval->shardgroupId));
}
return shardgroupIdList;
}
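Worth noting: ShardgroupIdsForShardList collects pointers to the shardgroupId fields, not copies of the values, so the ShardIntervals must outlive the returned list. A minimal standalone sketch of that pattern, with the ShardgroupID typedef and a fixed-size array standing in for Citus's types and List API:

#include <stdio.h>
#include <stdint.h>

typedef int64_t ShardgroupID;   /* assumed width; the real typedef may differ */

typedef struct ShardInterval
{
	uint64_t shardId;
	ShardgroupID shardgroupId;
} ShardInterval;

int
main(void)
{
	ShardInterval shards[2] = { { 102008, 7 }, { 102009, 8 } };

	/* collect pointers to the shardgroupId fields, as lappend does above */
	ShardgroupID *shardgroupIds[2];
	for (int i = 0; i < 2; i++)
	{
		shardgroupIds[i] = &shards[i].shardgroupId;
	}

	/* consumers dereference, so later updates to the shard remain visible */
	shards[1].shardgroupId = 9;
	for (int i = 0; i < 2; i++)
	{
		printf("shard %llu -> shardgroup %lld\n",
		       (unsigned long long) shards[i].shardId,
		       (long long) *shardgroupIds[i]);
	}
	return 0;
}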
/*
* RoundRobinWorkerNodeList round robins over the workers in the worker node list
* and adds node ids to a list of length listLength.
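For context, the round-robin assignment described above reduces to a modulo walk over the worker list; a standalone sketch with illustrative node ids:

#include <stdio.h>

int
main(void)
{
	int workerNodeIds[] = { 1, 2, 3 };
	int workerCount = 3;
	int listLength = 8;   /* one node id per new shard */

	for (int shardIndex = 0; shardIndex < listLength; shardIndex++)
	{
		/* wrap around the worker list once it is exhausted */
		printf("shard %d -> node %d\n", shardIndex,
		       workerNodeIds[shardIndex % workerCount]);
	}
	return 0;
}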

View File

@ -3458,7 +3458,7 @@ citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed to execute as a separate command */
EnsureCoordinatorInitiatedOperation();
EnsureCitusInitiatedOperation();
}
InsertShardgroupRow(shardgroupId, colocationId);
@ -3479,7 +3479,7 @@ citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed to execute as a separate command */
EnsureCoordinatorInitiatedOperation();
EnsureCitusInitiatedOperation();
}
DeleteShardgroupRow(shardgroupId);
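The two hunks above relax the guard from coordinator-initiated to any Citus-initiated operation; the UDFs still refuse to run as standalone commands. A hypothetical sketch of the guard's shape (the session flag and error path are illustrative assumptions, not Citus's implementation):

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* hypothetical flag; Citus derives this from session state */
static bool OperationInitiatedByCitus = false;

/* reject direct invocation of internal metadata UDFs */
static void
EnsureCitusInitiatedOperationSketch(void)
{
	if (!OperationInitiatedByCitus)
	{
		/* in Citus this would be an ereport(ERROR, ...) */
		fprintf(stderr, "operation is not allowed as a separate command\n");
		exit(EXIT_FAILURE);
	}
}

int
main(void)
{
	OperationInitiatedByCitus = true;   /* as if Citus opened this session */
	EnsureCitusInitiatedOperationSketch();
	puts("metadata change permitted");
	return 0;
}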

View File

@ -63,6 +63,7 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS)
shardIdToSplit,
shardSplitPointsList,
nodeIdsForPlacementList,
NIL,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
INVALID_COLOCATION_ID);

View File

@ -176,6 +176,7 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS)
sourceShard->shardId,
shardSplitPointsList,
nodeIdsForPlacementList,
NIL,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
INVALID_COLOCATION_ID);

View File

@ -94,8 +94,9 @@ static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitInterval
static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
List *splitPointsForShard);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
List *splitPointsForShard,
List *shardgroupIdsList);
static void CreateSplitIntervalsForShard(ShardInterval *sourceShard,
List *splitPointsForShard,
List **shardSplitChildrenIntervalList);
@ -104,12 +105,14 @@ static void BlockingShardSplit(SplitOperation splitOperation,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
List *shardgroupIdsList,
DistributionColumnMap *distributionColumnOverrides);
static void NonBlockingShardSplit(SplitOperation splitOperation,
uint64 splitWorkflowId,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
List *shardgroupIdsList,
DistributionColumnMap *distributionColumnOverrides,
uint32 targetColocationId);
static void DoSplitCopy(WorkerNode *sourceShardNode,
@ -129,7 +132,7 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
char distributionMethod,
int shardCount,
uint32 colocationId);
static void PopulateShardgroupIds(List *shardGroupSplitIntervalListList);
static void PopulateShardgroupIdsWithNewIds(List *shardGroupSplitIntervalListList);
static void InsertSplitChildrenShardgroupMetadata(List *shardGroupSplitIntervalListList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
@ -430,6 +433,9 @@ GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList)
* 'shardInterval' : Source shard interval to be split.
* 'shardSplitPointsList' : Split Points list for the source 'shardInterval'.
* 'nodeIdsForPlacementList' : Placement list corresponding to split children.
* 'shardgroupIdsList' : When colocating a table, this list contains pointers to
* the shardgroupIds of the colocated shards in the same
* order as the nodes in nodeIdsForPlacementList.
* 'distributionColumnOverrides': Maps relation IDs to distribution columns.
* If not specified, the distribution column is read
* from the metadata.
@ -444,6 +450,7 @@ SplitShard(SplitMode splitMode,
uint64 shardIdToSplit,
List *shardSplitPointsList,
List *nodeIdsForPlacementList,
List *shardgroupIdsList,
DistributionColumnMap *distributionColumnOverrides,
List *colocatedShardIntervalList,
uint32 targetColocationId)
@ -517,6 +524,7 @@ SplitShard(SplitMode splitMode,
sourceColocatedShardIntervalList,
shardSplitPointsList,
workersForPlacementList,
shardgroupIdsList,
distributionColumnOverrides);
}
else
@ -529,6 +537,7 @@ SplitShard(SplitMode splitMode,
sourceColocatedShardIntervalList,
shardSplitPointsList,
workersForPlacementList,
shardgroupIdsList,
distributionColumnOverrides,
targetColocationId);
@ -550,6 +559,7 @@ SplitShard(SplitMode splitMode,
* sourceColocatedShardIntervalList : Source shard group to be split.
* shardSplitPointsList : Split Points list for the source 'shardInterval'.
* workersForPlacementList : Placement list corresponding to split children.
* shardgroupIdsList : Shardgroup ids to assign to the split children; NIL
* to have new ones allocated.
*/
static void
BlockingShardSplit(SplitOperation splitOperation,
@ -557,6 +567,7 @@ BlockingShardSplit(SplitOperation splitOperation,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
List *shardgroupIdsList,
DistributionColumnMap *distributionColumnOverrides)
{
const char *operationName = SplitOperationAPIName[splitOperation];
@ -566,7 +577,8 @@ BlockingShardSplit(SplitOperation splitOperation,
/* First create shard interval metadata for split children */
List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
sourceColocatedShardIntervalList,
shardSplitPointsList);
shardSplitPointsList,
shardgroupIdsList);
/* Only single placement allowed (already validated RelationReplicationFactor = 1) */
ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList);
@ -621,7 +633,7 @@ BlockingShardSplit(SplitOperation splitOperation,
DropShardListMetadata(sourceColocatedShardIntervalList);
/* allocate and assign new shardgroups to newly created shardIntervals */
PopulateShardgroupIds(shardGroupSplitIntervalListList);
PopulateShardgroupIdsWithNewIds(shardGroupSplitIntervalListList);
InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
@ -1031,10 +1043,12 @@ CreateObjectOnPlacement(List *objectCreationCommandList,
* [ S1_1(-2147483648, 0), S1_2(1, 2147483647) ], // Split Interval List for S1.
* [ S2_1(-2147483648, 0), S2_2(1, 2147483647) ] // Split Interval List for S2.
* ]
* 'shardgroupIdsList': shardgroup ids to assign positionally to each shard's
* split children; when NIL, new ids are generated.
*/
static List *
CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
List *splitPointsForShard)
List *splitPointsForShard,
List *shardgroupIdsList)
{
List *shardGroupSplitIntervalListList = NIL;
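The interval arithmetic behind the example in the comment: each split point closes one child range, and the next child begins one past it. A standalone sketch reproducing the S1_1/S1_2 ranges:

#include <stdio.h>
#include <stdint.h>

int
main(void)
{
	int32_t splitPoints[] = { 0 };          /* as in the S1 example above */
	int splitPointCount = 1;
	int32_t childMin = INT32_MIN;           /* -2147483648 */

	for (int i = 0; i <= splitPointCount; i++)
	{
		int32_t childMax = (i < splitPointCount) ? splitPoints[i] : INT32_MAX;
		printf("child %d: [%ld, %ld]\n", i + 1, (long) childMin, (long) childMax);
		if (i < splitPointCount)
		{
			/* the next child starts right after the split point */
			childMin = childMax + 1;
		}
	}
	return 0;
}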
@ -1045,6 +1059,28 @@ CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard,
&shardSplitIntervalList);
int shardCount = list_length(shardSplitIntervalList);
if (list_length(shardgroupIdsList) > 0)
{
Assert(list_length(shardgroupIdsList) == shardCount);
for (int i = 0; i < shardCount; i++)
{
ShardInterval *shardInterval = list_nth(shardSplitIntervalList, i);
shardInterval->shardgroupId = *((ShardgroupID *) list_nth(
shardgroupIdsList, i));
}
}
else
{
for (int i = 0; i < shardCount; i++)
{
ShardInterval *shardInterval = list_nth(shardSplitIntervalList, i);
shardInterval->shardgroupId = GetNextColocationId();
shardgroupIdsList = lappend(shardgroupIdsList,
(void *) &shardInterval->shardgroupId);
}
}
shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList,
shardSplitIntervalList);
}
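The branch above implements a reuse-or-generate scheme: a caller-supplied shardgroupIdsList is adopted positionally, while a NIL list makes the first colocated shard generate fresh ids that the remaining shards then reuse through the locally grown list. A standalone sketch of that flow (the types and the id counter are stand-ins for illustration):

#include <stdio.h>
#include <stdint.h>

typedef int64_t ShardgroupID;

/* stand-in for the shared id sequence (GetNextColocationId in the patch) */
static ShardgroupID nextId = 100;

int
main(void)
{
	int childCount = 2;         /* split children per source shard */
	int colocatedCount = 3;     /* shards in the colocation group */
	ShardgroupID assigned[3][2] = { { 0 } };
	ShardgroupID *providedIds[2] = { NULL, NULL };   /* NIL on entry */
	int providedCount = 0;

	for (int s = 0; s < colocatedCount; s++)
	{
		for (int i = 0; i < childCount; i++)
		{
			if (providedCount > 0)
			{
				/* adopt the ids positionally, as the first branch does */
				assigned[s][i] = *providedIds[i];
			}
			else
			{
				/* generate fresh ids, as the else branch does */
				assigned[s][i] = nextId++;
			}
		}
		if (providedCount == 0)
		{
			/* grow the list so the remaining shards reuse these ids */
			for (int i = 0; i < childCount; i++)
			{
				providedIds[i] = &assigned[0][i];
			}
			providedCount = childCount;
		}
	}

	for (int s = 0; s < colocatedCount; s++)
	{
		printf("shard %d children -> shardgroups %lld, %lld\n", s,
		       (long long) assigned[s][0], (long long) assigned[s][1]);
	}
	return 0;
}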
@ -1083,8 +1119,9 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
for (int index = 0; index < shardIntervalCount; index++)
{
ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard);
/*
* This will get populated later when we have a list of all new colocated
* shardIntervals, so as to make sure that all colocated shardIntervals get the same
* shardgroupId.
*/
@ -1166,7 +1203,7 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
static void
PopulateShardgroupIds(List *shardGroupSplitIntervalListList)
PopulateShardgroupIdsWithNewIds(List *shardGroupSplitIntervalListList)
{
List *firstShardIntervals = NULL;
List *currentShardIntervals = NULL;
@ -1188,7 +1225,8 @@ PopulateShardgroupIds(List *shardGroupSplitIntervalListList)
/* on subsequent loops we assign the same shardgroupId for colocation */
ShardInterval *firstShardInterval = NULL;
ShardInterval *currentShardInterval = NULL;
forboth_ptr(firstShardInterval, firstShardIntervals, currentShardInterval, currentShardIntervals)
forboth_ptr(firstShardInterval, firstShardIntervals, currentShardInterval,
currentShardIntervals)
{
currentShardInterval->shardgroupId = firstShardInterval->shardgroupId;
}
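PopulateShardgroupIdsWithNewIds relies on forboth_ptr to walk the first shard's intervals and the current shard's intervals in lockstep; a standalone sketch with plain arrays standing in for the two lists:

#include <stdio.h>
#include <stdint.h>

typedef int64_t ShardgroupID;
typedef struct ShardInterval { ShardgroupID shardgroupId; } ShardInterval;

int
main(void)
{
	ShardInterval first[2]   = { { 100 }, { 101 } };   /* already assigned */
	ShardInterval current[2] = { { 0 }, { 0 } };       /* to be populated */

	/* stand-in for forboth_ptr(firstShardInterval, ..., currentShardInterval, ...) */
	for (int i = 0; i < 2; i++)
	{
		current[i].shardgroupId = first[i].shardgroupId;
	}

	for (int i = 0; i < 2; i++)
	{
		printf("current[%d].shardgroupId = %lld\n", i,
		       (long long) current[i].shardgroupId);
	}
	return 0;
}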
@ -1221,10 +1259,11 @@ InsertSplitChildrenShardgroupMetadata(List *shardGroupSplitIntervalListList)
foreach_ptr(shardInterval, shardGroupSplitIntervalList)
{
InsertShardgroupRow(shardInterval->shardgroupId, colocationId);
}
}
/* send commands to synced nodes one by one */
List *splitOffShardMetadataCommandList = ShardgroupListInsertCommand(colocationId, shardGroupSplitIntervalList);
List *splitOffShardMetadataCommandList = ShardgroupListInsertCommand(colocationId,
shardGroupSplitIntervalList);
char *command = NULL;
foreach_ptr(command, splitOffShardMetadataCommandList)
{
@ -1387,9 +1426,21 @@ DropShardgroupListMetadata(List *shardIntervalList)
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
/*
* When distributing a table concurrently, its shards have not been assigned a
* shardgroup. These don't have to be deleted locally or remotely. The latter is
* achieved because we don't add the shardInterval to syncedShardIntervalList:
* we preemptively end the current iteration of the loop.
*/
if (!IsShardgroupIDValid(shardInterval->shardgroupId))
{
continue;
}
DeleteShardgroupRow(shardInterval->shardgroupId);
Oid relationId = shardInterval->relationId;
/* delete metadata from synced nodes */
if (ShouldSyncTableMetadata(relationId))
{
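The early continue above means a shard without a valid shardgroup is neither deleted locally nor added to the list synced to workers. A standalone sketch of that filter (the zero sentinel for an invalid id is an assumption of this sketch):

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

typedef int64_t ShardgroupID;

static bool
IsShardgroupIDValidSketch(ShardgroupID id)
{
	return id != 0;   /* assumed sentinel; Citus's check may differ */
}

int
main(void)
{
	ShardgroupID shardgroupIds[] = { 100, 0, 101 };
	for (int i = 0; i < 3; i++)
	{
		if (!IsShardgroupIDValidSketch(shardgroupIds[i]))
		{
			continue;   /* neither deleted locally nor synced remotely */
		}
		printf("DELETE shardgroup %lld (and sync to workers)\n",
		       (long long) shardgroupIds[i]);
	}
	return 0;
}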
@ -1487,6 +1538,7 @@ AcquireNonblockingSplitLock(Oid relationId)
* sourceColocatedShardIntervalList : Source shard group to be split.
* shardSplitPointsList : Split Points list for the source 'shardInterval'.
* workersForPlacementList : Placement list corresponding to split children.
* shardgroupIdsList : Shardgroup ids to assign to the split children; NIL
* to have new ones allocated.
* distributionColumnList : Maps relation IDs to distribution columns.
* If not specified, the distribution column is read
* from the metadata.
@ -1499,6 +1551,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
List *shardgroupIdsList,
DistributionColumnMap *distributionColumnOverrides,
uint32 targetColocationId)
{
@ -1512,7 +1565,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* First create shard interval metadata for split children */
List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
sourceColocatedShardIntervalList,
shardSplitPointsList);
shardSplitPointsList,
shardgroupIdsList);
ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList);
@ -1715,10 +1769,14 @@ NonBlockingShardSplit(SplitOperation splitOperation,
targetColocationId);
}
/* allocate and assign new shardgroups to newly created shardIntervals */
PopulateShardgroupIds(shardGroupSplitIntervalListList);
InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
if (list_length(shardgroupIdsList) == 0)
{
/*
* We have generated new shardgroupIds during shardInterval creation; add
* them to the metadata.
*/
InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
}
/* 12) Insert new shard and placement metadata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,

View File

@ -43,6 +43,7 @@ extern void SplitShard(SplitMode splitMode,
uint64 shardIdToSplit,
List *shardSplitPointsList,
List *nodeIdsForPlacementList,
List *shardgroupIdsList,
DistributionColumnMap *distributionColumnOverrides,
List *colocatedShardIntervalList,
uint32 targetColocationId);