From 29f21fea596bce87f2c96607876e646423b9e3c3 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Jun 2017 13:29:08 +0200 Subject: [PATCH] Use GetPlacementListConnection for multi-shard commands --- .../executor/multi_router_executor.c | 30 +--------- .../master/master_modify_multiple_shards.c | 2 +- .../transaction/multi_shard_transaction.c | 57 +++++++++++++++---- .../distributed/multi_shard_transaction.h | 3 +- 4 files changed, 48 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 6a4beb1c8..d8cfd88db 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -86,7 +86,6 @@ static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, CitusScanState *scanState); -static List * TaskShardIntervalList(List *taskList); static void AcquireExecutorShardLock(Task *task, CmdType commandType); static void AcquireExecutorMultiShardLocks(List *taskList); static bool RequiresConsistentSnapshot(Task *task); @@ -1020,7 +1019,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn ListCell *taskCell = NULL; Task *firstTask = NULL; int connectionFlags = 0; - List *shardIntervalList = NIL; List *affectedTupleCountList = NIL; HTAB *shardConnectionHash = NULL; bool tasksPending = true; @@ -1039,8 +1037,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn "commands"))); } - shardIntervalList = TaskShardIntervalList(taskList); - /* ensure that there are no concurrent modifications on the same shards */ AcquireExecutorMultiShardLocks(taskList); @@ -1064,8 +1060,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn } /* open connection to all 
relevant placements, if not already open */ - shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, - connectionFlags); + shardConnectionHash = OpenTransactionsForAllTasks(taskList, connectionFlags); XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; @@ -1203,29 +1198,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn } -/* - * TaskShardIntervalList returns a list of shard intervals for a given list of - * tasks. - */ -static List * -TaskShardIntervalList(List *taskList) -{ - ListCell *taskCell = NULL; - List *shardIntervalList = NIL; - - foreach(taskCell, taskList) - { - Task *task = (Task *) lfirst(taskCell); - int64 shardId = task->anchorShardId; - ShardInterval *shardInterval = LoadShardInterval(shardId); - - shardIntervalList = lappend(shardIntervalList, shardInterval); - } - - return shardIntervalList; -} - - /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. The function also sets the single-row mode on the diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 08d9bc291..0224759d3 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -210,7 +210,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relation task = CitusMakeNode(Task); task->jobId = jobId; task->taskId = taskId++; - task->taskType = SQL_TASK; + task->taskType = MODIFY_TASK; task->queryString = shardQueryString->data; task->dependedTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index d5962f57b..c6e841dd3 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ 
b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -17,6 +17,7 @@ #include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_router_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/placement_connection.h" #include "distributed/shardinterval_utils.h" @@ -30,25 +31,27 @@ /* - * OpenTransactionsToAllShardPlacements opens connections to all placements - * using the provided shard identifier list and returns it as a shard ID -> - * ShardConnections hash. connectionFlags can be used to specify whether - * the command is FOR_DML or FOR_DDL. + * OpenTransactionsForAllTasks opens a connection for each task, + * taking into account which shards are read and modified by the task + * to select the appropriate connection, or errors out if no appropriate + * connection can be found. The set of connections is returned as an + * anchor shard ID -> ShardConnections hash. 
*/ HTAB * -OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlags) +OpenTransactionsForAllTasks(List *taskList, int connectionFlags) { HTAB *shardConnectionHash = NULL; - ListCell *shardIntervalCell = NULL; + ListCell *taskCell = NULL; List *newConnectionList = NIL; shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext); /* open connections to shards which don't have connections yet */ - foreach(shardIntervalCell, shardIntervalList) + foreach(taskCell, taskList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - uint64 shardId = shardInterval->shardId; + Task *task = (Task *) lfirst(taskCell); + ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT; + uint64 shardId = task->anchorShardId; ShardConnections *shardConnections = NULL; bool shardConnectionsFound = false; List *shardPlacementList = NIL; @@ -69,9 +72,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag UINT64_FORMAT, shardId))); } + if (task->taskType == MODIFY_TASK) + { + accessType = PLACEMENT_ACCESS_DML; + } + else + { + /* can only open connections for DDL and DML commands */ + Assert(task->taskType == DDL_TASK); + + accessType = PLACEMENT_ACCESS_DDL; + } + foreach(placementCell, shardPlacementList) { ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); + ShardPlacementAccess placementModification; + List *placementAccessList = NIL; + List *placementSelectList = NIL; MultiConnection *connection = NULL; WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName, @@ -83,9 +101,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag shardPlacement->nodePort))); } - connection = StartPlacementConnection(connectionFlags, - shardPlacement, - NULL); + /* add placement access for modification */ + placementModification.placement = shardPlacement; + placementModification.accessType = accessType; + + placementAccessList = 
lappend(placementAccessList, &placementModification); + + /* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */ + placementSelectList = BuildPlacementSelectList(shardPlacement->nodeName, + shardPlacement->nodePort, + task->relationShardList); + placementAccessList = list_concat(placementAccessList, placementSelectList); + + /* + * Find a connection that sees preceding writes and cannot self-deadlock, + * or error out if no such connection exists. + */ + connection = StartPlacementListConnection(connectionFlags, + placementAccessList, NULL); ClaimConnectionExclusively(connection); diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h index 9f5baf0c5..1f73c5831 100644 --- a/src/include/distributed/multi_shard_transaction.h +++ b/src/include/distributed/multi_shard_transaction.h @@ -27,8 +27,7 @@ typedef struct ShardConnections } ShardConnections; -extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList, - int connectionFlags); +extern HTAB * OpenTransactionsForAllTasks(List *taskList, int connectionFlags); extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext); extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound);