mirror of https://github.com/citusdata/citus.git
Unify distributed execution logic for single replicated tables
Citus does not acquire any executor locks for shard replication == 1. With this commit, we unify this decision and exit early.

pull/5405/head
parent 20f3248b6e
commit d5e89b1132
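The decision described in the commit message reduces to a single boolean test: executor shard locks are skipped only when the modified table is not replicated and the execution is a single task (or runs sequentially). Below is a minimal, self-contained C sketch of that test; the function name and the two boolean inputs are hypothetical stand-ins for the executor's state (the real code derives them from ModifiedTableReplicated() and ShouldRunTasksSequentially(), as the diff below shows) and are not part of the Citus API.

#include <stdbool.h>
#include <stdio.h>

/*
 * Sketch of the unified early-exit decision. The inputs are assumptions that
 * mirror, but are not, the real executor state:
 *   modifiedTableReplicated        -> an anchor shard belongs to a reference table
 *                                     or to a table with replication factor > 1
 *   requiresParallelExecutionLocks -> more than one task and not running sequentially
 */
static bool
NeedsExecutorShardLocks(bool modifiedTableReplicated,
						bool requiresParallelExecutionLocks)
{
	/*
	 * Replication factor == 1 and a single/sequential task: let Postgres
	 * serialize the writes on the worker and take no executor lock.
	 */
	if (!modifiedTableReplicated && !requiresParallelExecutionLocks)
	{
		return false;
	}

	return true;
}

int
main(void)
{
	printf("rep=1, single task -> %s\n",
		   NeedsExecutorShardLocks(false, false) ? "lock" : "no lock");
	printf("rep>1, single task -> %s\n",
		   NeedsExecutorShardLocks(true, false) ? "lock" : "no lock");
	printf("rep=1, parallel    -> %s\n",
		   NeedsExecutorShardLocks(false, true) ? "lock" : "no lock");
	return 0;
}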
@@ -632,6 +632,7 @@ static void CleanUpSessions(DistributedExecution *execution);
 
 static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
 static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
+static bool ModifiedTableReplicated(List *taskList);
 static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
 static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
 static bool DistributedExecutionRequiresRollback(List *taskList);
@@ -1556,6 +1557,13 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan)
  *
  * The second case prevents deadlocks due to out-of-order execution.
  *
+ * There are two GUCs that can override the default behaviors.
+ *  'citus.all_modifications_commutative' relaxes locking
+ *  that's done for the purpose of keeping replicas consistent.
+ *  'citus.enable_deadlock_prevention' relaxes locking done for
+ *  the purpose of avoiding deadlocks between concurrent
+ *  multi-shard commands.
+ *
  * We do not take executor shard locks for utility commands such as
  * TRUNCATE because the table locks already prevent concurrent access.
  */
@@ -1577,22 +1585,208 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
 		return;
 	}
 
-	/*
-	 * When executing in sequential mode or only executing a single task, we
-	 * do not need multi-shard locks.
-	 */
-	if (list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList))
-	{
-		Task *task = NULL;
-		foreach_ptr(task, taskList)
-		{
-			AcquireExecutorShardLocks(task, modLevel);
-		}
-	}
-	else if (list_length(taskList) > 1)
-	{
-		AcquireExecutorMultiShardLocks(taskList);
-	}
-}
+	bool requiresParallelExecutionLocks =
+		!(list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList));
+
+	bool modifiedTableReplicated = ModifiedTableReplicated(taskList);
+	if (!modifiedTableReplicated && !requiresParallelExecutionLocks)
+	{
+		/*
+		 * When a distributed query on tables with replication
+		 * factor == 1 and command hits only a single shard, we
+		 * rely on Postgres to handle the serialization of the
+		 * concurrent modifications on the workers.
+		 *
+		 * For reference tables, even if their placements are replicated
+		 * ones (e.g., single node), we acquire the distributed execution
+		 * locks to be consistent when new node(s) are added. So, they
+		 * do not return at this point.
+		 */
+		return;
+	}
+
+	/*
+	 * We first assume that all the remaining modifications are going to
+	 * be serialized. So, start with an ExclusiveLock and lower the lock level
+	 * as much as possible.
+	 */
+	int lockMode = ExclusiveLock;
+
+	/*
+	 * In addition to honouring commutativity rules, we currently only
+	 * allow a single multi-shard command on a shard at a time. Otherwise,
+	 * concurrent multi-shard commands may take row-level locks on the
+	 * shard placements in a different order and create a distributed
+	 * deadlock. This applies even when writes are commutative and/or
+	 * there is no replication. This can be relaxed via
+	 * EnableDeadlockPrevention.
+	 *
+	 * 1. If citus.all_modifications_commutative is set to true, then all locks
+	 * are acquired as RowExclusiveLock.
+	 *
+	 * 2. If citus.all_modifications_commutative is false, then only the shards
+	 * with more than one replica are locked with ExclusiveLock. Otherwise, the
+	 * lock is acquired with ShareUpdateExclusiveLock.
+	 *
+	 * ShareUpdateExclusiveLock conflicts with itself such that only one
+	 * multi-shard modification at a time is allowed on a shard. It also conflicts
+	 * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
+	 * in the same order on all placements. It does not conflict with
+	 * RowExclusiveLock, which is normally obtained by single-shard, commutative
+	 * writes.
+	 */
+	if (!modifiedTableReplicated && requiresParallelExecutionLocks)
+	{
+		/*
+		 * When there is no replication then we only need to prevent
+		 * concurrent multi-shard commands on the same shards. This is
+		 * because concurrent, parallel commands may modify the same
+		 * set of shards, but in different orders. The order of the
+		 * accesses might trigger distributed deadlocks that are not
+		 * possible to happen on non-distributed systems such as
+		 * regular Postgres.
+		 *
+		 * As an example, assume that we have two queries: query-1 and query-2.
+		 * Both queries access shard-1 and shard-2. If query-1 first accesses
+		 * shard-1 then shard-2, and query-2 accesses shard-2 then shard-1, these
+		 * two commands might block each other in case they modify the same rows
+		 * (e.g., cause distributed deadlocks).
+		 *
+		 * In either case, ShareUpdateExclusive has the desired effect, since
+		 * it conflicts with itself and ExclusiveLock (taken by non-commutative
+		 * writes).
+		 *
+		 * However, some users find this too restrictive, so we allow them to
+		 * reduce to a RowExclusiveLock when citus.enable_deadlock_prevention
+		 * is disabled, which lets multi-shard modifications run in parallel as
+		 * long as they all disable the GUC.
+		 */
+		lockMode =
+			EnableDeadlockPrevention ? ShareUpdateExclusiveLock : RowExclusiveLock;
+
+		if (!IsCoordinator())
+		{
+			/*
+			 * We also skip taking a heavy-weight lock when running multi-shard
+			 * commands from workers, since we currently do not prevent concurrency
+			 * across workers anyway.
+			 */
+			lockMode = RowExclusiveLock;
+		}
+	}
+	else if (modifiedTableReplicated)
+	{
+		/*
+		 * When we are executing distributed queries on replicated tables, our
+		 * default behaviour is to prevent any concurrency. This is valid
+		 * whether parallel execution is happening or not.
+		 *
+		 * The reason is that we cannot control the order of the placement accesses
+		 * of two distributed queries to the same shards. The order of the accesses
+		 * might cause the replicas of the same shard placements to diverge. This is
+		 * not possible to happen on non-distributed systems such as regular Postgres.
+		 *
+		 * As an example, assume that we have two queries: query-1 and query-2.
+		 * Both queries only access the placements of shard-1, say p-1 and p-2.
+		 *
+		 * And, assume that these queries are non-commutative, such as:
+		 *  query-1: UPDATE table SET b = 1 WHERE key = 1;
+		 *  query-2: UPDATE table SET b = 2 WHERE key = 1;
+		 *
+		 * If query-1 accesses p-1 then p-2, and query-2 accesses
+		 * p-2 then p-1, these two commands would leave p-1 and p-2
+		 * diverged (e.g., the values for the column "b" would be different).
+		 *
+		 * The only exception to this rule is single-shard commutative
+		 * modifications, such as INSERTs. In that case, we can allow
+		 * concurrency among such backends, hence lowering the lock level
+		 * to RowExclusiveLock.
+		 */
+		if (!requiresParallelExecutionLocks && modLevel < ROW_MODIFY_NONCOMMUTATIVE)
+		{
+			lockMode = RowExclusiveLock;
+		}
+	}
+
+	if (AllModificationsCommutative)
+	{
+		/*
+		 * The mapping is overridden when all_modifications_commutative is set to true.
+		 * In that case, all modifications are treated as commutative, which can be used
+		 * to communicate that the application is only generating commutative
+		 * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. This
+		 * is irrespective of single-shard/multi-shard or replicated tables.
+		 */
+		lockMode = RowExclusiveLock;
+	}
+
+	/* now, iterate on the tasks and acquire the executor locks on the shards */
+	Task *task = NULL;
+	foreach_ptr(task, taskList)
+	{
+		/*
+		 * If we are dealing with a partition we are also taking locks on the parent
+		 * table to prevent deadlocks on concurrent operations on a partition and
+		 * its parent.
+		 */
+		LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
+
+		ShardInterval *anchorShardInterval = LoadShardInterval(task->anchorShardId);
+		SerializeNonCommutativeWrites(list_make1(anchorShardInterval), lockMode);
+
+		/* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */
+		AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
+
+		/*
+		 * If the task has a subselect, then we may need to lock the shards from which
+		 * the query selects as well to prevent the subselects from seeing different
+		 * results on different replicas.
+		 */
+		if (RequiresConsistentSnapshot(task))
+		{
+			/*
+			 * ExclusiveLock conflicts with all lock types used by modifications
+			 * and therefore prevents other modifications from running
+			 * concurrently.
+			 */
+			LockRelationShardResources(task->relationShardList, ExclusiveLock);
+		}
+	}
+}
+
+
+/*
+ * ModifiedTableReplicated iterates on the task list and returns true
+ * if any of the tasks' anchor shards belongs to a replicated table. We qualify
+ * replicated tables as any reference table or any distributed table with
+ * replication factor > 1.
+ */
+static bool
+ModifiedTableReplicated(List *taskList)
+{
+	Task *task = NULL;
+	foreach_ptr(task, taskList)
+	{
+		int64 shardId = task->anchorShardId;
+
+		if (shardId == INVALID_SHARD_ID)
+		{
+			continue;
+		}
+
+		if (ReferenceTableShardId(shardId))
+		{
+			return true;
+		}
+
+		Oid relationId = RelationIdForShard(shardId);
+		if (!SingleReplicatedTable(relationId))
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
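For reference, the lock-mode selection introduced in AcquireExecutorShardLocksForExecution above can be condensed into the following self-contained sketch. The enum values and boolean parameters are assumed stand-ins for PostgreSQL's heavyweight lock modes and for the citus.all_modifications_commutative / citus.enable_deadlock_prevention GUCs; they are illustrative only and not the actual Citus symbols.

#include <stdbool.h>
#include <stdio.h>

/* Stand-ins for the PostgreSQL lock modes used by the executor. */
typedef enum
{
	ROW_EXCLUSIVE,
	SHARE_UPDATE_EXCLUSIVE,
	EXCLUSIVE
} SketchLockMode;

/*
 * Mirrors the decision tree above: start pessimistic with EXCLUSIVE and
 * relax the lock level only when it is safe to do so.
 */
static SketchLockMode
ExecutorLockMode(bool modifiedTableReplicated, bool requiresParallelExecutionLocks,
				 bool singleShardCommutativeWrite, bool allModificationsCommutative,
				 bool enableDeadlockPrevention, bool isCoordinator)
{
	SketchLockMode lockMode = EXCLUSIVE;

	if (!modifiedTableReplicated && requiresParallelExecutionLocks)
	{
		/* only multi-shard ordering matters; the deadlock-prevention GUC decides */
		lockMode = enableDeadlockPrevention ? SHARE_UPDATE_EXCLUSIVE : ROW_EXCLUSIVE;

		if (!isCoordinator)
		{
			lockMode = ROW_EXCLUSIVE;   /* no cross-worker prevention anyway */
		}
	}
	else if (modifiedTableReplicated && !requiresParallelExecutionLocks &&
			 singleShardCommutativeWrite)
	{
		lockMode = ROW_EXCLUSIVE;       /* e.g. a single-shard INSERT */
	}

	if (allModificationsCommutative)
	{
		lockMode = ROW_EXCLUSIVE;       /* user promises commutativity */
	}

	return lockMode;
}

int
main(void)
{
	/* replicated table, parallel, non-commutative UPDATE: stays EXCLUSIVE (2) */
	printf("%d\n", ExecutorLockMode(true, true, false, false, true, true));
	return 0;
}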
@@ -19,161 +19,12 @@
 #include "distributed/transaction_management.h"
 
 
-static bool RequiresConsistentSnapshot(Task *task);
-static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel);
-static void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
-
-
-/*
- * AcquireExecutorShardLocks acquires locks on shards for the given task if
- * necessary to avoid divergence between multiple replicas of the same shard.
- * No lock is obtained when there is only one replica.
- *
- * The function determines the appropriate lock mode based on the commutativity
- * rule of the command. In each case, it uses a lock mode that enforces the
- * commutativity rule.
- *
- * The mapping is overridden when all_modifications_commutative is set to true.
- * In that case, all modifications are treated as commutative, which can be used
- * to communicate that the application is only generating commutative
- * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
- */
-void
-AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel)
-{
-	AcquireExecutorShardLockForRowModify(task, modLevel);
-	AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
-
-	/*
-	 * If the task has a subselect, then we may need to lock the shards from which
-	 * the query selects as well to prevent the subselects from seeing different
-	 * results on different replicas. In particular this prevents INSERT.. SELECT
-	 * commands from having a different effect on different placements.
-	 */
-	if (RequiresConsistentSnapshot(task))
-	{
-		/*
-		 * ExclusiveLock conflicts with all lock types used by modifications
-		 * and therefore prevents other modifications from running
-		 * concurrently.
-		 */
-
-		LockRelationShardResources(task->relationShardList, ExclusiveLock);
-	}
-}
-
-
-/*
- * AcquireExecutorMultiShardLocks acquires shard locks needed for execution
- * of writes on multiple shards. In addition to honouring commutativity
- * rules, we currently only allow a single multi-shard command on a shard at
- * a time. Otherwise, concurrent multi-shard commands may take row-level
- * locks on the shard placements in a different order and create a distributed
- * deadlock. This applies even when writes are commutative and/or there is
- * no replication.
- *
- * 1. If citus.all_modifications_commutative is set to true, then all locks
- * are acquired as ShareUpdateExclusiveLock.
- *
- * 2. If citus.all_modifications_commutative is false, then only the shards
- * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
- * lock is acquired with ShareUpdateExclusiveLock.
- *
- * ShareUpdateExclusiveLock conflicts with itself such that only one
- * multi-shard modification at a time is allowed on a shard. It also conflicts
- * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
- * in the same order on all placements. It does not conflict with
- * RowExclusiveLock, which is normally obtained by single-shard, commutative
- * writes.
- */
-void
-AcquireExecutorMultiShardLocks(List *taskList)
-{
-	Task *task = NULL;
-	foreach_ptr(task, taskList)
-	{
-		LOCKMODE lockMode = NoLock;
-
-		if (task->anchorShardId == INVALID_SHARD_ID)
-		{
-			/* no shard locks to take if the task is not anchored to a shard */
-			continue;
-		}
-
-		if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
-		{
-			/*
-			 * When all writes are commutative then we only need to prevent multi-shard
-			 * commands from running concurrently with each other and with commands
-			 * that are explicitly non-commutative. When there is no replication then
-			 * we only need to prevent concurrent multi-shard commands.
-			 *
-			 * In either case, ShareUpdateExclusive has the desired effect, since
-			 * it conflicts with itself and ExclusiveLock (taken by non-commutative
-			 * writes).
-			 *
-			 * However, some users find this too restrictive, so we allow them to
-			 * reduce to a RowExclusiveLock when citus.enable_deadlock_prevention
-			 * is enabled, which lets multi-shard modifications run in parallel as
-			 * long as they all disable the GUC.
-			 *
-			 * We also skip taking a heavy-weight lock when running a multi-shard
-			 * commands from workers, since we cannot prevent concurrency across
-			 * workers anyway.
-			 */
-
-			if (EnableDeadlockPrevention && IsCoordinator())
-			{
-				lockMode = ShareUpdateExclusiveLock;
-			}
-			else
-			{
-				lockMode = RowExclusiveLock;
-			}
-		}
-		else
-		{
-			/*
-			 * When there is replication, prevent all concurrent writes to the same
-			 * shards to ensure the writes are ordered.
-			 */
-
-			lockMode = ExclusiveLock;
-		}
-
-		/*
-		 * If we are dealing with a partition we are also taking locks on parent table
-		 * to prevent deadlocks on concurrent operations on a partition and its parent.
-		 */
-		LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
-		LockShardResource(task->anchorShardId, lockMode);
-
-		/*
-		 * If the task has a subselect, then we may need to lock the shards from which
-		 * the query selects as well to prevent the subselects from seeing different
-		 * results on different replicas.
-		 */
-
-		if (RequiresConsistentSnapshot(task))
-		{
-			/*
-			 * ExclusiveLock conflicts with all lock types used by modifications
-			 * and therefore prevents other modifications from running
-			 * concurrently.
-			 */
-
-			LockRelationShardResources(task->relationShardList, ExclusiveLock);
-		}
-	}
-}
-
-
 /*
  * RequiresConsistentSnapshot returns true if the given task need to take
  * the necessary locks to ensure that a subquery in the modify query
  * returns the same output for all task placements.
  */
-static bool
+bool
 RequiresConsistentSnapshot(Task *task)
 {
 	bool requiresIsolation = false;

@@ -246,113 +97,7 @@ AcquireMetadataLocks(List *taskList)
 }
 
 
-static void
-AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
-{
-	LOCKMODE lockMode = NoLock;
-	int64 shardId = task->anchorShardId;
-
-	if (shardId == INVALID_SHARD_ID)
-	{
-		return;
-	}
-
-	if (modLevel <= ROW_MODIFY_READONLY)
-	{
-		/*
-		 * The executor shard lock is used to maintain consistency between
-		 * replicas and therefore no lock is required for read-only queries
-		 * or in general when there is only one replica.
-		 */
-		lockMode = NoLock;
-	}
-	else if (list_length(task->taskPlacementList) == 1)
-	{
-		if (task->replicationModel == REPLICATION_MODEL_2PC)
-		{
-			/*
-			 * While we don't need a lock to ensure writes are applied in
-			 * a consistent order when there is a single replica. We also use
-			 * shard resource locks as a crude implementation of SELECT..
-			 * FOR UPDATE on reference tables, so we should always take
-			 * a lock that conflicts with the FOR UPDATE/SHARE locks.
-			 */
-			lockMode = RowExclusiveLock;
-		}
-		else
-		{
-			/*
-			 * When there is no replication, the worker itself can decide on
-			 * on the order in which writes are applied.
-			 */
-			lockMode = NoLock;
-		}
-	}
-	else if (AllModificationsCommutative)
-	{
-		/*
-		 * Bypass commutativity checks when citus.all_modifications_commutative
-		 * is enabled.
-		 *
-		 * A RowExclusiveLock does not conflict with itself and therefore allows
-		 * multiple commutative commands to proceed concurrently. It does
-		 * conflict with ExclusiveLock, which may still be obtained by another
-		 * session that executes an UPDATE/DELETE/UPSERT command with
-		 * citus.all_modifications_commutative disabled.
-		 */
-
-		lockMode = RowExclusiveLock;
-	}
-	else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
-	{
-		/*
-		 * An INSERT commutes with other INSERT commands, since performing them
-		 * out-of-order only affects the table order on disk, but not the
-		 * contents.
-		 *
-		 * When a unique constraint exists, INSERTs are not strictly commutative,
-		 * but whichever INSERT comes last will error out and thus has no effect.
-		 * INSERT is not commutative with UPDATE/DELETE/UPSERT, since the
-		 * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
-		 * order.
-		 *
-		 * A RowExclusiveLock does not conflict with itself and therefore allows
-		 * multiple INSERT commands to proceed concurrently. It conflicts with
-		 * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
-		 * not run concurrently with INSERT.
-		 */
-
-		lockMode = RowExclusiveLock;
-	}
-	else
-	{
-		/*
-		 * UPDATE/DELETE/UPSERT commands do not commute with other modifications
-		 * since the rows modified by one command may be affected by the outcome
-		 * of another command.
-		 *
-		 * We need to handle upsert before INSERT, because PostgreSQL models
-		 * upsert commands as INSERT with an ON CONFLICT section.
-		 *
-		 * ExclusiveLock conflicts with all lock types used by modifications
-		 * and therefore prevents other modifications from running
-		 * concurrently.
-		 */
-
-		lockMode = ExclusiveLock;
-	}
-
-	if (lockMode != NoLock)
-	{
-		ShardInterval *shardInterval = LoadShardInterval(shardId);
-
-		SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
-	}
-}
-
-
-static void
+void
 AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList)
 {
 	LOCKMODE rowLockMode = NoLock;
@@ -16,8 +16,8 @@
 #include "storage/lockdefs.h"
 #include "distributed/multi_physical_planner.h"
 
-extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
-extern void AcquireExecutorMultiShardLocks(List *taskList);
+extern void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
+extern bool RequiresConsistentSnapshot(Task *task);
 extern void AcquireMetadataLocks(List *taskList);
 extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
 extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);