From 213d8419c685d9207e2654c0a7d281836dac3b1d Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 17 Oct 2016 12:29:03 +0200 Subject: [PATCH] Refactor and redocument executor shard lock code --- .../executor/multi_router_executor.c | 121 ++++++++++-------- 1 file changed, 71 insertions(+), 50 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 3056bd9f9..9eb22d029 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -84,8 +84,7 @@ static bool subXactAbortAttempted = false; /* functions needed during start phase */ static void InitTransactionStateForTask(Task *task); -static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); -static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); +static void AcquireExecutorShardLock(Task *task, CmdType commandType); static HTAB * CreateXactParticipantHash(void); /* functions needed during run phase */ @@ -126,7 +125,6 @@ static void MarkRemainingInactivePlacements(void); void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) { - LOCKMODE lockMode = NoLock; EState *executorState = NULL; CmdType commandType = queryDesc->operation; @@ -174,13 +172,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) * work. */ queryDesc->planstate = (PlanState *) makeNode(MaterialState); - - lockMode = CommutativityRuleToLockMode(commandType, task->upsertQuery); - - if (lockMode != NoLock) - { - AcquireExecutorShardLock(task, lockMode); - } } @@ -241,68 +232,94 @@ InitTransactionStateForTask(Task *task) /* - * CommutativityRuleToLockMode determines the commutativity rule for the given - * command and returns the appropriate lock mode to enforce that rule. The - * function assumes a SELECT doesn't modify state and therefore is commutative - * with all other commands. The function also assumes that an INSERT commutes - * with another INSERT, but not with an UPDATE/DELETE/UPSERT; and an - * UPDATE/DELETE/UPSERT doesn't commute with an INSERT, UPDATE, DELETE or UPSERT. + * AcquireExecutorShardLock acquires a lock on the shard for the given task and + * command type if necessary to avoid divergence between multiple replicas of + * the same shard. No lock is obtained when there is only one replica. * - * Note that the above comment defines INSERT INTO ... ON CONFLICT type of queries - * as an UPSERT. Since UPSERT is not defined as a separate command type in postgres, - * we have to pass it as a second parameter to the function. + * The function determines the appropriate lock mode based on the commutativity + * rule of the command. In each case, it uses a lock mode that enforces the + * commutativity rule. * - * The above mapping is overridden entirely when all_modifications_commutative - * is set to true. In that case, all commands just claim a shared lock. This - * allows the shard repair logic to lock out modifications while permitting all - * commands to otherwise commute. + * The mapping is overridden when all_modifications_commutative is set to true. + * In that case, all modifications are treated as commutative, which can be used + * to communicate that the application is only generating commutative + * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. */ -static LOCKMODE -CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) +static void +AcquireExecutorShardLock(Task *task, CmdType commandType) { LOCKMODE lockMode = NoLock; + int64 shardId = task->anchorShardId; - /* bypass commutativity checks when flag enabled */ - if (AllModificationsCommutative) + if (commandType == CMD_SELECT || list_length(task->taskPlacementList) == 1) { - return ShareLock; - } + /* + * The executor shard lock is used to maintain consistency between + * replicas and therefore no lock is required for read-only queries + * or in general when there is only one replica. + */ - if (commandType == CMD_SELECT) - { lockMode = NoLock; } - else if (upsertQuery) + else if (AllModificationsCommutative) { + /* + * Bypass commutativity checks when citus.all_modifications_commutative + * is enabled. + * + * A ShareLock does not conflict with itself and therefore allows + * multiple commutative commands to proceed concurrently. It does + * conflict with ExclusiveLock, which may still be obtained by another + * session that executes an UPDATE/DELETE/UPSERT command with + * citus.all_modifications_commutative disabled. + */ + + lockMode = ShareLock; + } + else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE) + { + /* + * UPDATE/DELETE/UPSERT commands do not commute with other modifications + * since the rows modified by one command may be affected by the outcome + * of another command. + * + * We need to handle upsert before INSERT, because PostgreSQL models + * upsert commands as INSERT with an ON CONFLICT section. + * + * ExclusiveLock conflicts with all lock types used by modifications + * and therefore prevents other modifications from running + * concurrently. + */ + lockMode = ExclusiveLock; } else if (commandType == CMD_INSERT) { + /* + * An INSERT commutes with other INSERT commands, since performing them + * out-of-order only affects the table order on disk, but not the + * contents. + * + * When a unique constraint exists, INSERTs are not strictly commutative, + * but whichever INSERT comes last will error out and thus has no effect. + * INSERT is not commutative with UPDATE/DELETE/UPSERT, since the + * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution + * order. + * + * A ShareLock does not conflict with itself and therefore allows + * multiple INSERT commands to proceed concurrently. It conflicts with + * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do + * not run concurrently with INSERT. + */ + lockMode = ShareLock; } - else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) - { - lockMode = ExclusiveLock; - } else { ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType))); } - return lockMode; -} - - -/* - * AcquireExecutorShardLock: acquire shard lock needed for execution of - * a single task within a distributed plan. - */ -static void -AcquireExecutorShardLock(Task *task, LOCKMODE lockMode) -{ - int64 shardId = task->anchorShardId; - - if (shardId != INVALID_SHARD_ID) + if (shardId != INVALID_SHARD_ID && lockMode != NoLock) { LockShardResource(shardId, lockMode); } @@ -476,6 +493,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool isModificationQuery, bool expectResults) { + CmdType commandType = queryDesc->operation; TupleDesc tupleDescriptor = queryDesc->tupDesc; EState *executorState = queryDesc->estate; MaterialState *routerState = (MaterialState *) queryDesc->planstate; @@ -504,6 +522,9 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, elog(DEBUG4, "query after master evaluation: %s", queryString); } + /* prevent replicas of the same shard from diverging */ + AcquireExecutorShardLock(task, commandType); + /* * Try to run the query to completion on one placement. If the query fails * attempt the query on the next placement.