Some cleanup

pull/2883/head
Hadi Moshayedi 2019-08-12 15:38:52 -07:00
parent 03ef456c50
commit 009d8b7401
3 changed files with 42 additions and 30 deletions

View File

@ -308,7 +308,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList,
chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort, chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort,
userName, NULL); userName, NULL);
if (flags & CONNECTION_PER_PLACEMENT && if ((flags & CONNECTION_PER_PLACEMENT) &&
ConnectionAccessedDifferentPlacement(chosenConnection, placement)) ConnectionAccessedDifferentPlacement(chosenConnection, placement))
{ {
/* /*

View File

@ -101,7 +101,7 @@ static List * GetModifyConnections(Task *task, bool markCritical);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults, static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
ParamListInfo paramListInfo, CitusScanState *scanState); ParamListInfo paramListInfo, CitusScanState *scanState);
static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel); static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel);
static void AcquireExecutorShardLocksForRelationRowLockList(Task *task); static void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
static bool RequiresConsistentSnapshot(Task *task); static bool RequiresConsistentSnapshot(Task *task);
static void RouterMultiModifyExecScan(CustomScanState *node); static void RouterMultiModifyExecScan(CustomScanState *node);
static void RouterSequentialModifyExecScan(CustomScanState *node); static void RouterSequentialModifyExecScan(CustomScanState *node);
@ -247,8 +247,16 @@ AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
static void static void
AcquireExecutorShardLocksForRelationRowLockList(Task *task) AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList)
{ {
ListCell *relationRowLockCell = NULL;
LOCKMODE rowLockMode = NoLock;
if (relationRowLockList == NIL)
{
return;
}
/* /*
* If lock clause exists and it effects any reference table, we need to get * If lock clause exists and it effects any reference table, we need to get
* lock on shard resource. Type of lock is determined by the type of row lock * lock on shard resource. Type of lock is determined by the type of row lock
@ -266,14 +274,9 @@ AcquireExecutorShardLocksForRelationRowLockList(Task *task)
* with each other but conflicts with modify commands, we get ShareLock for * with each other but conflicts with modify commands, we get ShareLock for
* them. * them.
*/ */
if (task->relationRowLockList != NIL) foreach(relationRowLockCell, relationRowLockList)
{ {
ListCell *rtiLockCell = NULL; RelationRowLock *relationRowLock = lfirst(relationRowLockCell);
LOCKMODE rowLockMode = NoLock;
foreach(rtiLockCell, task->relationRowLockList)
{
RelationRowLock *relationRowLock = (RelationRowLock *) lfirst(rtiLockCell);
LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength; LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength;
Oid relationId = relationRowLock->relationId; Oid relationId = relationRowLock->relationId;
@ -285,8 +288,8 @@ AcquireExecutorShardLocksForRelationRowLockList(Task *task)
{ {
rowLockMode = ShareLock; rowLockMode = ShareLock;
} }
else if (rowLockStrength == LCS_FORNOKEYUPDATE || rowLockStrength == else if (rowLockStrength == LCS_FORNOKEYUPDATE ||
LCS_FORUPDATE) rowLockStrength == LCS_FORUPDATE)
{ {
rowLockMode = ExclusiveLock; rowLockMode = ExclusiveLock;
} }
@ -294,7 +297,6 @@ AcquireExecutorShardLocksForRelationRowLockList(Task *task)
SerializeNonCommutativeWrites(shardIntervalList, rowLockMode); SerializeNonCommutativeWrites(shardIntervalList, rowLockMode);
} }
} }
}
} }
@ -316,7 +318,7 @@ void
AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel) AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel)
{ {
AcquireExecutorShardLockForRowModify(task, modLevel); AcquireExecutorShardLockForRowModify(task, modLevel);
AcquireExecutorShardLocksForRelationRowLockList(task); AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
/* /*
* If the task has a subselect, then we may need to lock the shards from which * If the task has a subselect, then we may need to lock the shards from which
@ -920,8 +922,6 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
{ {
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId, placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
relationShardList); relationShardList);
Assert(list_length(placementAccessList) == list_length(relationShardList));
} }
else else
{ {
@ -1316,9 +1316,6 @@ GetModifyConnections(Task *task, bool markCritical)
multiConnectionList = lappend(multiConnectionList, multiConnection); multiConnectionList = lappend(multiConnectionList, multiConnection);
} }
/* then finish in parallel */
FinishConnectionListEstablishment(multiConnectionList);
/* and start transactions if applicable */ /* and start transactions if applicable */
RemoteTransactionsBeginIfNecessary(multiConnectionList); RemoteTransactionsBeginIfNecessary(multiConnectionList);

View File

@ -194,8 +194,23 @@ typedef struct Task
TaskExecution *taskExecution; /* used by task tracker executor */ TaskExecution *taskExecution; /* used by task tracker executor */
char replicationModel; /* only applies to modify tasks */ char replicationModel; /* only applies to modify tasks */
/*
* List of struct RelationRowLock. This contains an entry for each
* query identified as a FOR [KEY] UPDATE/SHARE target. Citus
* converts PostgreSQL's RowMarkClause to RelationRowLock in
* RowLocksOnRelations().
*/
List *relationRowLockList; List *relationRowLockList;
bool modifyWithSubquery; bool modifyWithSubquery;
/*
* List of struct RelationShard. This represents the mapping of relations
* in the RTE list to shard IDs for a task for the purposes of:
* - Locking: See AcquireExecutorShardLocks()
* - Deparsing: See UpdateRelationToShardNames()
* - Relation Access Tracking
*/
List *relationShardList; List *relationShardList;
List *rowValuesLists; /* rows to use when building multi-row INSERT */ List *rowValuesLists; /* rows to use when building multi-row INSERT */