From ebd0e9a617bdb6b4ae9f88bd053b486a8cad6780 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 28 Jun 2022 16:22:37 -0700 Subject: [PATCH] Cleanup on catch --- .../distributed/operations/shard_split.c | 111 ++++++++++++++---- 1 file changed, 86 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 862454817..aeeca3c1e 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -64,6 +64,8 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListLi List *workersForPlacementList); static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, + List *workersForPlacementList); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -418,34 +420,47 @@ BlockingShardSplit(SplitOperation splitOperation, WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, false /* missingOk */); - /* - * Physically create split children, perform split copy and create auxillary structures. - * This includes: indexes, replicaIdentity. triggers and statistics. - * Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints). - */ - CreateSplitShardsForShardGroup( - sourceShardToCopyNode, - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - workersForPlacementList); + PG_TRY(); + { + /* + * Physically create split children, perform split copy and create auxillary structures. + * This includes: indexes, replicaIdentity. triggers and statistics. + * Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints). + */ + CreateSplitShardsForShardGroup( + sourceShardToCopyNode, + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + workersForPlacementList); - /* - * Drop old shards and delete related metadata. Have to do that before - * creating the new shard metadata, because there's cross-checks - * preventing inconsistent metadata (like overlapping shards). - */ - DropShardList(sourceColocatedShardIntervalList); + /* + * Drop old shards and delete related metadata. Have to do that before + * creating the new shard metadata, because there's cross-checks + * preventing inconsistent metadata (like overlapping shards). + */ + DropShardList(sourceColocatedShardIntervalList); - /* Insert new shard and placement metdata */ - InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, - workersForPlacementList); + /* Insert new shard and placement metdata */ + InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, + workersForPlacementList); - /* - * Create foreign keys if exists after the metadata changes happening in - * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign - * key creation depends on the new metadata. - */ - CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList); + /* + * Create foreign keys if exists after the metadata changes happening in + * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign + * key creation depends on the new metadata. + */ + CreateForeignKeyConstraints(shardGroupSplitIntervalListList, + workersForPlacementList); + } + PG_CATCH(); + { + /* Do a best effort cleanup of shards created on workers in the above block */ + TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList, + workersForPlacementList); + + PG_RE_THROW(); + } + PG_END_TRY(); CitusInvalidateRelcacheByRelid(DistShardRelationId()); } @@ -841,3 +856,49 @@ DropShardList(List *shardIntervalList) DeleteShardRow(oldShardId); } } + + +/* + * In case of failure, DropShardPlacementList drops shard placements and their metadata from both the + * coordinator and mx nodes. + */ +static void +TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, + List *workersForPlacementList) +{ + List *shardIntervalList = NULL; + foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) + { + /* Iterate on split shard interval list and corresponding placement worker */ + ShardInterval *shardInterval = NULL; + WorkerNode *workerPlacementNode = NULL; + forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, + workersForPlacementList) + { + char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); + StringInfo dropShardQuery = makeStringInfo(); + + /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */ + appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND, + qualifiedShardName); + + int connectionFlags = FOR_DDL; + connectionFlags |= OUTSIDE_TRANSACTION; + MultiConnection *connnection = GetNodeUserDatabaseConnection( + connectionFlags, + workerPlacementNode->workerName, + workerPlacementNode->workerPort, + CurrentUserName(), + NULL /* databaseName */); + + /* + * Perform a drop in best effort manner. + * The shard may or may not exist and the connection could have died. + */ + ExecuteOptionalRemoteCommand( + connnection, + dropShardQuery->data, + NULL /* pgResult */); + } + } +}