From f61a053f0000926cbc8622720acfaa9674ddb828 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 14 Jun 2022 12:17:50 -0700 Subject: [PATCH] Shard Split e2e code --- .../distributed/operations/shard_split.c | 366 +++++++++++++++++- 1 file changed, 353 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index a2e4f8bf8..1f8857221 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -27,17 +27,28 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/worker_transaction.h" #include "distributed/shared_library_init.h" +#include "distributed/pg_dist_shard.h" +#include "distributed/metadata_sync.h" + +/* + * These will be public function when citus-enterprise is merged + */ +static void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); +static void InsertSplitOffShardMetadata(List *splitOffShardList, + List *sourcePlacementList); +static void DropShardList(List *shardIntervalList); +static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList); +static void ExecuteCommandListOnPlacements(List *commandList, List *placementList); /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *nodeIdsForPlacementList); -/* TODO (niupre) : This will be public function when citus-enterprise is merged */ -static void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); static void CreateSplitShardsForShardGroup(List *splitChildrenShardIntervalListList, - List *workersForPlacementList); -static void CreateShardOnPlacement(List *splitShardCreationCommandList, + List *workersForPlacementList, + List **splitOffShardList); +static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode 
*workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, List *splitPointsForShard); @@ -390,21 +401,49 @@ BlockingShardSplit(SplitOperation splitOperation, List *sourceColocatedShardIntervalList = ColocatedShardIntervalList( shardIntervalToSplit); + BlockWritesToShardList(sourceColocatedShardIntervalList); + /* First create shard interval metadata for split children */ List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( sourceColocatedShardIntervalList, shardSplitPointsList); - /* Physically create split children on local/remote nodes */ + /* Physically create split children and perform split copy */ + List *splitOffShardList = NIL; CreateSplitShardsForShardGroup( shardGroupSplitIntervalListList, - workersForPlacementList); + workersForPlacementList, + &splitOffShardList); + + // Only single placement allowed (already validated by caller) + List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId); + Assert(sourcePlacementList->length == 1); + + /* + * Drop old shards and delete related metadata. Have to do that before + * creating the new shard metadata, because there's cross-checks + * preventing inconsistent metadata (like overlapping shards). + */ + DropShardList(sourceColocatedShardIntervalList); + + /* insert new metadata */ + InsertSplitOffShardMetadata(splitOffShardList, sourcePlacementList); + + /* + * Create foreign keys if exists after the metadata changes happening in + * DropShardList() and InsertSplitOffShardMetadata() because the foreign + * key creation depends on the new metadata. + */ + CreateForeignConstraints(splitOffShardList, sourcePlacementList); + + CitusInvalidateRelcacheByRelid(DistShardRelationId()); } /* Create ShardGroup split children on a list of corresponding workers. 
*/
 static void
 CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
-							   List *workersForPlacementList)
+							   List *workersForPlacementList,
+							   List **splitOffShardList)
 {
 	/* Iterate on shard intervals for shard group */
 	List *shardIntervalList = NULL;
@@ -425,27 +464,50 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 				shardInterval->shardId);
 
 			/* Create new split child shard on the specified placement list */
-			CreateShardOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+
+			(*splitOffShardList) = lappend(*splitOffShardList, shardInterval);
+		}
+	}
+
+	/* Perform Split Copy -- NOTE(review): no copy step is implemented here yet */
+
+
+	/* Create post-load objects (indexes, replica identity) on each split child after the copy */
+	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
+	{
+		ShardInterval *shardInterval = NULL;
+		WorkerNode *workerPlacementNode = NULL;
+		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
+					workersForPlacementList)
+		{
+			List *indexCommandList = GetPostLoadTableCreationCommands(
+				shardInterval->relationId,
+				true /* includeIndexes */,
+				true /* includeReplicaIdentity */);
+			indexCommandList = WorkerApplyShardDDLCommandList(
+				indexCommandList,
+				shardInterval->shardId);
+
+			CreateObjectOnPlacement(indexCommandList, workerPlacementNode);
 		}
 	}
 }
 
-
 /*
- * Create a shard (using DDL list) on a worker node.
+ * Create an object (shard/index) on a worker node by sending it the given command list.
  */
 static void
-CreateShardOnPlacement(List *splitShardCreationCommandList,
+CreateObjectOnPlacement(List *objectCreationCommandList,
 					   WorkerNode *workerPlacementNode)
 {
 	char *currentUser = CurrentUserName();
 	SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
 											  workerPlacementNode->workerPort,
 											  currentUser,
-											  splitShardCreationCommandList);
+											  objectCreationCommandList);
 }
 
-
 /*
  * Create split children intervals for a shardgroup given list of split points.
*/
@@ -514,3 +576,281 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
 												  splitChildShardInterval);
 	}
 }
+
+/*
+ * InsertSplitOffShardMetadata inserts new shard and shard placement data into
+ * catalog tables on both the coordinator and mx nodes.
+ */
+static void
+InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
+{
+	List *syncedShardList = NIL;
+	ListCell *shardCell = NULL;
+	ListCell *commandCell = NULL;
+
+	/* add a shard row and placement rows for every split-off shard */
+	foreach(shardCell, splitOffShardList)
+	{
+		ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell);
+		Oid relationId = splitOffShard->relationId;
+		uint64 shardId = splitOffShard->shardId;
+		char storageType = splitOffShard->storageType;
+		ListCell *shardPlacementCell = NULL;
+
+		int32 shardMinValue = DatumGetInt32(splitOffShard->minValue); /* assumes int32 range bounds -- TODO confirm */
+		int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
+		text *shardMinValueText = IntegerToText(shardMinValue);
+		text *shardMaxValueText = IntegerToText(shardMaxValue);
+
+		InsertShardRow(relationId, shardId, storageType, shardMinValueText,
+					   shardMaxValueText);
+
+		/* one active placement row per source placement; NOTE(review): reuses the source groupId -- confirm */
+		foreach(shardPlacementCell, sourcePlacementList)
+		{
+			ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+			uint64 shardSize = 0;
+
+			InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE,
+									shardSize, placement->groupId);
+		}
+
+		if (ShouldSyncTableMetadata(relationId))
+		{
+			syncedShardList = lappend(syncedShardList, splitOffShard);
+		}
+	}
+
+	/* send the insert commands for the synced shards to workers with metadata, one by one */
+	List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
+	foreach(commandCell, splitOffShardMetadataCommandList)
+	{
+		char *command = (char *) lfirst(commandCell);
+		SendCommandToWorkersWithMetadata(command);
+	}
+}
+
+/*
+ * DropShardList drops shards and their metadata from both the coordinator and
+ * mx nodes.
+ */
+static void
+DropShardList(List *shardIntervalList)
+{
+	ListCell *shardIntervalCell = NULL;
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		ListCell *shardPlacementCell = NULL;
+		Oid relationId = shardInterval->relationId;
+		uint64 oldShardId = shardInterval->shardId;
+
+		/* delete this shard's metadata from workers with metadata, if the table is synced */
+		if (ShouldSyncTableMetadata(relationId))
+		{
+			ListCell *commandCell = NULL;
+
+			/* send the commands one by one */
+			List *shardMetadataDeleteCommandList = ShardDeleteCommandList(shardInterval);
+			foreach(commandCell, shardMetadataDeleteCommandList)
+			{
+				char *command = (char *) lfirst(commandCell);
+				SendCommandToWorkersWithMetadata(command);
+			}
+		}
+
+		/* delete each active placement's row and drop the shard on that placement's node */
+		List *shardPlacementList = ActiveShardPlacementList(oldShardId);
+		foreach(shardPlacementCell, shardPlacementList)
+		{
+			ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+			char *workerName = placement->nodeName;
+			uint32 workerPort = placement->nodePort;
+			StringInfo dropQuery = makeStringInfo();
+
+			DeleteShardPlacementRow(placement->placementId);
+
+			/* get the qualified shard name used in the drop command */
+			char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+
+			char storageType = shardInterval->storageType;
+			if (storageType == SHARD_STORAGE_TABLE)
+			{
+				appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND,
+								 qualifiedShardName);
+			}
+			else if (storageType == SHARD_STORAGE_FOREIGN)
+			{
+				appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND,
+								 qualifiedShardName);
+			}
+
+			/* drop old shard; NOTE(review): nothing is appended to dropQuery for other storage types -- confirm */
+			SendCommandToWorker(workerName, workerPort, dropQuery->data);
+		}
+
+		/* delete shard row */
+		DeleteShardRow(oldShardId);
+	}
+}
+
+/*
+ * CreateForeignConstraints creates the foreign constraints on the newly
+ * created shards via the tenant isolation (NOTE(review): here called from shard split).
+ *
+ * Commands are gathered per split-off shard into two groups. Foreign keys to
+ * co-located distributed tables go through ExecuteCommandListOnPlacements, which
+ * can run them in parallel if there are multiple replicas. Foreign keys to reference
+ * tables are sent over a single connection per worker to prevent self-deadlocks.
+ */
+static void
+CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
+{
+	ListCell *splitOffShardCell = NULL;
+
+	List *colocatedShardForeignConstraintCommandList = NIL;
+	List *referenceTableForeignConstraintList = NIL;
+
+	foreach(splitOffShardCell, splitOffShardList)
+	{
+		ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell);
+
+		List *currentColocatedForeignKeyList = NIL;
+		List *currentReferenceForeignKeyList = NIL;
+
+		CopyShardForeignConstraintCommandListGrouped(splitOffShard,
+													 &currentColocatedForeignKeyList,
+													 &currentReferenceForeignKeyList);
+
+		colocatedShardForeignConstraintCommandList =
+			list_concat(colocatedShardForeignConstraintCommandList,
+						currentColocatedForeignKeyList);
+		referenceTableForeignConstraintList =
+			list_concat(referenceTableForeignConstraintList,
+						currentReferenceForeignKeyList);
+	}
+
+	/*
+	 * We can use parallel connections while creating co-located foreign keys
+	 * if the source shard has multiple placements (replicas).
+	 * However, foreign keys to reference tables need to be created using a single
+	 * connection per worker to prevent self-deadlocks.
+	 */
+	if (colocatedShardForeignConstraintCommandList != NIL)
+	{
+		ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList,
+									   sourcePlacementList);
+	}
+
+	if (referenceTableForeignConstraintList != NIL)
+	{
+		ListCell *shardPlacementCell = NULL;
+		foreach(shardPlacementCell, sourcePlacementList)
+		{
+			ShardPlacement *shardPlacement =
+				(ShardPlacement *) lfirst(shardPlacementCell);
+
+			char *nodeName = shardPlacement->nodeName;
+			int32 nodePort = shardPlacement->nodePort;
+
+			/*
+			 * We're using the connections that we've used for dropping the
+			 * source placements within the same coordinated transaction.
+			 */
+			char *command = NULL;
+			foreach_ptr(command, referenceTableForeignConstraintList)
+			{
+				SendCommandToWorker(nodeName, nodePort, command);
+			}
+		}
+	}
+}
+
+/*
+ * ExecuteCommandListOnPlacements runs the given command list on the nodes of
+ * the given shard placement list. First, it creates connections. Then it sends
+ * commands one by one. For every command, first it sends the command to all
+ * connections and then checks the results. This helps to run long running
+ * commands in parallel. Finally, it sends commit messages to all connections
+ * and closes them.
+ */
+static void
+ExecuteCommandListOnPlacements(List *commandList, List *placementList)
+{
+	List *workerConnectionList = NIL;
+	ListCell *workerConnectionCell = NULL;
+	ListCell *shardPlacementCell = NULL;
+	ListCell *commandCell = NULL;
+
+	/* create one connection per placement and start a transaction on each */
+	foreach(shardPlacementCell, placementList)
+	{
+		ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
+		char *nodeName = shardPlacement->nodeName;
+		int32 nodePort = shardPlacement->nodePort;
+
+		int connectionFlags = FORCE_NEW_CONNECTION;
+		char *currentUser = CurrentUserName();
+
+		/* create a new connection (FORCE_NEW_CONNECTION) to the placement's node as the current user */
+		MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
+																			nodeName,
+																			nodePort,
+																			currentUser,
+																			NULL);
+
+		/* mark connection as critical and start transaction */
+		MarkRemoteTransactionCritical(workerConnection);
+		RemoteTransactionBegin(workerConnection);
+
+		/* add connection to the list */
+		workerConnectionList = lappend(workerConnectionList, workerConnection);
+	}
+
+	/* send and check results for every command one by one */
+	foreach(commandCell, commandList)
+	{
+		char *command = lfirst(commandCell);
+
+		/* first only send the command to every connection; a failed send is reported at ERROR level */
+		foreach(workerConnectionCell, workerConnectionList)
+		{
+			MultiConnection *workerConnection =
+				(MultiConnection *) lfirst(workerConnectionCell);
+
+			int querySent = SendRemoteCommand(workerConnection, command);
+			if (querySent == 0)
+			{
+				ReportConnectionError(workerConnection, ERROR);
+			}
+		}
+
+		/* then check the result separately to run long running commands in parallel */
+		foreach(workerConnectionCell, workerConnectionList)
+		{
+			MultiConnection *workerConnection =
+				(MultiConnection *) lfirst(workerConnectionCell);
+			bool raiseInterrupts = true;
+
+			PGresult *result = GetRemoteCommandResult(workerConnection, raiseInterrupts);
+			if (!IsResponseOK(result))
+			{
+				ReportResultError(workerConnection, result, ERROR);
+			}
+
+			PQclear(result);
+			ForgetResults(workerConnection);
+		}
+	}
+
+	/* finally commit each transaction and close connections */
+	foreach(workerConnectionCell, workerConnectionList)
+	{
+		MultiConnection *workerConnection =
+			(MultiConnection *) lfirst(workerConnectionCell);
+
+		RemoteTransactionCommit(workerConnection);
+		CloseConnection(workerConnection);
+	}
+}