From 2cc7f62de9a34feb6f08c9325b04d952c2f97679 Mon Sep 17 00:00:00 2001
From: Nils Dijk
Date: Tue, 21 Nov 2023 14:01:07 +0000
Subject: [PATCH] Mostly working splits, concurrent distribute

---
 .../commands/create_distributed_table.c      | 37 +++++++-
 .../distributed/metadata/metadata_sync.c     |  4 +-
 .../citus_split_shard_by_split_points.c      |  1 +
 .../distributed/operations/isolate_shards.c  |  1 +
 .../distributed/operations/shard_split.c     | 92 +++++++++++++++----
 src/include/distributed/shard_split.h        |  1 +
 6 files changed, 115 insertions(+), 21 deletions(-)

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index a3d5dcce2..c3c320ef0 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -135,6 +135,7 @@ static char DecideDistTableReplicationModel(char distributionMethod,
 static List * HashSplitPointsForShardList(List *shardList);
 static List * HashSplitPointsForShardCount(int shardCount);
 static List * WorkerNodesForShardList(List *shardList);
+static List * ShardgroupIdsForShardList(List *shardList);
 static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
 static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
 											   DistributedTableParams *
@@ -581,8 +582,9 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
 						errhint("Add more worker nodes.")));
 	}
 
-	List *workersForPlacementList;
-	List *shardSplitPointsList;
+	List *workersForPlacementList = NIL;
+	List *shardSplitPointsList = NIL;
+	List *shardgroupIdsList = NIL;
 
 	if (colocatedTableId != InvalidOid)
 	{
@@ -597,6 +599,11 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
 		 * Find the node IDs of the shard placements.
 		 */
 		workersForPlacementList = WorkerNodesForShardList(colocatedShardList);
+
+		/*
+		 * Find the shardgroupIds of the shard placements.
+		 */
+		shardgroupIdsList = ShardgroupIdsForShardList(colocatedShardList);
 	}
 	else
 	{
@@ -609,6 +616,13 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
 		 * Place shards in a round-robin fashion across all data nodes.
 		 */
 		workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
+
+
+		/*
+		 * We don't have any opinion on the shardgroupIds of the new shard placements;
+		 * let SplitShard create them for us.
+		 */
+		shardgroupIdsList = NIL;
 	}
 
 /*
@@ -647,6 +661,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
 											   shardToSplit->shardId,
 											   shardSplitPointsList,
 											   workersForPlacementList,
+											   shardgroupIdsList,
 											   distributionColumnOverrides,
 											   sourceColocatedShardIntervalList,
 											   colocationId
@@ -899,6 +914,24 @@ WorkerNodesForShardList(List *shardList)
 }
 
 
+/*
+ * ShardgroupIdsForShardList returns a list of pointers to the shardgroupIds of
+ * the shards in shardList, in the same order as the shards appear in the list.
+ */
+static List *
+ShardgroupIdsForShardList(List *shardList)
+{
+	List *shardgroupIdList = NIL;
+
+	ShardInterval *shardInterval = NULL;
+	foreach_ptr(shardInterval, shardList)
+	{
+		shardgroupIdList = lappend(shardgroupIdList, &(shardInterval->shardgroupId));
+	}
+	return shardgroupIdList;
+}
+
+
 /*
  * RoundRobinWorkerNodeList round robins over the workers in the worker node list
  * and adds node ids to a list of length listLength.
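A note on the data flow above: ShardgroupIdsForShardList deliberately collects pointers to the shardgroupId fields of the colocated shards rather than copies of their values, so the split code later reads whatever id is stored on the owning ShardInterval at that moment. The following is a minimal stand-alone C sketch of that contract; ShardgroupID, ShardInterval, and the plain arrays are simplified stand-ins for the Citus types and List machinery, and the uint64 representation is an assumption for illustration only:

#include <stdio.h>
#include <stdint.h>

/* simplified stand-ins for the Citus types; not the real definitions */
typedef uint64_t ShardgroupID;

typedef struct ShardInterval
{
	uint64_t shardId;
	ShardgroupID shardgroupId;
} ShardInterval;

/*
 * Mirrors the idea behind ShardgroupIdsForShardList: collect pointers to the
 * shardgroupId fields rather than copies of the values, so readers always see
 * the id currently stored on the owning ShardInterval.
 */
static void
CollectShardgroupIdPointers(ShardInterval *shards, int count, ShardgroupID **out)
{
	for (int i = 0; i < count; i++)
	{
		out[i] = &shards[i].shardgroupId;
	}
}

int
main(void)
{
	ShardInterval colocatedShards[2] = {
		{ .shardId = 102008, .shardgroupId = 31 },
		{ .shardId = 102009, .shardgroupId = 32 },
	};
	ShardgroupID *shardgroupIds[2];

	CollectShardgroupIdPointers(colocatedShards, 2, shardgroupIds);

	/* split children copy the ids through the pointers, analogous to what
	 * CreateSplitIntervalsForShardGroup does with list_nth() below */
	for (int i = 0; i < 2; i++)
	{
		printf("child %d inherits shardgroupId %llu\n",
			   i, (unsigned long long) *shardgroupIds[i]);
	}
	return 0;
}

This is why the concurrent-distribute path can hand the list straight to SplitShard: the ids it carries stay in sync with the colocated shard metadata they were taken from.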
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 585781c78..13c23953e 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -3458,7 +3458,7 @@ citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS)
 	if (!ShouldSkipMetadataChecks())
 	{
 		/* this UDF is not allowed to be executed as a separate command */
-		EnsureCoordinatorInitiatedOperation();
+		EnsureCitusInitiatedOperation();
 	}
 
 	InsertShardgroupRow(shardgroupId, colocationId);
@@ -3479,7 +3479,7 @@ citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS)
 	if (!ShouldSkipMetadataChecks())
 	{
 		/* this UDF is not allowed to be executed as a separate command */
-		EnsureCoordinatorInitiatedOperation();
+		EnsureCitusInitiatedOperation();
 	}
 
 	DeleteShardgroupRow(shardgroupId);
diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c
index 076e58d4c..ac4458479 100644
--- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c
+++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c
@@ -63,6 +63,7 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS)
 			   shardIdToSplit,
 			   shardSplitPointsList,
 			   nodeIdsForPlacementList,
+			   NIL,
 			   distributionColumnOverrides,
 			   sourceColocatedShardIntervalList,
 			   INVALID_COLOCATION_ID);
diff --git a/src/backend/distributed/operations/isolate_shards.c b/src/backend/distributed/operations/isolate_shards.c
index 502b00f5b..6846a1662 100644
--- a/src/backend/distributed/operations/isolate_shards.c
+++ b/src/backend/distributed/operations/isolate_shards.c
@@ -176,6 +176,7 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS)
 			   sourceShard->shardId,
 			   shardSplitPointsList,
 			   nodeIdsForPlacementList,
+			   NIL,
 			   distributionColumnOverrides,
 			   sourceColocatedShardIntervalList,
 			   INVALID_COLOCATION_ID);
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index b40b6cf39..a58d8a905 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -94,8 +94,9 @@ static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitInterval
 static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList);
 static void CreateObjectOnPlacement(List *objectCreationCommandList,
 									WorkerNode *workerNode);
-static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
-												List *splitPointsForShard);
+static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
+												List *splitPointsForShard,
+												List *shardgroupIdsList);
 static void CreateSplitIntervalsForShard(ShardInterval *sourceShard,
 										 List *splitPointsForShard,
 										 List **shardSplitChildrenIntervalList);
@@ -104,12 +105,14 @@ static void BlockingShardSplit(SplitOperation splitOperation,
 							   List *sourceColocatedShardIntervalList,
 							   List *shardSplitPointsList,
 							   List *workersForPlacementList,
+							   List *shardgroupIdsList,
 							   DistributionColumnMap *distributionColumnOverrides);
 static void NonBlockingShardSplit(SplitOperation splitOperation,
 								  uint64 splitWorkflowId,
 								  List *sourceColocatedShardIntervalList,
 								  List *shardSplitPointsList,
 								  List *workersForPlacementList,
+								  List *shardgroupIdsList,
 								  DistributionColumnMap *distributionColumnOverrides,
 								  uint32 targetColocationId);
 static void DoSplitCopy(WorkerNode *sourceShardNode,
@@ -129,7 +132,7 @@ static void
 UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
 									   char distributionMethod, int shardCount,
 									   uint32 colocationId);
-static void PopulateShardgroupIds(List *shardGroupSplitIntervalListList);
+static void PopulateShardgroupIdsWithNewIds(List *shardGroupSplitIntervalListList);
 static void InsertSplitChildrenShardgroupMetadata(List *shardGroupSplitIntervalListList);
 static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
 											 List *workersForPlacementList);
@@ -430,6 +433,9 @@ GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList)
 * 'shardInterval'              : Source shard interval to be split.
 * 'shardSplitPointsList'       : Split Points list for the source 'shardInterval'.
 * 'nodeIdsForPlacementList'    : Placement list corresponding to split children.
+ * 'shardgroupIdsList'          : When colocating a table, this list contains pointers
+ *                                to the shardgroupIds of the colocated shards, in the
+ *                                same order as the nodes in nodeIdsForPlacementList.
 * 'distributionColumnOverrides': Maps relation IDs to distribution columns.
 *                                If not specified, the distribution column is read
 *                                from the metadata.
@@ -444,6 +450,7 @@ SplitShard(SplitMode splitMode,
 		   uint64 shardIdToSplit,
 		   List *shardSplitPointsList,
 		   List *nodeIdsForPlacementList,
+		   List *shardgroupIdsList,
 		   DistributionColumnMap *distributionColumnOverrides,
 		   List *colocatedShardIntervalList,
 		   uint32 targetColocationId)
@@ -517,6 +524,7 @@ SplitShard(SplitMode splitMode,
 						   sourceColocatedShardIntervalList,
 						   shardSplitPointsList,
 						   workersForPlacementList,
+						   shardgroupIdsList,
 						   distributionColumnOverrides);
 	}
 	else
@@ -529,6 +537,7 @@ SplitShard(SplitMode splitMode,
 							  sourceColocatedShardIntervalList,
 							  shardSplitPointsList,
 							  workersForPlacementList,
+							  shardgroupIdsList,
 							  distributionColumnOverrides,
 							  targetColocationId);
 
@@ -550,6 +559,7 @@ SplitShard(SplitMode splitMode,
 * sourceColocatedShardIntervalList : Source shard group to be split.
 * shardSplitPointsList : Split Points list for the source 'shardInterval'.
 * workersForPlacementList : Placement list corresponding to split children.
+ * shardgroupIdsList : shardgroupIds to assign to the split children; NIL to allocate new ones.
 */
static void
BlockingShardSplit(SplitOperation splitOperation,
@@ -557,6 +567,7 @@ BlockingShardSplit(SplitOperation splitOperation,
 				   List *sourceColocatedShardIntervalList,
 				   List *shardSplitPointsList,
 				   List *workersForPlacementList,
+				   List *shardgroupIdsList,
 				   DistributionColumnMap *distributionColumnOverrides)
 {
 	const char *operationName = SplitOperationAPIName[splitOperation];
@@ -566,7 +577,8 @@ BlockingShardSplit(SplitOperation splitOperation,
 	/* First create shard interval metadata for split children */
 	List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
 		sourceColocatedShardIntervalList,
-		shardSplitPointsList);
+		shardSplitPointsList,
+		shardgroupIdsList);
 
 	/* Only single placement allowed (already validated RelationReplicationFactor = 1) */
 	ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList);
@@ -621,7 +633,7 @@ BlockingShardSplit(SplitOperation splitOperation,
 	DropShardListMetadata(sourceColocatedShardIntervalList);
 
 	/* allocate and assign new shardgroups to newly created shardIntervals */
-	PopulateShardgroupIds(shardGroupSplitIntervalListList);
+	PopulateShardgroupIdsWithNewIds(shardGroupSplitIntervalListList);
 
 	InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
 
@@ -1031,10 +1043,12 @@ CreateObjectOnPlacement(List *objectCreationCommandList,
 *   [ S1_1(-2147483648, 0), S1_2(1, 2147483647) ], // Split Interval List for S1.
 *   [ S2_1(-2147483648, 0), S2_2(1, 2147483647) ]  // Split Interval List for S2.
 * ]
+ * 'shardgroupIdsList': shardgroupIds for the split children; NIL to allocate new ones.
 */
static List *
CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
-                                  List *splitPointsForShard)
+                                  List *splitPointsForShard,
+                                  List *shardgroupIdsList)
{
	List *shardGroupSplitIntervalListList = NIL;

@@ -1045,6 +1059,28 @@ CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
 		CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard,
 									 &shardSplitIntervalList);
 
+		int shardCount = list_length(shardSplitIntervalList);
+		if (list_length(shardgroupIdsList) > 0)
+		{
+			Assert(list_length(shardgroupIdsList) == shardCount);
+			for (int i = 0; i < shardCount; i++)
+			{
+				ShardInterval *shardInterval = list_nth(shardSplitIntervalList, i);
+				shardInterval->shardgroupId = *((ShardgroupID *) list_nth(
+													shardgroupIdsList, i));
+			}
+		}
+		else
+		{
+			for (int i = 0; i < shardCount; i++)
+			{
+				ShardInterval *shardInterval = list_nth(shardSplitIntervalList, i);
+				shardInterval->shardgroupId = GetNextColocationId();
+				shardgroupIdsList = lappend(shardgroupIdsList,
+											(void *) &shardInterval->shardgroupId);
+			}
+		}
+
 		shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList,
 												  shardSplitIntervalList);
 	}
@@ -1083,8 +1119,9 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
 	for (int index = 0; index < shardIntervalCount; index++)
 	{
 		ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard);
-		/*
-		 * This will get populated later when we have a list of all new colocated
+
+		/*
+		 * This will get populated later when we have a list of all new colocated
 		 * shardIntervals, so as to make sure that all colocated shardIntervals get
 		 * the same shardgroupId.
 		 */
@@ -1166,7 +1203,7 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
 
 
 static void
-PopulateShardgroupIds(List *shardGroupSplitIntervalListList)
+PopulateShardgroupIdsWithNewIds(List *shardGroupSplitIntervalListList)
 {
 	List *firstShardIntervals = NULL;
 	List *currentShardIntervals = NULL;
@@ -1188,7 +1225,8 @@ PopulateShardgroupIdsWithNewIds(List *shardGroupSplitIntervalListList)
 			/* on subsequent loops we assign the same shardgroupId for colocation */
 			ShardInterval *firstShardInterval = NULL;
 			ShardInterval *currentShardInterval = NULL;
-			forboth_ptr(firstShardInterval, firstShardIntervals, currentShardInterval, currentShardIntervals)
+			forboth_ptr(firstShardInterval, firstShardIntervals, currentShardInterval,
+						currentShardIntervals)
 			{
 				currentShardInterval->shardgroupId = firstShardInterval->shardgroupId;
 			}
@@ -1221,10 +1259,11 @@ InsertSplitChildrenShardgroupMetadata(List *shardGroupSplitIntervalListList)
 		foreach_ptr(shardInterval, shardGroupSplitIntervalList)
 		{
 			InsertShardgroupRow(shardInterval->shardgroupId, colocationId);
-		}
+		}
 
 		/* send commands to synced nodes one by one */
-		List *splitOffShardMetadataCommandList = ShardgroupListInsertCommand(colocationId, shardGroupSplitIntervalList);
+		List *splitOffShardMetadataCommandList = ShardgroupListInsertCommand(colocationId,
																			 shardGroupSplitIntervalList);
 		char *command = NULL;
 		foreach_ptr(command, splitOffShardMetadataCommandList)
 		{
@@ -1387,9 +1426,21 @@ DropShardgroupListMetadata(List *shardIntervalList)
 	ShardInterval *shardInterval = NULL;
 	foreach_ptr(shardInterval, shardIntervalList)
 	{
+		/*
+		 * When distributing a table concurrently, its shards have not yet been
+		 * assigned a shardgroup. These don't have to be deleted locally, nor remotely.
+		 * The latter is achieved because we don't add the shardinterval to
+		 * syncedShardIntervalList: we end the current iteration of the loop early.
+		 */
+		if (!IsShardgroupIDValid(shardInterval->shardgroupId))
+		{
+			continue;
+		}
+
 		DeleteShardgroupRow(shardInterval->shardgroupId);
 
 		Oid relationId = shardInterval->relationId;
 
+		/* delete metadata from synced nodes */
 		if (ShouldSyncTableMetadata(relationId))
 		{
@@ -1487,6 +1538,7 @@ AcquireNonblockingSplitLock(Oid relationId)
 * sourceColocatedShardIntervalList : Source shard group to be split.
 * shardSplitPointsList : Split Points list for the source 'shardInterval'.
 * workersForPlacementList : Placement list corresponding to split children.
+ * shardgroupIdsList : shardgroupIds for the split children; NIL to allocate new ones.
 * distributionColumnList : Maps relation IDs to distribution columns.
 *                          If not specified, the distribution column is read
 *                          from the metadata.
@@ -1499,6 +1551,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 					  List *sourceColocatedShardIntervalList,
 					  List *shardSplitPointsList,
 					  List *workersForPlacementList,
+					  List *shardgroupIdsList,
 					  DistributionColumnMap *distributionColumnOverrides,
 					  uint32 targetColocationId)
 {
@@ -1512,7 +1565,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 	/* First create shard interval metadata for split children */
 	List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
 		sourceColocatedShardIntervalList,
-		shardSplitPointsList);
+		shardSplitPointsList,
+		shardgroupIdsList);
 
 	ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList);
 
@@ -1715,10 +1769,14 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 												  targetColocationId);
 	}
 
-	/* allocate and assign new shardgroups to newly created shardIntervals */
-	PopulateShardgroupIds(shardGroupSplitIntervalListList);
-
-	InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
+	if (list_length(shardgroupIdsList) == 0)
+	{
+		/*
+		 * We generated new shardgroupIds during shardinterval creation; add
+		 * them to the metadata.
+		 */
+		InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
+	}
 
 	/* 12) Insert new shard and placement metadata */
 	InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h
index 5bd0c7686..546df8d0b 100644
--- a/src/include/distributed/shard_split.h
+++ b/src/include/distributed/shard_split.h
@@ -43,6 +43,7 @@ extern void SplitShard(SplitMode splitMode,
 					   uint64 shardIdToSplit,
 					   List *shardSplitPointsList,
 					   List *nodeIdsForPlacementList,
+					   List *shardgroupIdsList,
 					   DistributionColumnMap *distributionColumnOverrides,
 					   List *colocatedShardIntervalList,
 					   uint32 targetColocationId);
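Taken together, CreateSplitIntervalsForShardGroup now has two modes: when shardgroupIdsList is provided it assigns the given ids to each shard's split children, and when it is NIL it allocates fresh ids for the first colocated shard and appends pointers to them so every subsequent colocated shard reuses the same ids. Because the append happens on the callee's local copy of the list header, a caller that passed NIL still sees an empty list afterwards, which is what the list_length(shardgroupIdsList) == 0 check in NonBlockingShardSplit keys off before inserting the new shardgroup metadata. A stand-alone C sketch of that allocate-then-reuse pattern follows; the types and AllocateShardgroupId are simplified stand-ins (the patch currently allocates via GetNextColocationId), not the Citus implementation:

#include <stdio.h>
#include <stdint.h>
#include <assert.h>

typedef uint64_t ShardgroupID; /* assumed representation, for illustration */

/* stand-in for the sequence-backed id allocation in the patch */
static ShardgroupID nextShardgroupId = 100;
static ShardgroupID AllocateShardgroupId(void) { return nextShardgroupId++; }

#define SPLIT_CHILD_COUNT 2
#define COLOCATED_SHARD_COUNT 3

int
main(void)
{
	/* providedIds plays the role of shardgroupIdsList; it starts out "NIL" */
	ShardgroupID *providedIds[SPLIT_CHILD_COUNT];
	int providedCount = 0;

	ShardgroupID childIds[COLOCATED_SHARD_COUNT][SPLIT_CHILD_COUNT];

	for (int shard = 0; shard < COLOCATED_SHARD_COUNT; shard++)
	{
		if (providedCount > 0)
		{
			/* ids were provided, or generated by an earlier iteration:
			 * assign the same ids so colocated children share shardgroups */
			assert(providedCount == SPLIT_CHILD_COUNT);
			for (int i = 0; i < SPLIT_CHILD_COUNT; i++)
			{
				childIds[shard][i] = *providedIds[i];
			}
		}
		else
		{
			/* first colocated shard with no ids provided: allocate fresh ids
			 * and remember pointers to them, as the else branch in the patch
			 * does with lappend */
			for (int i = 0; i < SPLIT_CHILD_COUNT; i++)
			{
				childIds[shard][i] = AllocateShardgroupId();
				providedIds[providedCount++] = &childIds[shard][i];
			}
		}
	}

	for (int shard = 0; shard < COLOCATED_SHARD_COUNT; shard++)
	{
		printf("shard %d children: %llu %llu\n", shard,
			   (unsigned long long) childIds[shard][0],
			   (unsigned long long) childIds[shard][1]);
	}
	return 0;
}

Compiled with a plain C compiler, this prints the same two ids for all three colocated shards, mirroring how colocated split children end up in the same shardgroups regardless of which mode the function runs in.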