Refactor and redocument executor shard lock code

pull/855/head
Marco Slot 2016-10-17 12:29:03 +02:00
parent bd9a433709
commit 213d8419c6
1 changed files with 71 additions and 50 deletions

View File

@ -84,8 +84,7 @@ static bool subXactAbortAttempted = false;
/* functions needed during start phase */ /* functions needed during start phase */
static void InitTransactionStateForTask(Task *task); static void InitTransactionStateForTask(Task *task);
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
static HTAB * CreateXactParticipantHash(void); static HTAB * CreateXactParticipantHash(void);
/* functions needed during run phase */ /* functions needed during run phase */
@ -126,7 +125,6 @@ static void MarkRemainingInactivePlacements(void);
void void
RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
{ {
LOCKMODE lockMode = NoLock;
EState *executorState = NULL; EState *executorState = NULL;
CmdType commandType = queryDesc->operation; CmdType commandType = queryDesc->operation;
@ -174,13 +172,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
* work. * work.
*/ */
queryDesc->planstate = (PlanState *) makeNode(MaterialState); queryDesc->planstate = (PlanState *) makeNode(MaterialState);
lockMode = CommutativityRuleToLockMode(commandType, task->upsertQuery);
if (lockMode != NoLock)
{
AcquireExecutorShardLock(task, lockMode);
}
} }
@ -241,68 +232,94 @@ InitTransactionStateForTask(Task *task)
/* /*
* CommutativityRuleToLockMode determines the commutativity rule for the given * AcquireExecutorShardLock acquires a lock on the shard for the given task and
* command and returns the appropriate lock mode to enforce that rule. The * command type if necessary to avoid divergence between multiple replicas of
* function assumes a SELECT doesn't modify state and therefore is commutative * the same shard. No lock is obtained when there is only one replica.
* with all other commands. The function also assumes that an INSERT commutes
* with another INSERT, but not with an UPDATE/DELETE/UPSERT; and an
* UPDATE/DELETE/UPSERT doesn't commute with an INSERT, UPDATE, DELETE or UPSERT.
* *
* Note that the above comment defines INSERT INTO ... ON CONFLICT type of queries * The function determines the appropriate lock mode based on the commutativity
* as an UPSERT. Since UPSERT is not defined as a separate command type in postgres, * rule of the command. In each case, it uses a lock mode that enforces the
* we have to pass it as a second parameter to the function. * commutativity rule.
* *
* The above mapping is overridden entirely when all_modifications_commutative * The mapping is overridden when all_modifications_commutative is set to true.
* is set to true. In that case, all commands just claim a shared lock. This * In that case, all modifications are treated as commutative, which can be used
* allows the shard repair logic to lock out modifications while permitting all * to communicate that the application is only generating commutative
* commands to otherwise commute. * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
*/ */
static LOCKMODE static void
CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) AcquireExecutorShardLock(Task *task, CmdType commandType)
{ {
LOCKMODE lockMode = NoLock; LOCKMODE lockMode = NoLock;
int64 shardId = task->anchorShardId;
/* bypass commutativity checks when flag enabled */ if (commandType == CMD_SELECT || list_length(task->taskPlacementList) == 1)
if (AllModificationsCommutative)
{ {
return ShareLock; /*
} * The executor shard lock is used to maintain consistency between
* replicas and therefore no lock is required for read-only queries
* or in general when there is only one replica.
*/
if (commandType == CMD_SELECT)
{
lockMode = NoLock; lockMode = NoLock;
} }
else if (upsertQuery) else if (AllModificationsCommutative)
{ {
/*
* Bypass commutativity checks when citus.all_modifications_commutative
* is enabled.
*
* A ShareLock does not conflict with itself and therefore allows
* multiple commutative commands to proceed concurrently. It does
* conflict with ExclusiveLock, which may still be obtained by another
* session that executes an UPDATE/DELETE/UPSERT command with
* citus.all_modifications_commutative disabled.
*/
lockMode = ShareLock;
}
else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
/*
* UPDATE/DELETE/UPSERT commands do not commute with other modifications
* since the rows modified by one command may be affected by the outcome
* of another command.
*
* We need to handle upsert before INSERT, because PostgreSQL models
* upsert commands as INSERT with an ON CONFLICT section.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
lockMode = ExclusiveLock; lockMode = ExclusiveLock;
} }
else if (commandType == CMD_INSERT) else if (commandType == CMD_INSERT)
{ {
/*
* An INSERT commutes with other INSERT commands, since performing them
* out-of-order only affects the table order on disk, but not the
* contents.
*
* When a unique constraint exists, INSERTs are not strictly commutative,
* but whichever INSERT comes last will error out and thus has no effect.
* INSERT is not commutative with UPDATE/DELETE/UPSERT, since the
* UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
* order.
*
* A ShareLock does not conflict with itself and therefore allows
* multiple INSERT commands to proceed concurrently. It conflicts with
* ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
* not run concurrently with INSERT.
*/
lockMode = ShareLock; lockMode = ShareLock;
} }
else if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
lockMode = ExclusiveLock;
}
else else
{ {
ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType))); ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType)));
} }
return lockMode; if (shardId != INVALID_SHARD_ID && lockMode != NoLock)
}
/*
* AcquireExecutorShardLock: acquire shard lock needed for execution of
* a single task within a distributed plan.
*/
static void
AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
{
int64 shardId = task->anchorShardId;
if (shardId != INVALID_SHARD_ID)
{ {
LockShardResource(shardId, lockMode); LockShardResource(shardId, lockMode);
} }
@ -476,6 +493,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
bool isModificationQuery, bool isModificationQuery,
bool expectResults) bool expectResults)
{ {
CmdType commandType = queryDesc->operation;
TupleDesc tupleDescriptor = queryDesc->tupDesc; TupleDesc tupleDescriptor = queryDesc->tupDesc;
EState *executorState = queryDesc->estate; EState *executorState = queryDesc->estate;
MaterialState *routerState = (MaterialState *) queryDesc->planstate; MaterialState *routerState = (MaterialState *) queryDesc->planstate;
@ -504,6 +522,9 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
elog(DEBUG4, "query after master evaluation: %s", queryString); elog(DEBUG4, "query after master evaluation: %s", queryString);
} }
/* prevent replicas of the same shard from diverging */
AcquireExecutorShardLock(task, commandType);
/* /*
* Try to run the query to completion on one placement. If the query fails * Try to run the query to completion on one placement. If the query fails
* attempt the query on the next placement. * attempt the query on the next placement.