From 0c79558cc957020415c0abf1992d73047d5627ad Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Thu, 21 Oct 2021 16:16:20 +0200
Subject: [PATCH] Unify distributed execution logic for single replicated
 tables

Citus does not acquire any executor locks when shard replication factor == 1.
With this commit, we unify this decision and exit early.
---
 .../distributed/executor/adaptive_executor.c  | 134 ++++++++++++-
 .../executor/distributed_execution_locks.c    | 188 +-----------------
 .../distributed/distributed_execution_locks.h |   2 +
 3 files changed, 130 insertions(+), 194 deletions(-)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 11e7ff8c8..bf0770d21 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -632,6 +632,7 @@ static void CleanUpSessions(DistributedExecution *execution);
 
 static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
 static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
+static bool AnyAnchorTableIsReplicated(List *taskList);
 static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
 static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
 static bool DistributedExecutionRequiresRollback(List *taskList);
@@ -1577,22 +1578,135 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
 		return;
 	}
 
-	/*
-	 * When executing in sequential mode or only executing a single task, we
-	 * do not need multi-shard locks.
-	 */
-	if (list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList))
+	bool parallelExecutionNotPossible =
+		list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList);
+
+	bool anyAnchorTableIsReplicated = AnyAnchorTableIsReplicated(taskList);
+	if (!anyAnchorTableIsReplicated && parallelExecutionNotPossible)
 	{
-		Task *task = NULL;
-		foreach_ptr(task, taskList)
+		/*
+		 * When a distributed query runs on tables with replication
+		 * factor == 1, we rely on Postgres to handle the serialization
+		 * of the concurrent operations on the workers.
+		 *
+		 * For reference tables, even when they have a single placement
+		 * (e.g., a single-node cluster), we still treat them as replicated
+		 * and acquire the distributed execution locks, so that behavior
+		 * stays consistent when new node(s) are added.
+		 */
+		return;
+	}
+
+	/*
+	 * We first assume that all the remaining modifications are going to
+	 * be serialized. So, start with an ExclusiveLock and lower the lock level
+	 * as much as possible.
+	 */
+	LOCKMODE lockMode = ExclusiveLock;
+
+	if (!anyAnchorTableIsReplicated && IsCoordinator())
+	{
+		/*
+		 * When all writes are commutative then we only need to prevent multi-shard
+		 * commands from running concurrently with each other and with commands
+		 * that are explicitly non-commutative. When there is no replication then
+		 * we only need to prevent concurrent multi-shard commands.
+		 *
+		 * In either case, ShareUpdateExclusive has the desired effect, since
+		 * it conflicts with itself and ExclusiveLock (taken by non-commutative
+		 * writes).
+		 *
+		 * However, some users find this too restrictive, so we allow them to
+		 * reduce to a RowExclusiveLock when citus.enable_deadlock_prevention
+		 * is disabled, which lets multi-shard modifications run in parallel as
+		 * long as they all disable the GUC.
+		 *
+		 * We also skip taking a heavy-weight lock when running multi-shard
+		 * commands from workers, since we currently do not prevent concurrency
+		 * across workers anyway.
+		 */
+		lockMode =
+			EnableDeadlockPrevention ? ShareUpdateExclusiveLock : RowExclusiveLock;
+	}
+
+	if (AllModificationsCommutative ||
+		(parallelExecutionNotPossible && modLevel < ROW_MODIFY_NONCOMMUTATIVE))
+	{
+		/*
+		 * If the user explicitly allows it via the GUC, or the commands are
+		 * single-shard commutative commands (e.g., INSERT), we lower the
+		 * lock level to RowExclusiveLock to allow concurrency among the
+		 * tasks.
+		 */
+		lockMode = RowExclusiveLock;
+	}
+
+	/* now, iterate over the tasks and acquire the executor locks on the shards */
+	Task *task = NULL;
+	foreach_ptr(task, taskList)
+	{
+		/*
+		 * If we are dealing with a partition, we also take locks on the parent
+		 * table to prevent deadlocks between concurrent operations on a
+		 * partition and its parent.
+		 */
+		LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
+
+		ShardInterval *anchorShardInterval = LoadShardInterval(task->anchorShardId);
+		SerializeNonCommutativeWrites(list_make1(anchorShardInterval), lockMode);
+
+		/* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */
+		AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
+
+		/*
+		 * If the task has a subselect, then we may need to lock the shards from which
+		 * the query selects as well to prevent the subselects from seeing different
+		 * results on different replicas.
+		 */
+		if (RequiresConsistentSnapshot(task))
 		{
-			AcquireExecutorShardLocks(task, modLevel);
+			/*
+			 * ExclusiveLock conflicts with all lock types used by modifications
+			 * and therefore prevents other modifications from running
+			 * concurrently.
+			 */
+
+			LockRelationShardResources(task->relationShardList, ExclusiveLock);
 		}
 	}
-	else if (list_length(taskList) > 1)
+}
+
+
+/*
+ * AnyAnchorTableIsReplicated iterates over the task list and returns true
+ * if any task's anchor shard belongs to a replicated table. We consider a
+ * table replicated if it is a reference table or a distributed table with
+ * replication factor > 1.
+ */
+static bool
+AnyAnchorTableIsReplicated(List *taskList)
+{
+	Task *task = NULL;
+	foreach_ptr(task, taskList)
 	{
-		AcquireExecutorMultiShardLocks(taskList);
+		int64 shardId = task->anchorShardId;
+
+		if (shardId == INVALID_SHARD_ID)
+		{
+			continue;
+		}
+
+		if (ReferenceTableShardId(shardId))
+		{
+			return true;
+		}
+
+		Oid relationId = RelationIdForShard(shardId);
+		if (!SingleReplicatedTable(relationId))
+		{
+			return true;
+		}
 	}
+
+	return false;
 }
diff --git a/src/backend/distributed/executor/distributed_execution_locks.c b/src/backend/distributed/executor/distributed_execution_locks.c
index bf581f26a..f808f2579 100644
--- a/src/backend/distributed/executor/distributed_execution_locks.c
+++ b/src/backend/distributed/executor/distributed_execution_locks.c
@@ -19,9 +19,7 @@
 #include "distributed/transaction_management.h"
 
 
-static bool RequiresConsistentSnapshot(Task *task);
 static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel);
-static void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
 
 
 /*
@@ -88,84 +86,7 @@ AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel)
  */
 void
 AcquireExecutorMultiShardLocks(List *taskList)
-{
-	Task *task = NULL;
-	foreach_ptr(task, taskList)
-	{
-		LOCKMODE lockMode = NoLock;
-
-		if (task->anchorShardId == INVALID_SHARD_ID)
-		{
-			/* no shard locks to take if the task is not anchored to a shard */
-			continue;
-		}
-
-		if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
-		{
-			/*
-			 * When all writes are commutative then we only need to prevent multi-shard
-			 * commands from running concurrently with each other and with commands
-			 * that are explicitly non-commutative. When there is no replication then
-			 * we only need to prevent concurrent multi-shard commands.
-			 *
-			 * In either case, ShareUpdateExclusive has the desired effect, since
-			 * it conflicts with itself and ExclusiveLock (taken by non-commutative
-			 * writes).
-			 *
-			 * However, some users find this too restrictive, so we allow them to
-			 * reduce to a RowExclusiveLock when citus.enable_deadlock_prevention
-			 * is enabled, which lets multi-shard modifications run in parallel as
-			 * long as they all disable the GUC.
-			 *
-			 * We also skip taking a heavy-weight lock when running a multi-shard
-			 * commands from workers, since we cannot prevent concurrency across
-			 * workers anyway.
-			 */
-
-			if (EnableDeadlockPrevention && IsCoordinator())
-			{
-				lockMode = ShareUpdateExclusiveLock;
-			}
-			else
-			{
-				lockMode = RowExclusiveLock;
-			}
-		}
-		else
-		{
-			/*
-			 * When there is replication, prevent all concurrent writes to the same
-			 * shards to ensure the writes are ordered.
-			 */
-
-			lockMode = ExclusiveLock;
-		}
-
-		/*
-		 * If we are dealing with a partition we are also taking locks on parent table
-		 * to prevent deadlocks on concurrent operations on a partition and its parent.
-		 */
-		LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
-		LockShardResource(task->anchorShardId, lockMode);
-
-		/*
-		 * If the task has a subselect, then we may need to lock the shards from which
-		 * the query selects as well to prevent the subselects from seeing different
-		 * results on different replicas.
-		 */
-
-		if (RequiresConsistentSnapshot(task))
-		{
-			/*
-			 * ExclusiveLock conflicts with all lock types used by modifications
-			 * and therefore prevents other modifications from running
-			 * concurrently.
-			 */
-
-			LockRelationShardResources(task->relationShardList, ExclusiveLock);
-		}
-	}
-}
+{ }
 
 
 /*
@@ -173,7 +94,7 @@ AcquireExecutorMultiShardLocks(List *taskList)
  * the necessary locks to ensure that a subquery in the modify query
  * returns the same output for all task placements.
  */
-static bool
+bool
 RequiresConsistentSnapshot(Task *task)
 {
 	bool requiresIsolation = false;
@@ -248,111 +169,10 @@ AcquireMetadataLocks(List *taskList)
 
 static void
 AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
-{
-	LOCKMODE lockMode = NoLock;
-	int64 shardId = task->anchorShardId;
-
-	if (shardId == INVALID_SHARD_ID)
-	{
-		return;
-	}
-
-	if (modLevel <= ROW_MODIFY_READONLY)
-	{
-		/*
-		 * The executor shard lock is used to maintain consistency between
-		 * replicas and therefore no lock is required for read-only queries
-		 * or in general when there is only one replica.
-		 */
-
-		lockMode = NoLock;
-	}
-	else if (list_length(task->taskPlacementList) == 1)
-	{
-		if (task->replicationModel == REPLICATION_MODEL_2PC)
-		{
-			/*
-			 * While we don't need a lock to ensure writes are applied in
-			 * a consistent order when there is a single replica. We also use
-			 * shard resource locks as a crude implementation of SELECT..
-			 * FOR UPDATE on reference tables, so we should always take
-			 * a lock that conflicts with the FOR UPDATE/SHARE locks.
-			 */
-			lockMode = RowExclusiveLock;
-		}
-		else
-		{
-			/*
-			 * When there is no replication, the worker itself can decide on
-			 * on the order in which writes are applied.
-			 */
-			lockMode = NoLock;
-		}
-	}
-	else if (AllModificationsCommutative)
-	{
-		/*
-		 * Bypass commutativity checks when citus.all_modifications_commutative
-		 * is enabled.
-		 *
-		 * A RowExclusiveLock does not conflict with itself and therefore allows
-		 * multiple commutative commands to proceed concurrently. It does
-		 * conflict with ExclusiveLock, which may still be obtained by another
-		 * session that executes an UPDATE/DELETE/UPSERT command with
-		 * citus.all_modifications_commutative disabled.
-		 */
-
-		lockMode = RowExclusiveLock;
-	}
-	else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
-	{
-		/*
-		 * An INSERT commutes with other INSERT commands, since performing them
-		 * out-of-order only affects the table order on disk, but not the
-		 * contents.
-		 *
-		 * When a unique constraint exists, INSERTs are not strictly commutative,
-		 * but whichever INSERT comes last will error out and thus has no effect.
-		 * INSERT is not commutative with UPDATE/DELETE/UPSERT, since the
-		 * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
-		 * order.
-		 *
-		 * A RowExclusiveLock does not conflict with itself and therefore allows
-		 * multiple INSERT commands to proceed concurrently. It conflicts with
-		 * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
-		 * not run concurrently with INSERT.
-		 */
-
-		lockMode = RowExclusiveLock;
-	}
-	else
-	{
-		/*
-		 * UPDATE/DELETE/UPSERT commands do not commute with other modifications
-		 * since the rows modified by one command may be affected by the outcome
-		 * of another command.
-		 *
-		 * We need to handle upsert before INSERT, because PostgreSQL models
-		 * upsert commands as INSERT with an ON CONFLICT section.
-		 *
-		 * ExclusiveLock conflicts with all lock types used by modifications
-		 * and therefore prevents other modifications from running
-		 * concurrently.
-		 */
-
-		lockMode = ExclusiveLock;
-	}
-
-	if (lockMode != NoLock)
-	{
-		ShardInterval *shardInterval = LoadShardInterval(shardId);
-
-		SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
-	}
-}
+{ }
 
 
-static void
+void
 AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList)
 {
 	LOCKMODE rowLockMode = NoLock;
diff --git a/src/include/distributed/distributed_execution_locks.h b/src/include/distributed/distributed_execution_locks.h
index eaa51676a..548ef2a51 100644
--- a/src/include/distributed/distributed_execution_locks.h
+++ b/src/include/distributed/distributed_execution_locks.h
@@ -17,7 +17,9 @@
 #include "distributed/multi_physical_planner.h"
 
 extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
+extern void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
 extern void AcquireExecutorMultiShardLocks(List *taskList);
+extern bool RequiresConsistentSnapshot(Task *task);
 extern void AcquireMetadataLocks(List *taskList);
 extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
 extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
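
For reference, the lock-level decision that this patch unifies in
AcquireExecutorShardLocksForExecution() can be summarized by the sketch
below. This is an illustrative, self-contained C program and not part of
the patch: the enum and the boolean parameters stand in for the PostgreSQL
LOCKMODE values and for the Citus GUCs and checks (EnableDeadlockPrevention,
AllModificationsCommutative, IsCoordinator(), modLevel), and the function
name DecideExecutorLockMode is hypothetical.

#include <stdbool.h>
#include <stdio.h>

/* stand-ins for the PostgreSQL lock levels referenced by the patch */
typedef enum
{
	SKETCH_NO_LOCK,
	SKETCH_ROW_EXCLUSIVE,
	SKETCH_SHARE_UPDATE_EXCLUSIVE,
	SKETCH_EXCLUSIVE
} SketchLockMode;

static SketchLockMode
DecideExecutorLockMode(bool anyAnchorTableIsReplicated,
					   bool parallelExecutionNotPossible,
					   bool isCoordinator,
					   bool enableDeadlockPrevention,
					   bool allModificationsCommutative,
					   bool modLevelIsCommutative)
{
	/*
	 * Replication factor == 1 and no parallel execution: exit early and
	 * rely on Postgres on the workers to serialize concurrent operations.
	 */
	if (!anyAnchorTableIsReplicated && parallelExecutionNotPossible)
	{
		return SKETCH_NO_LOCK;
	}

	/* start pessimistic and relax the lock level where it is safe */
	SketchLockMode lockMode = SKETCH_EXCLUSIVE;

	if (!anyAnchorTableIsReplicated && isCoordinator)
	{
		/* multi-shard case on the coordinator without replication */
		lockMode = enableDeadlockPrevention ? SKETCH_SHARE_UPDATE_EXCLUSIVE
											: SKETCH_ROW_EXCLUSIVE;
	}

	if (allModificationsCommutative ||
		(parallelExecutionNotPossible && modLevelIsCommutative))
	{
		/* commutative writes (e.g., INSERT) may run concurrently */
		lockMode = SKETCH_ROW_EXCLUSIVE;
	}

	return lockMode;
}

int
main(void)
{
	/* single-shard commutative write on a replicated table */
	SketchLockMode mode = DecideExecutorLockMode(true, true, true, true, false, true);
	printf("lock mode: %d\n", mode);
	return 0;
}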