From 686ce21e8034fc8691369fec5907489f6f77422f Mon Sep 17 00:00:00 2001
From: Nitish Upreti
Date: Thu, 23 Jun 2022 18:35:34 -0700
Subject: [PATCH] Blocking Split workflow works

---
 .../distributed/operations/isolate_shards.c  | 146 +++++++++++++
 .../distributed/operations/shard_split.c     | 198 +++++++-----------
 src/include/distributed/shard_split.h        |   3 -
 3 files changed, 223 insertions(+), 124 deletions(-)

diff --git a/src/backend/distributed/operations/isolate_shards.c b/src/backend/distributed/operations/isolate_shards.c
index 52ef9e9f1..5e4681fc5 100644
--- a/src/backend/distributed/operations/isolate_shards.c
+++ b/src/backend/distributed/operations/isolate_shards.c
@@ -57,6 +57,10 @@ static ShardInterval * CreateSplitOffShardFromTemplate(ShardInterval *shardTempl
                                                        Oid relationId);
 static List * SplitOffCommandList(ShardInterval *sourceShard,
                                   ShardInterval *splitOffShard);
+static void InsertSplitOffShardMetadata(List *splitOffShardList,
+                                        List *sourcePlacementList);
+static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
+static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList);
 
 /*
  * isolate_tenant_to_new_shard isolates a tenant to its own shard by spliting
@@ -277,6 +281,94 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum)
 }
 
 
+/*
+ * CreateForeignConstraints creates the foreign constraints on the newly
+ * created shards via the tenant isolation.
+ *
+ * The function treats foreign keys to reference tables and foreign keys to
+ * co-located distributed tables differently. The former one needs to be
+ * executed over a single connection to prevent self-deadlocks. The latter
+ * one can be executed in parallel if there are multiple replicas.
+ */
+static void
+CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
+{
+    ListCell *splitOffShardCell = NULL;
+
+    List *colocatedShardForeignConstraintCommandList = NIL;
+    List *referenceTableForeignConstraintList = NIL;
+
+    foreach(splitOffShardCell, splitOffShardList)
+    {
+        ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell);
+
+        List *currentColocatedForeignKeyList = NIL;
+        List *currentReferenceForeignKeyList = NIL;
+
+        CopyShardForeignConstraintCommandListGrouped(splitOffShard,
+                                                     &currentColocatedForeignKeyList,
+                                                     &currentReferenceForeignKeyList);
+
+        colocatedShardForeignConstraintCommandList =
+            list_concat(colocatedShardForeignConstraintCommandList,
+                        currentColocatedForeignKeyList);
+        referenceTableForeignConstraintList =
+            list_concat(referenceTableForeignConstraintList,
+                        currentReferenceForeignKeyList);
+    }
+
+    /*
+     * We can use parallel connections while creating co-located foreign keys
+     * on the source placements.
+     * However, foreign keys to reference tables need to be created using a single
+     * connection per worker to prevent self-deadlocks.
+     */
+    if (colocatedShardForeignConstraintCommandList != NIL)
+    {
+        ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList,
+                                       sourcePlacementList);
+    }
+
+    if (referenceTableForeignConstraintList != NIL)
+    {
+        ListCell *shardPlacementCell = NULL;
+        foreach(shardPlacementCell, sourcePlacementList)
+        {
+            ShardPlacement *shardPlacement =
+                (ShardPlacement *) lfirst(shardPlacementCell);
+
+            char *nodeName = shardPlacement->nodeName;
+            int32 nodePort = shardPlacement->nodePort;
+
+            /*
+             * We're using the connections that we've used for dropping the
+             * source placements within the same coordinated transaction.
+             */
+            ExecuteCommandListOnWorker(nodeName, nodePort,
+                                       referenceTableForeignConstraintList);
+        }
+    }
+}
+
+
+/*
+ * ExecuteCommandListOnWorker executes the command list on the given node within
+ * the coordinated 2PC.
+ */
+static void
+ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList)
+{
+    ListCell *commandCell = NULL;
+
+    foreach(commandCell, commandList)
+    {
+        char *command = (char *) lfirst(commandCell);
+
+        SendCommandToWorker(nodeName, nodePort, command);
+    }
+}
+
+
 /*
  * CreateSplitOffShards gets a shard and a hashed value to pick the split point.
  * First, it creates templates to create new shards. Then, for every colocated
@@ -457,3 +549,57 @@ SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard)
 
     return splitOffCommandList;
 }
+
+
+/*
+ * InsertSplitOffShardMetadata inserts new shard and shard placement data into
+ * catalog tables on both the coordinator and MX nodes.
+ */
+static void
+InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
+{
+    List *syncedShardList = NIL;
+    ListCell *shardCell = NULL;
+    ListCell *commandCell = NULL;
+
+    /* add new metadata */
+    foreach(shardCell, splitOffShardList)
+    {
+        ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell);
+        Oid relationId = splitOffShard->relationId;
+        uint64 shardId = splitOffShard->shardId;
+        char storageType = splitOffShard->storageType;
+        ListCell *shardPlacementCell = NULL;
+
+        int32 shardMinValue = DatumGetInt32(splitOffShard->minValue);
+        int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
+        text *shardMinValueText = IntegerToText(shardMinValue);
+        text *shardMaxValueText = IntegerToText(shardMaxValue);
+
+        InsertShardRow(relationId, shardId, storageType, shardMinValueText,
+                       shardMaxValueText);
+
+        /* split off shard placement metadata */
+        foreach(shardPlacementCell, sourcePlacementList)
+        {
+            ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+            uint64 shardSize = 0;
+
+            InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE,
+                                    shardSize, placement->groupId);
+        }
+
+        if (ShouldSyncTableMetadata(relationId))
+        {
+            syncedShardList = lappend(syncedShardList, splitOffShard);
+        }
+    }
+
+    /* send commands to synced nodes one by one */
+    List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
+    foreach(commandCell, splitOffShardMetadataCommandList)
+    {
+        char *command = (char *) lfirst(commandCell);
+        SendCommandToWorkersWithMetadata(command);
+    }
+}
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 4484cdee5..7cd5d12af 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -41,8 +41,7 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
 static void CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
                                            List *sourceColocatedShardIntervalList,
                                            List *shardGroupSplitIntervalListList,
-                                           List *workersForPlacementList,
-                                           List **splitOffShardList);
+                                           List *workersForPlacementList);
 static void CreateObjectOnPlacement(List *objectCreationCommandList,
                                     WorkerNode *workerNode);
 static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
@@ -61,6 +60,8 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
 static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
                                          List *splitChildrenShardIntervalList,
                                          List *workersForPlacementList);
+static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList);
+static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList);
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
@@ -413,13 +414,11 @@ BlockingShardSplit(SplitOperation splitOperation,
     WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, false /* missingOk */);
 
     /* Physically create split children and perform split copy */
-    List *splitOffShardList = NULL;
     CreateSplitShardsForShardGroup(
         sourceShardToCopyNode,
         sourceColocatedShardIntervalList,
         shardGroupSplitIntervalListList,
-        workersForPlacementList,
-        &splitOffShardList);
+        workersForPlacementList);
 
     /*
      * Drop old shards and delete related metadata. Have to do that before
@@ -428,15 +427,15 @@ BlockingShardSplit(SplitOperation splitOperation,
      */
     DropShardList(sourceColocatedShardIntervalList);
 
-    /* insert new metadata */
-    InsertSplitOffShardMetadata(splitOffShardList, sourcePlacementList);
+    /* Insert new shard and placement metadata */
+    InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList);
 
     /*
      * Create foreign keys if exists after the metadata changes happening in
-     * DropShardList() and InsertSplitOffShardMetadata() because the foreign
+     * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
      * key creation depends on the new metadata.
      */
-    CreateForeignConstraints(splitOffShardList, sourcePlacementList);
+    CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList);
 
     CitusInvalidateRelcacheByRelid(DistShardRelationId());
 }
@@ -447,8 +446,7 @@ static void
 CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
                                List *sourceColocatedShardIntervalList,
                                List *shardGroupSplitIntervalListList,
-                               List *workersForPlacementList,
-                               List **splitOffShardList)
+                               List *workersForPlacementList)
 {
     /* Iterate on shard intervals for shard group */
     List *shardIntervalList = NULL;
@@ -470,8 +468,6 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
 
             /* Create new split child shard on the specified placement list */
             CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
-
-            (*splitOffShardList) = lappend(*splitOffShardList, shardInterval);
         }
     }
 
@@ -479,7 +475,10 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
     DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
                 shardGroupSplitIntervalListList, workersForPlacementList);
 
-    /* TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely? */
+    /*
+     * Create indexes after the split copy.
+     * TODO(niupre): Can we use adaptive execution for creating multiple indexes in parallel?
+     */
     foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
     {
         ShardInterval *shardInterval = NULL;
@@ -677,59 +676,89 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
 
 
 /*
- * InsertSplitOffShardMetadata inserts new shard and shard placement data into
- * catolog tables both the coordinator and mx nodes.
+ * InsertSplitChildrenShardMetadata inserts new shard and placement metadata.
  */
-void
-InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
+static void
+InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList)
 {
-    List *syncedShardList = NIL;
-    ListCell *shardCell = NULL;
-    ListCell *commandCell = NULL;
-
-    /* add new metadata */
-    foreach(shardCell, splitOffShardList)
+    /* Iterate on shard intervals for shard group */
+    List *shardIntervalList = NULL;
+    List *syncedShardList = NULL;
+    foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
     {
-        ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell);
-        Oid relationId = splitOffShard->relationId;
-        uint64 shardId = splitOffShard->shardId;
-        char storageType = splitOffShard->storageType;
-        ListCell *shardPlacementCell = NULL;
-
-        int32 shardMinValue = DatumGetInt32(splitOffShard->minValue);
-        int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
-        text *shardMinValueText = IntegerToText(shardMinValue);
-        text *shardMaxValueText = IntegerToText(shardMaxValue);
-
-        InsertShardRow(relationId, shardId, storageType, shardMinValueText,
-                       shardMaxValueText);
-
-        /* split off shard placement metadata */
-        foreach(shardPlacementCell, sourcePlacementList)
+        /* Iterate on split children shards along with the respective placement workers */
+        ShardInterval *shardInterval = NULL;
+        WorkerNode *workerPlacementNode = NULL;
+        forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
+                    workersForPlacementList)
         {
-            ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
-            uint64 shardSize = 0;
+            InsertShardRow(
+                shardInterval->relationId,
+                shardInterval->shardId,
+                shardInterval->storageType,
+                IntegerToText(DatumGetInt32(shardInterval->minValue)),
+                IntegerToText(DatumGetInt32(shardInterval->maxValue)));
 
-            InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE,
-                                    shardSize, placement->groupId);
+            InsertShardPlacementRow(
+                shardInterval->shardId,
+                INVALID_PLACEMENT_ID, /* triggers generation of new id */
+                SHARD_STATE_ACTIVE,
+                0, /* shard length */
+                workerPlacementNode->groupId);
         }
 
-        if (ShouldSyncTableMetadata(relationId))
+        if (ShouldSyncTableMetadata(shardInterval->relationId))
         {
-            syncedShardList = lappend(syncedShardList, splitOffShard);
+            syncedShardList = lappend(syncedShardList, shardInterval);
         }
     }
 
     /* send commands to synced nodes one by one */
     List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
-    foreach(commandCell, splitOffShardMetadataCommandList)
+    char *command = NULL;
+    foreach_ptr(command, splitOffShardMetadataCommandList)
     {
-        char *command = (char *) lfirst(commandCell);
         SendCommandToWorkersWithMetadata(command);
     }
 }
 
 
+/*
+ * CreateForeignKeyConstraints creates foreign key constraints on the split children shards.
+ */
+static void
+CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList)
+{
+    /* Create constraints between shards */
+    List *shardIntervalList = NULL;
+    foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
+    {
+        ShardInterval *shardInterval = NULL;
+        WorkerNode *workerPlacementNode = NULL;
+        forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
+                    workersForPlacementList)
+        {
+            List *shardForeignConstraintCommandList = NIL;
+            List *referenceTableForeignConstraintList = NIL;
+
+            CopyShardForeignConstraintCommandListGrouped(shardInterval,
+                                                         &shardForeignConstraintCommandList,
+                                                         &referenceTableForeignConstraintList);
+
+            List *commandList = NIL;
+            commandList = list_concat(commandList, shardForeignConstraintCommandList);
+            commandList = list_concat(commandList, referenceTableForeignConstraintList);
+
+            SendCommandListToWorkerOutsideTransaction(
+                workerPlacementNode->workerName,
+                workerPlacementNode->workerPort,
+                TableOwner(shardInterval->relationId),
+                commandList);
+        }
+    }
+}
+
+
 /*
  * DropShardList drops shards and their metadata from both the coordinator and
  * mx nodes.
@@ -796,79 +825,6 @@ DropShardList(List *shardIntervalList)
 }
 
 
-/*
- * CreateForeignConstraints creates the foreign constraints on the newly
- * created shards via the tenant isolation.
- *
- * The function treats foreign keys to reference tables and foreign keys to
- * co-located distributed tables differently. The former one needs to be
- * executed over a single connection to prevent self-deadlocks. The latter
- * one can be executed in parallel if there are multiple replicas.
- */
-void
-CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
-{
-    ListCell *splitOffShardCell = NULL;
-
-    List *colocatedShardForeignConstraintCommandList = NIL;
-    List *referenceTableForeignConstraintList = NIL;
-
-    foreach(splitOffShardCell, splitOffShardList)
-    {
-        ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell);
-
-        List *currentColocatedForeignKeyList = NIL;
-        List *currentReferenceForeignKeyList = NIL;
-
-        CopyShardForeignConstraintCommandListGrouped(splitOffShard,
-                                                     &currentColocatedForeignKeyList,
-                                                     &currentReferenceForeignKeyList);
-
-        colocatedShardForeignConstraintCommandList =
-            list_concat(colocatedShardForeignConstraintCommandList,
-                        currentColocatedForeignKeyList);
-        referenceTableForeignConstraintList =
-            list_concat(referenceTableForeignConstraintList,
-                        currentReferenceForeignKeyList);
-    }
-
-    /*
-     * We can use parallel connections to while creating co-located foreign keys
-     * if the source placement .
-     * However, foreign keys to reference tables need to be created using a single
-     * connection per worker to prevent self-deadlocks.
-     */
-    if (colocatedShardForeignConstraintCommandList != NIL)
-    {
-        ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList,
-                                       sourcePlacementList);
-    }
-
-    if (referenceTableForeignConstraintList != NIL)
-    {
-        ListCell *shardPlacementCell = NULL;
-        foreach(shardPlacementCell, sourcePlacementList)
-        {
-            ShardPlacement *shardPlacement =
-                (ShardPlacement *) lfirst(shardPlacementCell);
-
-            char *nodeName = shardPlacement->nodeName;
-            int32 nodePort = shardPlacement->nodePort;
-
-            /*
-             * We're using the connections that we've used for dropping the
-             * source placements within the same coordinated transaction.
-             */
-            char *command = NULL;
-            foreach_ptr(command, referenceTableForeignConstraintList)
-            {
-                SendCommandToWorker(nodeName, nodePort, command);
-            }
-        }
-    }
-}
-
-
 /*
  * ExecuteCommandListOnPlacements runs the given command list on the nodes of
  * the given shard placement list. First, it creates connections. Then it sends
diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h
index 5452e0224..cccd5efe9 100644
--- a/src/include/distributed/shard_split.h
+++ b/src/include/distributed/shard_split.h
@@ -41,10 +41,7 @@ extern void SplitShard(SplitMode splitMode,
 /* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */
 extern void ErrorIfCannotSplitShard(SplitOperation splitOperation,
                                     ShardInterval *sourceShard);
-extern void InsertSplitOffShardMetadata(List *splitOffShardList,
-                                        List *sourcePlacementList);
 extern void DropShardList(List *shardIntervalList);
-extern void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
 extern void ExecuteCommandListOnPlacements(List *commandList, List *placementList);
 
 #endif /* SHARDSPLIT_H_ */