Use GetPlacementListConnection for router DML

pull/1455/head
Marco Slot 2017-06-21 13:35:37 +02:00
parent 29f21fea59
commit 710fe8666b
1 changed files with 20 additions and 10 deletions

View File

@ -80,7 +80,7 @@ static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool expectResults); bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * GetModifyConnections(List *taskPlacementList, bool markCritical, static List * GetModifyConnections(Task *task, bool markCritical,
bool startedInTransaction); bool startedInTransaction);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults); bool isModificationQuery, bool expectResults);
@ -781,7 +781,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
* establish the connection, mark as critical (when modifying reference * establish the connection, mark as critical (when modifying reference
* table) and start a transaction (when in a transaction). * table) and start a transaction (when in a transaction).
*/ */
connectionList = GetModifyConnections(taskPlacementList, connectionList = GetModifyConnections(task,
taskRequiresTwoPhaseCommit, taskRequiresTwoPhaseCommit,
startedInTransaction); startedInTransaction);
@ -884,10 +884,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
* transaction in progress. * transaction in progress.
*/ */
static List * static List *
GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions) GetModifyConnections(Task *task, bool markCritical, bool noNewTransactions)
{ {
List *taskPlacementList = task->taskPlacementList;
ListCell *taskPlacementCell = NULL; ListCell *taskPlacementCell = NULL;
List *multiConnectionList = NIL; List *multiConnectionList = NIL;
List *relationShardList = task->relationShardList;
/* first initiate connection establishment for all necessary connections */ /* first initiate connection establishment for all necessary connections */
foreach(taskPlacementCell, taskPlacementList) foreach(taskPlacementCell, taskPlacementList)
@ -895,14 +897,22 @@ GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTrans
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
int connectionFlags = SESSION_LIFESPAN | FOR_DML; int connectionFlags = SESSION_LIFESPAN | FOR_DML;
MultiConnection *multiConnection = NULL; MultiConnection *multiConnection = NULL;
List *placementAccessList = NIL;
ShardPlacementAccess *placementModification = NULL;
/* /* create placement accesses for placements that appear in a subselect */
* FIXME: It's not actually correct to use only one shard placement placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName,
* here for router queries involving multiple relations. We should taskPlacement->nodePort,
* check that this connection is the only modifying one associated relationShardList);
* with all the involved shards.
*/ /* create placement access for the placement that we're modifying */
multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL); placementModification = CreatePlacementAccess(taskPlacement,
PLACEMENT_ACCESS_DML);
placementAccessList = lappend(placementAccessList, placementModification);
/* get an appropriate connection for the DML statement */
multiConnection = GetPlacementListConnection(connectionFlags, placementAccessList,
NULL);
/* /*
* If already in a transaction, disallow expanding set of remote * If already in a transaction, disallow expanding set of remote