diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 8cd02dfc2..8303254c8 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -27,6 +27,7 @@ #include "commands/dbcommands.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" +#include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_sync.h" #include "distributed/multi_client_executor.h" @@ -77,6 +78,12 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList, Node *deleteCriteria); static int DropShards(Oid relationId, char *schemaName, char *relationName, List *deletableShardIntervalList); +static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, + const char *shardRelationName, + const char *dropShardPlacementCommand); +static char * CreateDropShardPlacementCommand(const char *schemaName, + const char *shardRelationName, char + storageType); /* exports for SQL callable functions */ @@ -326,7 +333,7 @@ master_drop_sequences(PG_FUNCTION_ARGS) /* * CheckTableSchemaNameForDrop errors out if the current user does not - * have permission to undistribute the given relation, taking into + * have permission to un-distribute the given relation, taking into * account that it may be called from the drop trigger. If the table exists, * the function rewrites the given table and schema name. */ @@ -359,93 +366,76 @@ static int DropShards(Oid relationId, char *schemaName, char *relationName, List *deletableShardIntervalList) { - ListCell *shardIntervalCell = NULL; + Assert(OidIsValid(relationId)); + Assert(schemaName != NULL); + Assert(relationName != NULL); UseCoordinatedTransaction(); - /* At this point we intentionally decided to not use 2PC for reference tables */ + /* + * At this point we intentionally decided to not use 2PC for reference + * tables + */ if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { CoordinatedTransactionUse2PC(); } - foreach(shardIntervalCell, deletableShardIntervalList) + ShardInterval *shardInterval = NULL; + + foreach_ptr(shardInterval, deletableShardIntervalList) { - ListCell *shardPlacementCell = NULL; - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; char *shardRelationName = pstrdup(relationName); Assert(shardInterval->relationId == relationId); - /* Build shard relation name. */ + /* build shard relation name */ AppendShardIdToName(&shardRelationName, shardId); - char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName); List *shardPlacementList = ShardPlacementList(shardId); - foreach(shardPlacementCell, shardPlacementList) + + ShardPlacement *shardPlacement = NULL; + + foreach_ptr(shardPlacement, shardPlacementList) { - ShardPlacement *shardPlacement = - (ShardPlacement *) lfirst(shardPlacementCell); - char *workerName = shardPlacement->nodeName; - uint32 workerPort = shardPlacement->nodePort; - StringInfo workerDropQuery = makeStringInfo(); - uint32 connectionFlags = FOR_DDL; + uint64 shardPlacementId = shardPlacement->placementId; - char storageType = shardInterval->storageType; - if (storageType == SHARD_STORAGE_TABLE) - { - appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, - quotedShardName); - } - else if (storageType == SHARD_STORAGE_COLUMNAR || - storageType == SHARD_STORAGE_FOREIGN) - { - appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND, - quotedShardName); - } - - /* - * The active DROP SCHEMA/DATABASE ... CASCADE will drop the shard, if we - * try to drop it over another connection, we will get into a distributed - * deadlock. - */ if (shardPlacement->groupId == COORDINATOR_GROUP_ID && IsCoordinator() && DropSchemaOrDBInProgress()) { - DeleteShardPlacementRow(shardPlacement->placementId); - continue; + /* + * The active DROP SCHEMA/DATABASE ... CASCADE will drop the + * shard, if we try to drop it over another connection, we will + * get into a distributed deadlock. + */ } - - MultiConnection *connection = GetPlacementConnection(connectionFlags, - shardPlacement, - NULL); - - RemoteTransactionBeginIfNecessary(connection); - - if (PQstatus(connection->pgConn) != CONNECTION_OK) + else { - uint64 placementId = shardPlacement->placementId; + char storageType = shardInterval->storageType; - ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node " - "\"%s:%u\"", shardRelationName, workerName, - workerPort), - errdetail("Marking this shard placement for " - "deletion"))); + const char *dropShardPlacementCommand = + CreateDropShardPlacementCommand(schemaName, shardRelationName, + storageType); - UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE); - - continue; + /* + * Try to open a new connection (or use an existing one) to + * connect to target node to drop shard placement over that + * remote connection + */ + ExecuteDropShardPlacementCommandRemotely(shardPlacement, + shardRelationName, + dropShardPlacementCommand); } - MarkRemoteTransactionCritical(connection); - - ExecuteCriticalRemoteCommand(connection, workerDropQuery->data); - - DeleteShardPlacementRow(shardPlacement->placementId); + DeleteShardPlacementRow(shardPlacementId); } + /* + * Now that we deleted all placements of the shard (or their metadata), + * delete the shard metadata as well. + */ DeleteShardRow(shardId); } @@ -455,6 +445,89 @@ DropShards(Oid relationId, char *schemaName, char *relationName, } +/* + * ExecuteDropShardPlacementCommandRemotely executes the given DROP shard command + * via remote critical connection. + */ +static void +ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, + const char *shardRelationName, + const char *dropShardPlacementCommand) +{ + Assert(shardPlacement != NULL); + Assert(shardRelationName != NULL); + Assert(dropShardPlacementCommand != NULL); + + uint32 connectionFlags = FOR_DDL; + MultiConnection *connection = GetPlacementConnection(connectionFlags, + shardPlacement, + NULL); + + RemoteTransactionBeginIfNecessary(connection); + + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + uint64 placementId = shardPlacement->placementId; + + char *workerName = shardPlacement->nodeName; + uint32 workerPort = shardPlacement->nodePort; + + ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node " + "\"%s:%u\"", shardRelationName, workerName, + workerPort), + errdetail("Marking this shard placement for " + "deletion"))); + + UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE); + + return; + } + + MarkRemoteTransactionCritical(connection); + + ExecuteCriticalRemoteCommand(connection, dropShardPlacementCommand); +} + + +/* + * CreateDropShardPlacementCommand function builds the DROP command to drop + * the given shard relation by qualifying it with schema name according to + * shard relation's storage type. + */ +static char * +CreateDropShardPlacementCommand(const char *schemaName, const char *shardRelationName, + char storageType) +{ + Assert(schemaName != NULL); + Assert(shardRelationName != NULL); + + StringInfo workerDropQuery = makeStringInfo(); + + const char *quotedShardName = quote_qualified_identifier(schemaName, + shardRelationName); + + /* build workerDropQuery according to shard storage type */ + if (storageType == SHARD_STORAGE_TABLE) + { + appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, + quotedShardName); + } + else if (storageType == SHARD_STORAGE_COLUMNAR || + storageType == SHARD_STORAGE_FOREIGN) + { + appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND, + quotedShardName); + } + else + { + /* no other storage type is expected here */ + Assert(false); + } + + return workerDropQuery->data; +} + + /* Checks that delete is only on one table. */ static void CheckTableCount(Query *deleteQuery)