diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 11e7ff8c8..70960d92c 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -632,6 +632,7 @@ static void CleanUpSessions(DistributedExecution *execution);
 static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
 static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
+static bool ModifiedTableReplicated(List *taskList);
 static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
 static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
 static bool DistributedExecutionRequiresRollback(List *taskList);
@@ -1556,6 +1557,13 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan)
  *
  * The second case prevents deadlocks due to out-of-order execution.
  *
+ * There are two GUCs that can override the default behaviors.
+ * 'citus.all_modifications_commutative' relaxes locking
+ * that's done for the purpose of keeping replicas consistent.
+ * 'citus.enable_deadlock_prevention' relaxes locking done for
+ * the purpose of avoiding deadlocks between concurrent
+ * multi-shard commands.
+ *
  * We do not take executor shard locks for utility commands such as
  * TRUNCATE because the table locks already prevent concurrent access.
  */
@@ -1577,22 +1585,208 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
 		return;
 	}
 
-	/*
-	 * When executing in sequential mode or only executing a single task, we
-	 * do not need multi-shard locks.
-	 */
-	if (list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList))
+	bool requiresParallelExecutionLocks =
+		!(list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList));
+
+	bool modifiedTableReplicated = ModifiedTableReplicated(taskList);
+	if (!modifiedTableReplicated && !requiresParallelExecutionLocks)
 	{
-		Task *task = NULL;
-		foreach_ptr(task, taskList)
+		/*
+		 * When a distributed query runs on tables with replication
+		 * factor == 1 and the command hits only a single shard, we
+		 * rely on Postgres to handle the serialization of the
+		 * concurrent modifications on the workers.
+		 *
+		 * For reference tables, even if they currently have only a
+		 * single placement (e.g., on a single-node cluster), we acquire
+		 * the distributed execution locks to stay consistent when new
+		 * node(s) are added. So, queries on reference tables do not
+		 * return at this point.
+		 */
+		return;
+	}
+
+	/*
+	 * We first assume that all the remaining modifications are going to
+	 * be serialized. So, start with an ExclusiveLock and lower the lock level
+	 * as much as possible.
+	 */
+	int lockMode = ExclusiveLock;
+
+	/*
+	 * In addition to honouring commutativity rules, we currently only
+	 * allow a single multi-shard command on a shard at a time. Otherwise,
+	 * concurrent multi-shard commands may take row-level locks on the
+	 * shard placements in a different order and create a distributed
+	 * deadlock. This applies even when writes are commutative and/or
+	 * there is no replication. This can be relaxed via
+	 * EnableDeadlockPrevention.
+	 *
+	 * 1. If citus.all_modifications_commutative is set to true, then all locks
+	 * are acquired as RowExclusiveLock.
+	 *
+	 * 2. If citus.all_modifications_commutative is false, then only the shards
+	 * with more than one replica are locked with ExclusiveLock. Otherwise, the
+	 * lock is acquired with ShareUpdateExclusiveLock.
+	 *
+	 * ShareUpdateExclusiveLock conflicts with itself such that only one
+	 * multi-shard modification at a time is allowed on a shard. It also conflicts
+	 * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
+	 * in the same order on all placements. It does not conflict with
+	 * RowExclusiveLock, which is normally obtained by single-shard, commutative
+	 * writes.
+	 */
+	if (!modifiedTableReplicated && requiresParallelExecutionLocks)
+	{
+		/*
+		 * When there is no replication, we only need to prevent
+		 * concurrent multi-shard commands on the same shards. This is
+		 * because concurrent, parallel commands may modify the same
+		 * set of shards, but in different orders. The order of the
+		 * accesses might trigger distributed deadlocks that cannot
+		 * happen on non-distributed systems such as regular Postgres.
+		 *
+		 * As an example, assume that we have two queries: query-1 and query-2.
+		 * Both queries access shard-1 and shard-2. If query-1 first accesses
+		 * shard-1 and then shard-2, while query-2 accesses shard-2 and then
+		 * shard-1, the two commands might block each other if they modify the
+		 * same rows (i.e., cause a distributed deadlock).
+		 *
+		 * Here, ShareUpdateExclusiveLock has the desired effect, since
+		 * it conflicts with itself and ExclusiveLock (taken by non-commutative
+		 * writes).
+		 *
+		 * However, some users find this too restrictive, so we allow them to
+		 * reduce to a RowExclusiveLock by disabling
+		 * citus.enable_deadlock_prevention, which lets multi-shard
+		 * modifications run in parallel as long as they all disable the GUC.
+		 */
+		lockMode =
+			EnableDeadlockPrevention ? ShareUpdateExclusiveLock : RowExclusiveLock;
+
+		if (!IsCoordinator())
 		{
-			AcquireExecutorShardLocks(task, modLevel);
+			/*
+			 * We also skip taking a heavy-weight lock when running multi-shard
+			 * commands from workers, since we currently do not prevent
+			 * concurrency across workers anyway.
+			 */
+			lockMode = RowExclusiveLock;
 		}
 	}
-	else if (list_length(taskList) > 1)
+	else if (modifiedTableReplicated)
 	{
-		AcquireExecutorMultiShardLocks(taskList);
+		/*
+		 * When we are executing distributed queries on replicated tables, our
+		 * default behaviour is to prevent any concurrency. This holds whether
+		 * or not the execution is parallel.
+		 *
+		 * The reason is that we cannot control the order in which two
+		 * distributed queries access the placements of the same shards. The
+		 * order of the accesses might cause the placements of the same shard
+		 * to diverge. This cannot happen on non-distributed systems such as
+		 * regular Postgres.
+		 *
+		 * As an example, assume that we have two queries: query-1 and query-2.
+		 * Both queries only access the placements of shard-1, say p-1 and p-2.
+		 *
+		 * And, assume that these queries are non-commutative, such as:
+		 *  query-1: UPDATE table SET b = 1 WHERE key = 1;
+		 *  query-2: UPDATE table SET b = 2 WHERE key = 1;
+		 *
+		 * If query-1 accesses p-1 and then p-2, while query-2 accesses p-2 and
+		 * then p-1, the two commands would leave p-1 and p-2 diverged (i.e.,
+		 * the values of the column "b" would differ).
+		 *
+		 * The only exception to this rule is single-shard commutative
+		 * modifications, such as INSERTs. In that case, we can allow
+		 * concurrency among such backends, hence lowering the lock level
+		 * to RowExclusiveLock.
+		 */
+		if (!requiresParallelExecutionLocks && modLevel < ROW_MODIFY_NONCOMMUTATIVE)
+		{
+			lockMode = RowExclusiveLock;
+		}
 	}
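The branches above lean on how PostgreSQL's heavyweight lock modes conflict with one another. As a quick reference, the following standalone sketch (an editorial illustration, not Citus or PostgreSQL source; the enum and function names are made up) encodes the conflict relation between the three lock levels used by these executor shard locks.

#include <stdbool.h>
#include <stdio.h>

/* The three heavyweight lock levels used by the executor shard locks above. */
typedef enum
{
	ROW_EXCLUSIVE = 0,
	SHARE_UPDATE_EXCLUSIVE = 1,
	EXCLUSIVE = 2
} ShardLockLevel;

/*
 * Conflict relation between those levels, per PostgreSQL's lock conflict
 * table: RowExclusiveLock conflicts only with ExclusiveLock here,
 * ShareUpdateExclusiveLock conflicts with itself and ExclusiveLock, and
 * ExclusiveLock conflicts with all three.
 */
static bool
ShardLocksConflict(ShardLockLevel held, ShardLockLevel requested)
{
	static const bool conflicts[3][3] = {
		/* held \ requested: ROW_EXCL, SHARE_UPD_EXCL, EXCL */
		{ false, false, true },   /* ROW_EXCLUSIVE */
		{ false, true, true },    /* SHARE_UPDATE_EXCLUSIVE */
		{ true, true, true }      /* EXCLUSIVE */
	};

	return conflicts[held][requested];
}

int
main(void)
{
	/* two multi-shard writers serialize: ShareUpdateExclusiveLock self-conflicts */
	printf("%d\n", ShardLocksConflict(SHARE_UPDATE_EXCLUSIVE, SHARE_UPDATE_EXCLUSIVE));

	/* a commutative single-shard write keeps running next to a multi-shard one */
	printf("%d\n", ShardLocksConflict(ROW_EXCLUSIVE, SHARE_UPDATE_EXCLUSIVE));

	return 0;
}

In other words, two multi-shard writers serialize on a shard because ShareUpdateExclusiveLock self-conflicts, while commutative single-shard writes holding RowExclusiveLock keep running alongside them.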
+
+	if (AllModificationsCommutative)
+	{
+		/*
+		 * The mapping is overridden when all_modifications_commutative is set
+		 * to true. In that case, all modifications are treated as commutative,
+		 * which can be used to communicate that the application is only
+		 * generating commutative UPDATE/DELETE/UPSERT commands and exclusive
+		 * locks are unnecessary. This is irrespective of single-shard/multi-shard
+		 * or replicated tables.
+		 */
+		lockMode = RowExclusiveLock;
+	}
+
+	/* now, iterate over the tasks and acquire the executor locks on the shards */
+	Task *task = NULL;
+	foreach_ptr(task, taskList)
+	{
+		/*
+		 * If we are dealing with a partition, we also take locks on the parent
+		 * table to prevent deadlocks from concurrent operations on a partition
+		 * and its parent.
+		 */
+		LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
+
+		ShardInterval *anchorShardInterval = LoadShardInterval(task->anchorShardId);
+		SerializeNonCommutativeWrites(list_make1(anchorShardInterval), lockMode);
+
+		/* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */
+		AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
+
+		/*
+		 * If the task has a subselect, then we may need to lock the shards from which
+		 * the query selects as well to prevent the subselects from seeing different
+		 * results on different replicas.
+		 */
+		if (RequiresConsistentSnapshot(task))
+		{
+			/*
+			 * ExclusiveLock conflicts with all lock types used by modifications
+			 * and therefore prevents other modifications from running
+			 * concurrently.
+			 */
+
+			LockRelationShardResources(task->relationShardList, ExclusiveLock);
+		}
+	}
+}
+
+
+/*
+ * ModifiedTableReplicated iterates over the task list and returns true
+ * if any task's anchor shard belongs to a replicated table. We qualify
+ * as replicated any reference table or any distributed table with
+ * replication factor > 1.
+ */
+static bool
+ModifiedTableReplicated(List *taskList)
+{
+	Task *task = NULL;
+	foreach_ptr(task, taskList)
+	{
+		int64 shardId = task->anchorShardId;
+
+		if (shardId == INVALID_SHARD_ID)
+		{
+			continue;
+		}
+
+		if (ReferenceTableShardId(shardId))
+		{
+			return true;
+		}
+
+		Oid relationId = RelationIdForShard(shardId);
+		if (!SingleReplicatedTable(relationId))
+		{
+			return true;
+		}
+	}
+
+	return false;
 }
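Putting the pieces of the new AcquireExecutorShardLocksForExecution together, here is a compact standalone summary of the resulting lock-mode decision ladder (an editorial sketch, not Citus code). The boolean parameters mirror the variables in the hunk above; commutativeSingleShardModify stands in for "!requiresParallelExecutionLocks && modLevel < ROW_MODIFY_NONCOMMUTATIVE", and the function name is invented for the example.

#include <stdbool.h>
#include <stdio.h>

static const char *
ExecutorShardLockMode(bool modifiedTableReplicated,
					  bool requiresParallelExecutionLocks,
					  bool allModificationsCommutative,
					  bool enableDeadlockPrevention,
					  bool isCoordinator,
					  bool commutativeSingleShardModify)
{
	if (!modifiedTableReplicated && !requiresParallelExecutionLocks)
	{
		/* single shard, no replicas: let the worker's Postgres serialize */
		return "NoLock";
	}

	/* start pessimistic and relax where the comments above allow it */
	const char *lockMode = "ExclusiveLock";

	if (!modifiedTableReplicated && requiresParallelExecutionLocks)
	{
		/* only multi-shard deadlock prevention is needed */
		lockMode = enableDeadlockPrevention ? "ShareUpdateExclusiveLock"
											: "RowExclusiveLock";

		if (!isCoordinator)
		{
			/* concurrency across workers is not prevented anyway */
			lockMode = "RowExclusiveLock";
		}
	}
	else if (commutativeSingleShardModify)
	{
		/* e.g. a single-shard INSERT into a replicated table */
		lockMode = "RowExclusiveLock";
	}

	if (allModificationsCommutative)
	{
		/* the citus.all_modifications_commutative override */
		lockMode = "RowExclusiveLock";
	}

	return lockMode;
}

int
main(void)
{
	/* replicated table, parallel non-commutative writes, default GUCs */
	printf("%s\n", ExecutorShardLockMode(true, true, false, true, true, false));
	return 0;
}

The example in main() prints "ExclusiveLock": with replication and parallel non-commutative writes, nothing relaxes the pessimistic default.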
diff --git a/src/backend/distributed/executor/distributed_execution_locks.c b/src/backend/distributed/executor/distributed_execution_locks.c
index bf581f26a..035c6e511 100644
--- a/src/backend/distributed/executor/distributed_execution_locks.c
+++ b/src/backend/distributed/executor/distributed_execution_locks.c
@@ -19,161 +19,12 @@
 #include "distributed/transaction_management.h"
 
-static bool RequiresConsistentSnapshot(Task *task);
-static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel);
-static void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
-
-
-/*
- * AcquireExecutorShardLocks acquires locks on shards for the given task if
- * necessary to avoid divergence between multiple replicas of the same shard.
- * No lock is obtained when there is only one replica.
- *
- * The function determines the appropriate lock mode based on the commutativity
- * rule of the command. In each case, it uses a lock mode that enforces the
- * commutativity rule.
- *
- * The mapping is overridden when all_modifications_commutative is set to true.
- * In that case, all modifications are treated as commutative, which can be used
- * to communicate that the application is only generating commutative
- * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
- */
-void
-AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel)
-{
-	AcquireExecutorShardLockForRowModify(task, modLevel);
-	AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
-
-	/*
-	 * If the task has a subselect, then we may need to lock the shards from which
-	 * the query selects as well to prevent the subselects from seeing different
-	 * results on different replicas. In particular this prevents INSERT.. SELECT
-	 * commands from having a different effect on different placements.
-	 */
-	if (RequiresConsistentSnapshot(task))
-	{
-		/*
-		 * ExclusiveLock conflicts with all lock types used by modifications
-		 * and therefore prevents other modifications from running
-		 * concurrently.
-		 */
-
-		LockRelationShardResources(task->relationShardList, ExclusiveLock);
-	}
-}
-
-
-/*
- * AcquireExecutorMultiShardLocks acquires shard locks needed for execution
- * of writes on multiple shards. In addition to honouring commutativity
- * rules, we currently only allow a single multi-shard command on a shard at
- * a time. Otherwise, concurrent multi-shard commands may take row-level
- * locks on the shard placements in a different order and create a distributed
- * deadlock. This applies even when writes are commutative and/or there is
- * no replication.
- *
- * 1. If citus.all_modifications_commutative is set to true, then all locks
- * are acquired as ShareUpdateExclusiveLock.
- *
- * 2. If citus.all_modifications_commutative is false, then only the shards
- * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
- * lock is acquired with ShareUpdateExclusiveLock.
- *
- * ShareUpdateExclusiveLock conflicts with itself such that only one
- * multi-shard modification at a time is allowed on a shard. It also conflicts
- * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
- * in the same order on all placements. It does not conflict with
- * RowExclusiveLock, which is normally obtained by single-shard, commutative
- * writes.
- */
-void
-AcquireExecutorMultiShardLocks(List *taskList)
-{
-	Task *task = NULL;
-	foreach_ptr(task, taskList)
-	{
-		LOCKMODE lockMode = NoLock;
-
-		if (task->anchorShardId == INVALID_SHARD_ID)
-		{
-			/* no shard locks to take if the task is not anchored to a shard */
-			continue;
-		}
-
-		if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
-		{
-			/*
-			 * When all writes are commutative then we only need to prevent multi-shard
-			 * commands from running concurrently with each other and with commands
-			 * that are explicitly non-commutative. When there is no replication then
-			 * we only need to prevent concurrent multi-shard commands.
-			 *
-			 * In either case, ShareUpdateExclusive has the desired effect, since
-			 * it conflicts with itself and ExclusiveLock (taken by non-commutative
-			 * writes).
-			 *
-			 * However, some users find this too restrictive, so we allow them to
-			 * reduce to a RowExclusiveLock when citus.enable_deadlock_prevention
-			 * is enabled, which lets multi-shard modifications run in parallel as
-			 * long as they all disable the GUC.
-			 *
-			 * We also skip taking a heavy-weight lock when running a multi-shard
-			 * commands from workers, since we cannot prevent concurrency across
-			 * workers anyway.
-			 */
-
-			if (EnableDeadlockPrevention && IsCoordinator())
-			{
-				lockMode = ShareUpdateExclusiveLock;
-			}
-			else
-			{
-				lockMode = RowExclusiveLock;
-			}
-		}
-		else
-		{
-			/*
-			 * When there is replication, prevent all concurrent writes to the same
-			 * shards to ensure the writes are ordered.
-			 */
-
-			lockMode = ExclusiveLock;
-		}
-
-		/*
-		 * If we are dealing with a partition we are also taking locks on parent table
-		 * to prevent deadlocks on concurrent operations on a partition and its parent.
-		 */
-		LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
-		LockShardResource(task->anchorShardId, lockMode);
-
-		/*
-		 * If the task has a subselect, then we may need to lock the shards from which
-		 * the query selects as well to prevent the subselects from seeing different
-		 * results on different replicas.
-		 */
-
-		if (RequiresConsistentSnapshot(task))
-		{
-			/*
-			 * ExclusiveLock conflicts with all lock types used by modifications
-			 * and therefore prevents other modifications from running
-			 * concurrently.
-			 */
-
-			LockRelationShardResources(task->relationShardList, ExclusiveLock);
-		}
-	}
-}
-
-
 /*
  * RequiresConsistentSnapshot returns true if the given task need to take
  * the necessary locks to ensure that a subquery in the modify query
  * returns the same output for all task placements.
  */
-static bool
+bool
 RequiresConsistentSnapshot(Task *task)
 {
 	bool requiresIsolation = false;
@@ -246,113 +97,7 @@ AcquireMetadataLocks(List *taskList)
 }
 
 
-static void
-AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
-{
-	LOCKMODE lockMode = NoLock;
-	int64 shardId = task->anchorShardId;
-
-	if (shardId == INVALID_SHARD_ID)
-	{
-		return;
-	}
-
-	if (modLevel <= ROW_MODIFY_READONLY)
-	{
-		/*
-		 * The executor shard lock is used to maintain consistency between
-		 * replicas and therefore no lock is required for read-only queries
-		 * or in general when there is only one replica.
-		 */
-
-		lockMode = NoLock;
-	}
-	else if (list_length(task->taskPlacementList) == 1)
-	{
-		if (task->replicationModel == REPLICATION_MODEL_2PC)
-		{
-			/*
-			 * While we don't need a lock to ensure writes are applied in
-			 * a consistent order when there is a single replica. We also use
-			 * shard resource locks as a crude implementation of SELECT..
-			 * FOR UPDATE on reference tables, so we should always take
-			 * a lock that conflicts with the FOR UPDATE/SHARE locks.
-			 */
-			lockMode = RowExclusiveLock;
-		}
-		else
-		{
-			/*
-			 * When there is no replication, the worker itself can decide on
-			 * on the order in which writes are applied.
-			 */
-			lockMode = NoLock;
-		}
-	}
-	else if (AllModificationsCommutative)
-	{
-		/*
-		 * Bypass commutativity checks when citus.all_modifications_commutative
-		 * is enabled.
-		 *
-		 * A RowExclusiveLock does not conflict with itself and therefore allows
-		 * multiple commutative commands to proceed concurrently. It does
-		 * conflict with ExclusiveLock, which may still be obtained by another
-		 * session that executes an UPDATE/DELETE/UPSERT command with
-		 * citus.all_modifications_commutative disabled.
-		 */
-
-		lockMode = RowExclusiveLock;
-	}
-	else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
-	{
-		/*
-		 * An INSERT commutes with other INSERT commands, since performing them
-		 * out-of-order only affects the table order on disk, but not the
-		 * contents.
-		 *
-		 * When a unique constraint exists, INSERTs are not strictly commutative,
-		 * but whichever INSERT comes last will error out and thus has no effect.
-		 * INSERT is not commutative with UPDATE/DELETE/UPSERT, since the
-		 * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
-		 * order.
-		 *
-		 * A RowExclusiveLock does not conflict with itself and therefore allows
-		 * multiple INSERT commands to proceed concurrently. It conflicts with
-		 * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
-		 * not run concurrently with INSERT.
-		 */
-
-		lockMode = RowExclusiveLock;
-	}
-	else
-	{
-		/*
-		 * UPDATE/DELETE/UPSERT commands do not commute with other modifications
-		 * since the rows modified by one command may be affected by the outcome
-		 * of another command.
-		 *
-		 * We need to handle upsert before INSERT, because PostgreSQL models
-		 * upsert commands as INSERT with an ON CONFLICT section.
-		 *
-		 * ExclusiveLock conflicts with all lock types used by modifications
-		 * and therefore prevents other modifications from running
-		 * concurrently.
-		 */
-
-		lockMode = ExclusiveLock;
-	}
-
-	if (lockMode != NoLock)
-	{
-		ShardInterval *shardInterval = LoadShardInterval(shardId);
-
-		SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
-	}
-}
-
-
-static void
+void
 AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList)
 {
 	LOCKMODE rowLockMode = NoLock;
diff --git a/src/include/distributed/distributed_execution_locks.h b/src/include/distributed/distributed_execution_locks.h
index eaa51676a..b274cd459 100644
--- a/src/include/distributed/distributed_execution_locks.h
+++ b/src/include/distributed/distributed_execution_locks.h
@@ -16,8 +16,8 @@
 #include "storage/lockdefs.h"
 #include "distributed/multi_physical_planner.h"
 
-extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
-extern void AcquireExecutorMultiShardLocks(List *taskList);
+extern void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
+extern bool RequiresConsistentSnapshot(Task *task);
 extern void AcquireMetadataLocks(List *taskList);
 extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
 extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
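The replica-divergence argument recurs throughout this change: both the removed per-task locks and the new unified function exist to keep placements of the same shard from applying non-commutative writes in different orders. As a closing illustration (an editorial sketch, not Citus code; names are invented), the following standalone program replays the query-1/query-2 example from the comments and shows two placements ending up with different values when nothing serializes the writes.

#include <stdio.h>

/* a placement is reduced to the single column "b" from the example above */
typedef struct
{
	int b;
} Placement;

/* query-1: UPDATE table SET b = 1 WHERE key = 1; */
static void
ApplyQuery1(Placement *placement)
{
	placement->b = 1;
}

/* query-2: UPDATE table SET b = 2 WHERE key = 1; */
static void
ApplyQuery2(Placement *placement)
{
	placement->b = 2;
}

int
main(void)
{
	Placement p1 = { 0 };
	Placement p2 = { 0 };

	/* without serialization, p-1 happens to see query-1 first ... */
	ApplyQuery1(&p1);
	ApplyQuery2(&p1);

	/* ... while p-2 sees the two updates in the opposite order */
	ApplyQuery2(&p2);
	ApplyQuery1(&p2);

	/* the placements have diverged: b = 2 on p-1 but b = 1 on p-2 */
	printf("p-1.b = %d, p-2.b = %d\n", p1.b, p2.b);

	return 0;
}

With ExclusiveLock serializing the two updates on every placement, p-1 and p-2 would instead apply them in the same order and stay identical.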