diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 8e946403b..8d39afda0 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -308,7 +308,7 @@ StartPlacementListConnection(uint32 flags, List *placementAccessList, chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort, userName, NULL); - if (flags & CONNECTION_PER_PLACEMENT && + if ((flags & CONNECTION_PER_PLACEMENT) && ConnectionAccessedDifferentPlacement(chosenConnection, placement)) { /* diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index debc03ba3..8a2d86716 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -101,7 +101,7 @@ static List * GetModifyConnections(Task *task, bool markCritical); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, CitusScanState *scanState); static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel); -static void AcquireExecutorShardLocksForRelationRowLockList(Task *task); +static void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList); static bool RequiresConsistentSnapshot(Task *task); static void RouterMultiModifyExecScan(CustomScanState *node); static void RouterSequentialModifyExecScan(CustomScanState *node); @@ -247,8 +247,16 @@ AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel) static void -AcquireExecutorShardLocksForRelationRowLockList(Task *task) +AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList) { + ListCell *relationRowLockCell = NULL; + LOCKMODE rowLockMode = NoLock; + + if (relationRowLockList == NIL) + { + return; + } + /* * If lock clause exists and it effects any reference table, we need to get * lock on shard resource. Type of lock is determined by the type of row lock @@ -266,33 +274,27 @@ AcquireExecutorShardLocksForRelationRowLockList(Task *task) * with each other but conflicts with modify commands, we get ShareLock for * them. */ - if (task->relationRowLockList != NIL) + foreach(relationRowLockCell, relationRowLockList) { - ListCell *rtiLockCell = NULL; - LOCKMODE rowLockMode = NoLock; + RelationRowLock *relationRowLock = lfirst(relationRowLockCell); + LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength; + Oid relationId = relationRowLock->relationId; - foreach(rtiLockCell, task->relationRowLockList) + if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) { - RelationRowLock *relationRowLock = (RelationRowLock *) lfirst(rtiLockCell); - LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength; - Oid relationId = relationRowLock->relationId; + List *shardIntervalList = LoadShardIntervalList(relationId); - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (rowLockStrength == LCS_FORKEYSHARE || rowLockStrength == LCS_FORSHARE) { - List *shardIntervalList = LoadShardIntervalList(relationId); - - if (rowLockStrength == LCS_FORKEYSHARE || rowLockStrength == LCS_FORSHARE) - { - rowLockMode = ShareLock; - } - else if (rowLockStrength == LCS_FORNOKEYUPDATE || rowLockStrength == - LCS_FORUPDATE) - { - rowLockMode = ExclusiveLock; - } - - SerializeNonCommutativeWrites(shardIntervalList, rowLockMode); + rowLockMode = ShareLock; } + else if (rowLockStrength == LCS_FORNOKEYUPDATE || + rowLockStrength == LCS_FORUPDATE) + { + rowLockMode = ExclusiveLock; + } + + SerializeNonCommutativeWrites(shardIntervalList, rowLockMode); } } } @@ -316,7 +318,7 @@ void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel) { AcquireExecutorShardLockForRowModify(task, modLevel); - AcquireExecutorShardLocksForRelationRowLockList(task); + AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList); /* * If the task has a subselect, then we may need to lock the shards from which @@ -920,8 +922,6 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) { placementAccessList = BuildPlacementSelectList(taskPlacement->groupId, relationShardList); - - Assert(list_length(placementAccessList) == list_length(relationShardList)); } else { @@ -1316,9 +1316,6 @@ GetModifyConnections(Task *task, bool markCritical) multiConnectionList = lappend(multiConnectionList, multiConnection); } - /* then finish in parallel */ - FinishConnectionListEstablishment(multiConnectionList); - /* and start transactions if applicable */ RemoteTransactionsBeginIfNecessary(multiConnectionList); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index cabb46df0..35685f3bb 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -194,8 +194,23 @@ typedef struct Task TaskExecution *taskExecution; /* used by task tracker executor */ char replicationModel; /* only applies to modify tasks */ + /* + * List of struct RelationRowLock. This contains an entry for each + * query identified as a FOR [KEY] UPDATE/SHARE target. Citus + * converts PostgreSQL's RowMarkClause to RelationRowLock in + * RowLocksOnRelations(). + */ List *relationRowLockList; + bool modifyWithSubquery; + + /* + * List of struct RelationShard. This represents the mapping of relations + * in the RTE list to shard IDs for a task for the purposes of: + * - Locking: See AcquireExecutorShardLocks() + * - Deparsing: See UpdateRelationToShardNames() + * - Relation Access Tracking + */ List *relationShardList; List *rowValuesLists; /* rows to use when building multi-row INSERT */