implement DropTaskList before introducing local DROP table execution (#3603)

pull/3604/head
Onur Tirtir 2020-03-10 19:12:44 +03:00 committed by GitHub
parent c26f99ea82
commit e902581cb6
1 changed file with 97 additions and 35 deletions


@@ -27,7 +27,9 @@
 #include "commands/dbcommands.h"
 #include "distributed/commands/utility_hook.h"
 #include "distributed/connection_management.h"
+#include "distributed/deparse_shard_query.h"
 #include "distributed/listutils.h"
+#include "distributed/local_executor.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_client_executor.h"
@@ -78,12 +80,14 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList,
 										   Node *deleteCriteria);
 static int DropShards(Oid relationId, char *schemaName, char *relationName,
 					  List *deletableShardIntervalList);
+static List * DropTaskList(Oid relationId, char *schemaName, char *relationName,
+						   List *deletableShardIntervalList);
 static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
 													 const char *shardRelationName,
 													 const char *dropShardPlacementCommand);
 static char * CreateDropShardPlacementCommand(const char *schemaName,
-											  const char *shardRelationName, char
-											  storageType);
+											  const char *shardRelationName,
+											  char storageType);
 
 
 /* exports for SQL callable functions */
@@ -372,6 +376,15 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 	UseCoordinatedTransaction();
 
+	/*
+	 * We will use the variable below across this function to decide whether
+	 * we can use local execution.
+	 */
+	int32 localGroupId = GetLocalGroupId();
+
+	/* DROP table commands are currently only supported from the coordinator */
+	Assert(localGroupId == COORDINATOR_GROUP_ID);
+
 	/*
 	 * At this point we intentionally decided to not use 2PC for reference
 	 * tables
@@ -381,50 +394,43 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 		CoordinatedTransactionUse2PC();
 	}
 
-	ShardInterval *shardInterval = NULL;
-	foreach_ptr(shardInterval, deletableShardIntervalList)
+	List *dropTaskList = DropTaskList(relationId, schemaName, relationName,
+									  deletableShardIntervalList);
+
+	Task *task = NULL;
+	foreach_ptr(task, dropTaskList)
 	{
-		uint64 shardId = shardInterval->shardId;
-		char *shardRelationName = pstrdup(relationName);
-
-		Assert(shardInterval->relationId == relationId);
-
-		/* build shard relation name */
-		AppendShardIdToName(&shardRelationName, shardId);
-
-		List *shardPlacementList = ShardPlacementList(shardId);
+		uint64 shardId = task->anchorShardId;
 
 		ShardPlacement *shardPlacement = NULL;
-		foreach_ptr(shardPlacement, shardPlacementList)
+		foreach_ptr(shardPlacement, task->taskPlacementList)
 		{
 			uint64 shardPlacementId = shardPlacement->placementId;
+			int32 shardPlacementGroupId = shardPlacement->groupId;
 
-			if (shardPlacement->groupId == COORDINATOR_GROUP_ID &&
-				IsCoordinator() &&
-				DropSchemaOrDBInProgress())
+			bool isLocalShardPlacement = (shardPlacementGroupId == localGroupId);
+
+			if (isLocalShardPlacement && DropSchemaOrDBInProgress() &&
+				localGroupId == COORDINATOR_GROUP_ID)
 			{
 				/*
 				 * The active DROP SCHEMA/DATABASE ... CASCADE will drop the
 				 * shard, if we try to drop it over another connection, we will
-				 * get into a distributed deadlock.
+				 * get into a distributed deadlock. Hence, just delete the shard
+				 * placement metadata and skip it for now.
 				 */
+				DeleteShardPlacementRow(shardPlacementId);
+				continue;
 			}
-			else
-			{
-				char storageType = shardInterval->storageType;
-
-				const char *dropShardPlacementCommand =
-					CreateDropShardPlacementCommand(schemaName, shardRelationName,
-													storageType);
-
-				/*
-				 * Try to open a new connection (or use an existing one) to
-				 * connect to target node to drop shard placement over that
-				 * remote connection
-				 */
-				ExecuteDropShardPlacementCommandRemotely(shardPlacement,
-														 shardRelationName,
-														 dropShardPlacementCommand);
-			}
+
+			const char *dropShardPlacementCommand = TaskQueryString(task);
+
+			ExecuteDropShardPlacementCommandRemotely(shardPlacement,
+													 relationName,
+													 dropShardPlacementCommand);
+
+			if (isLocalShardPlacement)
+			{
+				TransactionConnectedToLocalGroup = true;
+			}
 
 			DeleteShardPlacementRow(shardPlacementId);
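
The rewritten loop makes one decision per placement: a placement that belongs to the local group while a DROP SCHEMA/DATABASE ... CASCADE is in progress on the coordinator only has its metadata row deleted (the cascade itself drops the shard), and every other placement gets the deparsed DROP command before its metadata row is deleted. The standalone C sketch below condenses that branch for readability; DropAction, DecideDropAction, and the assumed COORDINATOR_GROUP_ID value are illustrative names, not Citus code.

/* Illustrative sketch only; not part of the Citus source. */
#include <stdbool.h>
#include <stdint.h>

#ifndef COORDINATOR_GROUP_ID
#define COORDINATOR_GROUP_ID 0	/* assumed value of the Citus constant */
#endif

typedef enum DropAction
{
	DROP_METADATA_ONLY,	/* the active cascade already drops the local shard */
	DROP_VIA_COMMAND	/* run the DROP command, then delete the metadata row */
} DropAction;

/* mirrors the branch added to the placement loop above */
static DropAction
DecideDropAction(int32_t placementGroupId, int32_t localGroupId,
				 bool dropSchemaOrDBInProgress)
{
	bool isLocalShardPlacement = (placementGroupId == localGroupId);

	if (isLocalShardPlacement && dropSchemaOrDBInProgress &&
		localGroupId == COORDINATOR_GROUP_ID)
	{
		return DROP_METADATA_ONLY;
	}

	return DROP_VIA_COMMAND;
}
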
@@ -443,17 +449,67 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 }
 
 
+/*
+ * DropTaskList returns a list of tasks to execute a DROP command on shard
+ * placements of a distributed table. This is handled separately from other
+ * DDL commands because we handle it via the DROP trigger, which is called
+ * whenever a drop cascades.
+ */
+static List *
+DropTaskList(Oid relationId, char *schemaName, char *relationName,
+			 List *deletableShardIntervalList)
+{
+	/* resulting task list */
+	List *taskList = NIL;
+
+	/* enumerate the tasks when putting them to the taskList */
+	int taskId = 1;
+
+	ShardInterval *shardInterval = NULL;
+	foreach_ptr(shardInterval, deletableShardIntervalList)
+	{
+		Assert(shardInterval->relationId == relationId);
+
+		uint64 shardId = shardInterval->shardId;
+		char storageType = shardInterval->storageType;
+
+		char *shardRelationName = pstrdup(relationName);
+
+		/* build shard relation name */
+		AppendShardIdToName(&shardRelationName, shardId);
+
+		char *dropShardPlacementCommand =
+			CreateDropShardPlacementCommand(schemaName, shardRelationName,
+											storageType);
+
+		Task *task = CitusMakeNode(Task);
+		task->jobId = INVALID_JOB_ID;
+		task->taskId = taskId++;
+		task->taskType = DDL_TASK;
+		SetTaskQueryString(task, dropShardPlacementCommand);
+		task->dependentTaskList = NULL;
+		task->replicationModel = REPLICATION_MODEL_INVALID;
+		task->anchorShardId = shardId;
+		task->taskPlacementList = ShardPlacementList(shardId);
+
+		taskList = lappend(taskList, task);
+	}
+
+	return taskList;
+}
+
+
 /*
  * ExecuteDropShardPlacementCommandRemotely executes the given DROP shard command
  * via remote critical connection.
  */
 static void
 ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
-										 const char *shardRelationName,
+										 const char *relationName,
 										 const char *dropShardPlacementCommand)
 {
 	Assert(shardPlacement != NULL);
-	Assert(shardRelationName != NULL);
+	Assert(relationName != NULL);
 	Assert(dropShardPlacementCommand != NULL);
 
 	uint32 connectionFlags = FOR_DDL;
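
DropTaskList deparses the per-shard DROP command once, up front, and stores it on the task via SetTaskQueryString; the executing loop later reads it back with TaskQueryString. The body of CreateDropShardPlacementCommand is not part of this diff, so the sketch below only illustrates the kind of string such a builder could produce, assuming regular shards map to DROP TABLE, foreign-table shards map to DROP FOREIGN TABLE, and identifier quoting is omitted for brevity (all assumptions, not the actual implementation).

/* Hypothetical sketch; the real CreateDropShardPlacementCommand is not shown in this diff. */
#include "postgres.h"
#include "lib/stringinfo.h"

static char *
BuildDropShardPlacementCommandSketch(const char *schemaName,
									 const char *shardRelationName,
									 bool isForeignTable)
{
	StringInfo dropCommand = makeStringInfo();

	/* assumed shape of the string handed to SetTaskQueryString() above */
	appendStringInfo(dropCommand, "DROP %sTABLE IF EXISTS %s.%s CASCADE",
					 isForeignTable ? "FOREIGN " : "",
					 schemaName, shardRelationName);

	return dropCommand->data;
}
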
@@ -470,6 +526,12 @@ ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
 		char *workerName = shardPlacement->nodeName;
 		uint32 workerPort = shardPlacement->nodePort;
 
+		/* build shard relation name */
+		uint64 shardId = shardPlacement->shardId;
+		char *shardRelationName = pstrdup(relationName);
+
+		AppendShardIdToName(&shardRelationName, shardId);
+
 		ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
 								 "\"%s:%u\"", shardRelationName, workerName,
 								 workerPort),
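
Because ExecuteDropShardPlacementCommandRemotely now receives the plain relation name instead of the shard relation name, the connection-failure warning rebuilds the shard-level name on the spot with AppendShardIdToName. The naming rule it relies on is simply the distributed table name with the shard id appended; the sketch below is illustrative only and ignores the identifier-length handling the real helper performs.

/* Illustrative sketch of the shard naming rule; not the Citus helper itself. */
#include "postgres.h"
#include "lib/stringinfo.h"

static char *
ShardRelationNameSketch(const char *relationName, uint64 shardId)
{
	StringInfo shardName = makeStringInfo();

	/* e.g. "events" + 102008 -> "events_102008" */
	appendStringInfo(shardName, "%s_" UINT64_FORMAT, relationName, shardId);

	return shardName->data;
}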