Use GetPlacementListConnection for multi-shard commands

pull/1455/head
Marco Slot 2017-06-21 13:29:08 +02:00
parent 01c9b1f921
commit 29f21fea59
4 changed files with 48 additions and 44 deletions

View File

@ -86,7 +86,6 @@ static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
ParamListInfo paramListInfo, CitusScanState *scanState);
static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresConsistentSnapshot(Task *task);
@ -1020,7 +1019,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
ListCell *taskCell = NULL;
Task *firstTask = NULL;
int connectionFlags = 0;
List *shardIntervalList = NIL;
List *affectedTupleCountList = NIL;
HTAB *shardConnectionHash = NULL;
bool tasksPending = true;
@ -1039,8 +1037,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
"commands")));
}
shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */
AcquireExecutorMultiShardLocks(taskList);
@ -1064,8 +1060,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
}
/* open connection to all relevant placements, if not already open */
shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
connectionFlags);
shardConnectionHash = OpenTransactionsForAllTasks(taskList, connectionFlags);
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
@ -1203,29 +1198,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
}
/*
* TaskShardIntervalList returns a list of shard intervals for a given list of
* tasks.
*/
static List *
TaskShardIntervalList(List *taskList)
{
ListCell *taskCell = NULL;
List *shardIntervalList = NIL;
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
int64 shardId = task->anchorShardId;
ShardInterval *shardInterval = LoadShardInterval(shardId);
shardIntervalList = lappend(shardIntervalList, shardInterval);
}
return shardIntervalList;
}
/*
* SendQueryInSingleRowMode sends the given query on the connection in an
* asynchronous way. The function also sets the single-row mode on the

View File

@ -210,7 +210,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relation
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = SQL_TASK;
task->taskType = MODIFY_TASK;
task->queryString = shardQueryString->data;
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;

View File

@ -17,6 +17,7 @@
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h"
#include "distributed/shardinterval_utils.h"
@ -30,25 +31,27 @@
/*
* OpenTransactionsToAllShardPlacements opens connections to all placements
* using the provided shard identifier list and returns it as a shard ID ->
* ShardConnections hash. connectionFlags can be used to specify whether
* the command is FOR_DML or FOR_DDL.
* OpenTransactionsForAllTasks opens a connection for each task,
* taking into account which shards are read and modified by the task
* to select the appopriate connection, or error out if no appropriate
* connection can be found. The set of connections is returned as an
* anchor shard ID -> ShardConnections hash.
*/
HTAB *
OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlags)
OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
{
HTAB *shardConnectionHash = NULL;
ListCell *shardIntervalCell = NULL;
ListCell *taskCell = NULL;
List *newConnectionList = NIL;
shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext);
/* open connections to shards which don't have connections yet */
foreach(shardIntervalCell, shardIntervalList)
foreach(taskCell, taskList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
Task *task = (Task *) lfirst(taskCell);
ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT;
uint64 shardId = task->anchorShardId;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false;
List *shardPlacementList = NIL;
@ -69,9 +72,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag
UINT64_FORMAT, shardId)));
}
if (task->taskType == MODIFY_TASK)
{
accessType = PLACEMENT_ACCESS_DML;
}
else
{
/* can only open connections for DDL and DML commands */
Assert(task->taskType == DDL_TASK);
accessType = PLACEMENT_ACCESS_DDL;
}
foreach(placementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
ShardPlacementAccess placementModification;
List *placementAccessList = NIL;
List *placementSelectList = NIL;
MultiConnection *connection = NULL;
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
@ -83,9 +101,24 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, int connectionFlag
shardPlacement->nodePort)));
}
connection = StartPlacementConnection(connectionFlags,
shardPlacement,
NULL);
/* add placement access for modification */
placementModification.placement = shardPlacement;
placementModification.accessType = accessType;
placementAccessList = lappend(placementAccessList, &placementModification);
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
placementSelectList = BuildPlacementSelectList(shardPlacement->nodeName,
shardPlacement->nodePort,
task->relationShardList);
placementAccessList = list_concat(placementAccessList, placementSelectList);
/*
* Find a connection that sees preceding writes and cannot self-deadlock,
* or error out if no such connection exists.
*/
connection = StartPlacementListConnection(connectionFlags,
placementAccessList, NULL);
ClaimConnectionExclusively(connection);

View File

@ -27,8 +27,7 @@ typedef struct ShardConnections
} ShardConnections;
extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList,
int connectionFlags);
extern HTAB * OpenTransactionsForAllTasks(List *taskList, int connectionFlags);
extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
bool *connectionsFound);