From c8b7817bec992e38542899b86a64e7a902914773 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 30 Aug 2022 17:36:59 -0700 Subject: [PATCH] Rebase shard_split changes on main merge --- .../distributed/operations/shard_split.c | 446 ++++++++---------- 1 file changed, 208 insertions(+), 238 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index bc4ad208b..a70a718b9 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -34,6 +34,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "distributed/shard_cleaner.h" #include "distributed/shared_library_init.h" #include "distributed/pg_dist_shard.h" #include "distributed/metadata_sync.h" @@ -45,6 +46,9 @@ #include "distributed/shard_rebalancer.h" #include "postmaster/postmaster.h" +/* declarations for dynamic loading */ +bool DeferShardDeleteOnSplit = true; + /* * Entry for map that tracks ShardInterval -> Placement Node * created by split workflow. @@ -73,12 +77,13 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *nodeIdsForPlacementList); +static bool CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, + WorkerNode *workerNode); static void ErrorIfModificationAndSplitInTheSameTransaction(SplitOperation splitOperation); -static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, - List *shardGroupSplitIntervalListList, +static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, +static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, @@ -87,7 +92,7 @@ static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList); static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList, bool includeReplicaIdentity); -static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement); +static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, @@ -131,8 +136,6 @@ static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow); -static HTAB * CreateEmptyMapForShardsCreatedByWorkflow(); static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode); static StringInfo CreateSplitShardReplicationSetupUDF( List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, @@ -148,10 +151,8 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, DistributionColumnMap * distributionColumnOverrides); static void ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode); -static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId, +static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId, ShardInterval *shardInterval); -static void DropDummyShards(HTAB *mapOfDummyShardToPlacement); -static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval); static uint64 GetNextShardIdForSplitChild(void); static void AcquireNonblockingSplitLock(Oid relationId); static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList); @@ -515,6 +516,9 @@ SplitShard(SplitMode splitMode, /* use the user-specified shard ID as the split workflow ID */ uint64 splitWorkflowId = shardIntervalToSplit->shardId; + /* Start operation to prepare for generating cleanup records */ + StartNewOperationNeedingCleanup(); + if (splitMode == BLOCKING_SPLIT) { EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES); @@ -539,70 +543,9 @@ SplitShard(SplitMode splitMode, PlacementMovedUsingLogicalReplicationInTX = true; } -} - -/* - * ShardIntervalHashCode computes the hash code for a Shardinterval using - * shardId. - */ -static uint32 -ShardIntervalHashCode(const void *key, Size keySize) -{ - const ShardInterval *shardInterval = (const ShardInterval *) key; - const uint64 *shardId = &(shardInterval->shardId); - - /* standard hash function outlined in Effective Java, Item 8 */ - uint32 result = 17; - result = 37 * result + tag_hash(shardId, sizeof(uint64)); - - return result; -} - - -/* - * ShardIntervalHashCompare compares two shard intervals using shard id. - */ -static int -ShardIntervalHashCompare(const void *lhsKey, const void *rhsKey, Size keySize) -{ - const ShardInterval *intervalLhs = (const ShardInterval *) lhsKey; - const ShardInterval *intervalRhs = (const ShardInterval *) rhsKey; - - int shardIdCompare = 0; - - /* first, compare by shard id */ - if (intervalLhs->shardId < intervalRhs->shardId) - { - shardIdCompare = -1; - } - else if (intervalLhs->shardId > intervalRhs->shardId) - { - shardIdCompare = 1; - } - - return shardIdCompare; -} - - -/* Create an empty map that tracks ShardInterval -> Placement Node as created by workflow */ -static HTAB * -CreateEmptyMapForShardsCreatedByWorkflow() -{ - HASHCTL info = { 0 }; - info.keysize = sizeof(ShardInterval); - info.entrysize = sizeof(ShardCreatedByWorkflowEntry); - info.hash = ShardIntervalHashCode; - info.match = ShardIntervalHashCompare; - info.hcxt = CurrentMemoryContext; - - /* we don't have value field as it's a set */ - info.entrysize = info.keysize; - uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - - HTAB *splitChildrenCreatedByWorkflow = hash_create("Shard id to Node Placement Map", - 32, &info, hashFlags); - return splitChildrenCreatedByWorkflow; + bool isSuccess = true; + CompleteNewOperationNeedingCleanup(isSuccess); } @@ -635,13 +578,10 @@ BlockingShardSplit(SplitOperation splitOperation, WorkerNode *sourceShardNode = ActiveShardPlacementWorkerNode(firstShard->shardId); - HTAB *mapOfShardToPlacementCreatedByWorkflow = - CreateEmptyMapForShardsCreatedByWorkflow(); PG_TRY(); { /* Physically create split children. */ - CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, - shardGroupSplitIntervalListList, + CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList); /* For Blocking split, copy isn't snapshotted */ @@ -690,7 +630,8 @@ BlockingShardSplit(SplitOperation splitOperation, ShutdownAllConnections(); /* Do a best effort cleanup of shards created on workers in the above block */ - TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow); + bool isSuccess = false; + CompleteNewOperationNeedingCleanup(isSuccess); PG_RE_THROW(); } @@ -701,10 +642,58 @@ BlockingShardSplit(SplitOperation splitOperation, } +/* Check if a relation with given name already exists on the worker node */ +static bool +CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *workerNode) +{ + char *schemaName = get_namespace_name( + get_rel_namespace(shardInterval->relationId)); + char *shardName = get_rel_name(shardInterval->relationId); + AppendShardIdToName(&shardName, shardInterval->shardId); + + StringInfo checkShardExistsQuery = makeStringInfo(); + appendStringInfo(checkShardExistsQuery, + "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '%s' AND tablename = '%s');", + schemaName, + shardName); + + int connectionFlags = 0; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + workerNode->workerName, + workerNode->workerPort, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + + PGresult *result = NULL; + int queryResult = ExecuteOptionalRemoteCommand(connection, + checkShardExistsQuery->data, &result); + if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg( + "Cannot check if relation %s already exists for split on worker %s:%d", + ConstructQualifiedShardName(shardInterval), + connection->hostname, + connection->port))); + + PQclear(result); + ForgetResults(connection); + + return false; + } + + char *checkExists = PQgetvalue(result, 0, 0); + PQclear(result); + ForgetResults(connection); + + return strcmp(checkExists, "t") == 0; +} + + /* Create ShardGroup split children on a list of corresponding workers. */ static void -CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, - List *shardGroupSplitIntervalListList, +CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { /* @@ -732,16 +721,35 @@ CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, splitShardCreationCommandList, shardInterval->shardId); - /* Create new split child shard on the specified placement list */ - CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); + /* Log resource for cleanup in case of failure only. + * Before we log a record, do a best effort check to see if a shard with same name exists. + * This is because, it will cause shard creation to fail and we will end up cleaning the + * old shard. We don't want that. + */ + bool relationExists = CheckIfRelationWithSameNameExists(shardInterval, + workerPlacementNode); - ShardCreatedByWorkflowEntry entry; - entry.shardIntervalKey = shardInterval; - entry.workerNodeValue = workerPlacementNode; - bool found = false; - hash_search(mapOfShardToPlacementCreatedByWorkflow, &entry, HASH_ENTER, - &found); - Assert(!found); + if (relationExists) + { + ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("Relation %s already exists on worker %s:%d.", + ConstructQualifiedShardName(shardInterval), + workerPlacementNode->workerName, + workerPlacementNode->workerPort))); + } + else + { + CleanupPolicy policy = CLEANUP_ON_FAILURE; + InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, + ConstructQualifiedShardName( + shardInterval), + workerPlacementNode->groupId, + policy); + + /* Create new split child shard on the specified placement list */ + CreateObjectOnPlacement(splitShardCreationCommandList, + workerPlacementNode); + } } } } @@ -1336,20 +1344,37 @@ DropShardList(List *shardIntervalList) /* get shard name */ char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); - char storageType = shardInterval->storageType; - if (storageType == SHARD_STORAGE_TABLE) + if (DeferShardDeleteOnSplit) { - appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, - qualifiedShardName); + /* Log shard in pg_dist_cleanup. + * Parent shards are to be dropped only on sucess after split workflow is complete, + * so mark the policy as 'CLEANUP_DEFERRED_ON_SUCCESS'. + * We also log cleanup record in the current transaction. If the current transaction rolls back, + * we do not generate a record at all. + */ + CleanupPolicy policy = CLEANUP_DEFERRED_ON_SUCCESS; + InsertCleanupRecordInCurrentTransaction(CLEANUP_SHARD_PLACEMENT, + qualifiedShardName, + placement->groupId, + policy); } - else if (storageType == SHARD_STORAGE_FOREIGN) + else { - appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND, - qualifiedShardName); - } + char storageType = shardInterval->storageType; + if (storageType == SHARD_STORAGE_TABLE) + { + appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, + qualifiedShardName); + } + else if (storageType == SHARD_STORAGE_FOREIGN) + { + appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND, + qualifiedShardName); + } - /* drop old shard */ - SendCommandToWorker(workerName, workerPort, dropQuery->data); + /* drop old shard */ + SendCommandToWorker(workerName, workerPort, dropQuery->data); + } } /* delete shard row */ @@ -1358,50 +1383,6 @@ DropShardList(List *shardIntervalList) } -/* - * In case of failure, TryDropSplitShardsOnFailure drops in-progress shard placements from both the - * coordinator and mx nodes. - */ -static void -TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow) -{ - HASH_SEQ_STATUS status; - ShardCreatedByWorkflowEntry *entry; - - hash_seq_init(&status, mapOfShardToPlacementCreatedByWorkflow); - while ((entry = (ShardCreatedByWorkflowEntry *) hash_seq_search(&status)) != 0) - { - ShardInterval *shardInterval = entry->shardIntervalKey; - WorkerNode *workerPlacementNode = entry->workerNodeValue; - - char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); - StringInfo dropShardQuery = makeStringInfo(); - - /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */ - appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND, - qualifiedShardName); - - int connectionFlags = FOR_DDL; - connectionFlags |= OUTSIDE_TRANSACTION; - MultiConnection *connnection = GetNodeUserDatabaseConnection( - connectionFlags, - workerPlacementNode->workerName, - workerPlacementNode->workerPort, - CurrentUserName(), - NULL /* databaseName */); - - /* - * Perform a drop in best effort manner. - * The shard may or may not exist and the connection could have died. - */ - ExecuteOptionalRemoteCommand( - connnection, - dropShardQuery->data, - NULL /* pgResult */); - } -} - - /* * AcquireNonblockingSplitLock does not allow concurrent nonblocking splits, because we share memory and * replication slots. @@ -1488,11 +1469,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, databaseName); ClaimConnectionExclusively(sourceConnection); - HTAB *mapOfShardToPlacementCreatedByWorkflow = - CreateEmptyMapForShardsCreatedByWorkflow(); - - HTAB *mapOfDummyShardToPlacement = CreateSimpleHash(NodeAndOwner, - GroupedShardSplitInfos); MultiConnection *sourceReplicationConnection = GetReplicationConnection(sourceShardToCopyNode->workerName, sourceShardToCopyNode->workerPort); @@ -1501,8 +1477,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, PG_TRY(); { /* 1) Physically create split children. */ - CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, - shardGroupSplitIntervalListList, + CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList); /* @@ -1510,8 +1485,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, * Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth * information. */ + HTAB *mapOfPlacementToDummyShardList = CreateSimpleHash(NodeAndOwner, + GroupedShardSplitInfos); CreateDummyShardsForShardGroup( - mapOfDummyShardToPlacement, + mapOfPlacementToDummyShardList, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, sourceShardToCopyNode, @@ -1526,7 +1503,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, * initial COPY phase, like we do for the replica identities on the * target shards. */ - CreateReplicaIdentitiesForDummyShards(mapOfDummyShardToPlacement); + CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList); /* 4) Create Publications. */ CreatePublications(sourceConnection, publicationInfoHash); @@ -1680,11 +1657,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList); - /* - * 24) Drop dummy shards. - */ - DropDummyShards(mapOfDummyShardToPlacement); - /* * 24) Release shared memory allocated by worker_split_shard_replication_setup udf * at source node. @@ -1705,14 +1677,14 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* end ongoing transactions to enable us to clean up */ ShutdownAllConnections(); - /* Do a best effort cleanup of shards created on workers in the above block */ - TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow); - + /* Do a best effort cleanup of shards created on workers in the above block + * TODO(niupre): We don't need to do this once shard cleaner can clean replication + * artifacts. + */ DropAllLogicalReplicationLeftovers(SHARD_SPLIT); - DropDummyShards(mapOfDummyShardToPlacement); - - ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode); + bool isSuccess = false; + CompleteNewOperationNeedingCleanup(isSuccess); PG_RE_THROW(); } @@ -1743,7 +1715,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, * Note 2 : Given there is an overlap of source and destination in Worker0, Shard1_1 and Shard2_1 need not be created. */ static void -CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, +CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, @@ -1780,13 +1752,43 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, splitShardCreationCommandList, shardInterval->shardId); - /* Create dummy source shard on the specified placement list */ - CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); + /* Log resource for cleanup in case of failure only. + * Before we log a record, do a best effort check to see if a shard with same name exists. + * This is because, it will cause shard creation to fail and we will end up cleaning the + * old shard. We don't want that. + */ + bool relationExists = CheckIfRelationWithSameNameExists(shardInterval, + workerPlacementNode); - /* Add dummy source shard entry created for placement node in map */ - AddDummyShardEntryInMap(mapOfDummyShardToPlacement, - workerPlacementNode->nodeId, - shardInterval); + if (relationExists) + { + ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("Relation %s already exists on worker %s:%d.", + ConstructQualifiedShardName(shardInterval), + workerPlacementNode->workerName, + workerPlacementNode->workerPort))); + } + else + { + /* Log shard in pg_dist_cleanup. Given dummy shards are transient resources, + * we want to cleanup irrespective of operation success or failure. + */ + CleanupPolicy policy = CLEANUP_ALWAYS; + InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, + ConstructQualifiedShardName( + shardInterval), + workerPlacementNode->groupId, + policy); + + /* Create dummy source shard on the specified placement list */ + CreateObjectOnPlacement(splitShardCreationCommandList, + workerPlacementNode); + + /* Add dummy source shard entry created for placement node in map */ + AddDummyShardEntryInMap(mapOfPlacementToDummyShardList, + workerPlacementNode->nodeId, + shardInterval); + } } } @@ -1815,12 +1817,42 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement, splitShardCreationCommandList, shardInterval->shardId); - /* Create dummy split child shard on source worker node */ - CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode); + /* Log resource for cleanup in case of failure only. + * Before we log a record, do a best effort check to see if a shard with same name exists. + * This is because, it will cause shard creation to fail and we will end up cleaning the + * old shard. We don't want that. + */ + bool relationExists = CheckIfRelationWithSameNameExists(shardInterval, + sourceWorkerNode); - /* Add dummy split child shard entry created on source node */ - AddDummyShardEntryInMap(mapOfDummyShardToPlacement, sourceWorkerNode->nodeId, - shardInterval); + if (relationExists) + { + ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("Relation %s already exists on worker %s:%d.", + ConstructQualifiedShardName(shardInterval), + sourceWorkerNode->workerName, + sourceWorkerNode->workerPort))); + } + else + { + /* Log shard in pg_dist_cleanup. Given dummy shards are transient resources, + * we want to cleanup irrespective of operation success or failure. + */ + CleanupPolicy policy = CLEANUP_ALWAYS; + InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, + ConstructQualifiedShardName( + shardInterval), + sourceWorkerNode->groupId, + policy); + + /* Create dummy split child shard on source worker node */ + CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode); + + /* Add dummy split child shard entry created on source node */ + AddDummyShardEntryInMap(mapOfPlacementToDummyShardList, + sourceWorkerNode->nodeId, + shardInterval); + } } } } @@ -2076,7 +2108,7 @@ ParseReplicationSlotInfoFromResult(PGresult *result) * of logical replication. We cautiously delete only the dummy shards added in the DummyShardHashMap. */ static void -AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId, +AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId, ShardInterval *shardInterval) { NodeAndOwner key; @@ -2085,7 +2117,7 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId, bool found = false; GroupedDummyShards *nodeMappingEntry = - (GroupedDummyShards *) hash_search(mapOfDummyShardToPlacement, &key, + (GroupedDummyShards *) hash_search(mapOfPlacementToDummyShardList, &key, HASH_ENTER, &found); if (!found) @@ -2098,68 +2130,6 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId, } -/* - * DropDummyShards traverses the dummy shard map and drops shard at given node. - * It fails if the shard cannot be dropped. - */ -static void -DropDummyShards(HTAB *mapOfDummyShardToPlacement) -{ - HASH_SEQ_STATUS status; - hash_seq_init(&status, mapOfDummyShardToPlacement); - - GroupedDummyShards *entry = NULL; - while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL) - { - uint32 nodeId = entry->key.nodeId; - WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId, - false /* missingOk */); - - int connectionFlags = FOR_DDL; - connectionFlags |= OUTSIDE_TRANSACTION; - MultiConnection *connection = GetNodeUserDatabaseConnection( - connectionFlags, - shardToBeDroppedNode->workerName, - shardToBeDroppedNode->workerPort, - CurrentUserName(), - NULL /* databaseName */); - - List *dummyShardIntervalList = entry->shardIntervals; - ShardInterval *shardInterval = NULL; - foreach_ptr(shardInterval, dummyShardIntervalList) - { - DropDummyShard(connection, shardInterval); - } - - CloseConnection(connection); - } -} - - -/* - * DropDummyShard drops a given shard on the node connection. - * It fails if the shard cannot be dropped. - */ -static void -DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval) -{ - char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); - StringInfo dropShardQuery = makeStringInfo(); - - /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */ - appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND, - qualifiedShardName); - - /* - * Since the dummy shard is expected to be present on the given node, - * fail if it cannot be dropped during cleanup. - */ - ExecuteCriticalRemoteCommand( - connection, - dropShardQuery->data); -} - - /* * CreateReplicaIdentitiesForDummyShards creates replica indentities for split * dummy shards.