Cleanup on catch

pull/6029/head
Nitish Upreti 2022-06-28 16:22:37 -07:00
parent dfec00940d
commit ebd0e9a617
1 changed files with 86 additions and 25 deletions

View File

@ -64,6 +64,8 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListLi
List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
@ -418,34 +420,47 @@ BlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */);
/*
* Physically create split children, perform split copy and create auxiliary structures.
* This includes: indexes, replicaIdentity, triggers and statistics.
* Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
*/
CreateSplitShardsForShardGroup(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList);
PG_TRY();
{
/*
* Physically create split children, perform split copy and create auxiliary structures.
* This includes: indexes, replicaIdentity, triggers and statistics.
* Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
*/
CreateSplitShardsForShardGroup(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
DropShardList(sourceColocatedShardIntervalList);
/*
* Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
DropShardList(sourceColocatedShardIntervalList);
/* Insert new shard and placement metadata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
/* Insert new shard and placement metadata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList);
/*
* Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
}
PG_CATCH();
{
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList,
workersForPlacementList);
PG_RE_THROW();
}
PG_END_TRY();
CitusInvalidateRelcacheByRelid(DistShardRelationId());
}
@ -841,3 +856,49 @@ DropShardList(List *shardIntervalList)
DeleteShardRow(oldShardId);
}
}
/*
 * TryDropSplitShardsOnFailure makes a best-effort attempt to drop the split
 * child shards from the workers they were assigned to. It is intended for the
 * error path of a shard split, where some or all of the children may already
 * have been created on the workers.
 *
 * shardGroupSplitIntervalListList is a list of shard interval lists; every
 * inner list is walked in lockstep with workersForPlacementList, so the i-th
 * split child of each inner list is dropped on the i-th worker.
 *
 * This function only sends the drop command to the workers; it does not touch
 * any metadata itself. Failures of the remote command are tolerated.
 */
static void
TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
							List *workersForPlacementList)
{
	List *splitIntervalList = NULL;
	foreach_ptr(splitIntervalList, shardGroupSplitIntervalListList)
	{
		/* pair every split child with the worker chosen for its placement */
		ShardInterval *splitInterval = NULL;
		WorkerNode *targetWorker = NULL;
		forboth_ptr(splitInterval, splitIntervalList, targetWorker,
					workersForPlacementList)
		{
			/*
			 * Caller enforces that foreign tables cannot be split, hence the
			 * regular table flavor of the drop command is always the right one.
			 */
			StringInfo dropCommand = makeStringInfo();
			appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND,
							 ConstructQualifiedShardName(splitInterval));

			/*
			 * NOTE(review): OUTSIDE_TRANSACTION presumably keeps the drop
			 * independent of the transaction that is being aborted - confirm
			 * against the connection management code.
			 */
			int connectionFlags = FOR_DDL | OUTSIDE_TRANSACTION;
			MultiConnection *connection = GetNodeUserDatabaseConnection(
				connectionFlags,
				targetWorker->workerName,
				targetWorker->workerPort,
				CurrentUserName(),
				NULL /* databaseName */);

			/*
			 * Best effort only: the shard may never have been created, or the
			 * connection could have died, so the result is not inspected.
			 */
			ExecuteOptionalRemoteCommand(connection,
										 dropCommand->data,
										 NULL /* pgResult */);
		}
	}
}