diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 11bc3317c..1f8fe2a2c 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -27,7 +27,9 @@ #include "commands/dbcommands.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" +#include "distributed/local_executor.h" #include "distributed/master_protocol.h" #include "distributed/metadata_sync.h" #include "distributed/multi_client_executor.h" @@ -78,12 +80,14 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList, Node *deleteCriteria); static int DropShards(Oid relationId, char *schemaName, char *relationName, List *deletableShardIntervalList); +static List * DropTaskList(Oid relationId, char *schemaName, char *relationName, + List *deletableShardIntervalList); static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, const char *shardRelationName, const char *dropShardPlacementCommand); static char * CreateDropShardPlacementCommand(const char *schemaName, - const char *shardRelationName, char - storageType); + const char *shardRelationName, + char storageType); /* exports for SQL callable functions */ @@ -372,6 +376,15 @@ DropShards(Oid relationId, char *schemaName, char *relationName, UseCoordinatedTransaction(); + /* + * We will use the variable below across this function to decide if we can + * use local execution + */ + int32 localGroupId = GetLocalGroupId(); + + /* DROP table commands are currently only supported from the coordinator */ + Assert(localGroupId == COORDINATOR_GROUP_ID); + /* * At this point we intentionally decided to not use 2PC for reference * tables @@ -381,50 +394,43 @@ DropShards(Oid relationId, char *schemaName, char *relationName, CoordinatedTransactionUse2PC(); } 
- ShardInterval *shardInterval = NULL; - foreach_ptr(shardInterval, deletableShardIntervalList) + List *dropTaskList = DropTaskList(relationId, schemaName, relationName, + deletableShardIntervalList); + + Task *task = NULL; + foreach_ptr(task, dropTaskList) { - uint64 shardId = shardInterval->shardId; - char *shardRelationName = pstrdup(relationName); - - Assert(shardInterval->relationId == relationId); - - /* build shard relation name */ - AppendShardIdToName(&shardRelationName, shardId); - - List *shardPlacementList = ShardPlacementList(shardId); + uint64 shardId = task->anchorShardId; ShardPlacement *shardPlacement = NULL; - foreach_ptr(shardPlacement, shardPlacementList) + foreach_ptr(shardPlacement, task->taskPlacementList) { uint64 shardPlacementId = shardPlacement->placementId; + int32 shardPlacementGroupId = shardPlacement->groupId; - if (shardPlacement->groupId == COORDINATOR_GROUP_ID && - IsCoordinator() && - DropSchemaOrDBInProgress()) + bool isLocalShardPlacement = (shardPlacementGroupId == localGroupId); + + if (isLocalShardPlacement && DropSchemaOrDBInProgress() && + localGroupId == COORDINATOR_GROUP_ID) { /* * The active DROP SCHEMA/DATABASE ... CASCADE will drop the * shard, if we try to drop it over another connection, we will - * get into a distributed deadlock. + * get into a distributed deadlock. Hence, just delete the shard + * placement metadata and skip it for now. 
*/ + DeleteShardPlacementRow(shardPlacementId); + continue; } - else + + const char *dropShardPlacementCommand = TaskQueryString(task); + ExecuteDropShardPlacementCommandRemotely(shardPlacement, + relationName, + dropShardPlacementCommand); + + if (isLocalShardPlacement) { - char storageType = shardInterval->storageType; - - const char *dropShardPlacementCommand = - CreateDropShardPlacementCommand(schemaName, shardRelationName, - storageType); - - /* - * Try to open a new connection (or use an existing one) to - * connect to target node to drop shard placement over that - * remote connection - */ - ExecuteDropShardPlacementCommandRemotely(shardPlacement, - shardRelationName, - dropShardPlacementCommand); + TransactionConnectedToLocalGroup = true; } DeleteShardPlacementRow(shardPlacementId); @@ -443,17 +449,67 @@ DropShards(Oid relationId, char *schemaName, char *relationName, } +/* + * DropTaskList returns a list of tasks to execute a DROP command on shard + * placements of a distributed table. This is handled separately from other + * DDL commands because we handle it via the DROP trigger, which is called + * whenever a drop cascades. 
+ */ +static List * +DropTaskList(Oid relationId, char *schemaName, char *relationName, + List *deletableShardIntervalList) +{ + /* resulting task list: one DDL task per deletable shard interval */ + List *taskList = NIL; + + /* task ids are assigned sequentially, starting from 1 */ + int taskId = 1; + + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, deletableShardIntervalList) + { + Assert(shardInterval->relationId == relationId); + + uint64 shardId = shardInterval->shardId; + char storageType = shardInterval->storageType; + + char *shardRelationName = pstrdup(relationName); + + /* build shard relation name from a copy of relationName and the shard id */ + AppendShardIdToName(&shardRelationName, shardId); + + char *dropShardPlacementCommand = + CreateDropShardPlacementCommand(schemaName, shardRelationName, + storageType); + + Task *task = CitusMakeNode(Task); + task->jobId = INVALID_JOB_ID; + task->taskId = taskId++; + task->taskType = DDL_TASK; + SetTaskQueryString(task, dropShardPlacementCommand); + task->dependentTaskList = NULL; + task->replicationModel = REPLICATION_MODEL_INVALID; + task->anchorShardId = shardId; + task->taskPlacementList = ShardPlacementList(shardId); + + taskList = lappend(taskList, task); + } + + return taskList; +} + + /* * ExecuteDropShardPlacementCommandRemotely executes the given DROP shard command * via remote critical connection. 
*/ static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, - const char *shardRelationName, + const char *relationName, const char *dropShardPlacementCommand) { Assert(shardPlacement != NULL); - Assert(shardRelationName != NULL); + Assert(relationName != NULL); Assert(dropShardPlacementCommand != NULL); uint32 connectionFlags = FOR_DDL; @@ -470,6 +526,12 @@ ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, char *workerName = shardPlacement->nodeName; uint32 workerPort = shardPlacement->nodePort; + /* build shard relation name */ + uint64 shardId = shardPlacement->shardId; + char *shardRelationName = pstrdup(relationName); + + AppendShardIdToName(&shardRelationName, shardId); + ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node " "\"%s:%u\"", shardRelationName, workerName, workerPort),