Add read locks for making INSERT/SELECT consistent across placements

When an INSERT/SELECT command runs on a distributed table with replication
factor >1, we need to ensure that the SELECT returns the same result for
each placement of a shard. After this change, the router executor takes
exclusive locks on the shards from which the SELECT in an INSERT/SELECT
reads, in order to prevent concurrent changes. This is not the most
efficient solution, but it is simple and correct.

The citus.all_modifications_commutative setting can be used to avoid this
aggressive locking: an INSERT/SELECT whose filters are known to exclude any
ongoing writes can safely be marked as commutative.
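
These lock modes work because of PostgreSQL's regular lock conflict matrix:
ExclusiveLock conflicts with every lock mode that modifications take, while
ShareUpdateExclusiveLock does not conflict with the RowExclusiveLock used by
single-shard commutative writes. The sketch below spells that out with
PostgreSQL's stock DoLockModesConflict() lock-manager helper; the function
name and the standalone framing are illustrative only and not part of this
commit.

#include "postgres.h"

#include "storage/lock.h"

/*
 * Illustrative sketch (not part of this commit): the lock conflict matrix
 * entries that the shard locking scheme described above relies on.
 */
static void
IllustrateShardLockConflicts(void)
{
	/*
	 * ExclusiveLock, taken on the shards an INSERT/SELECT reads from,
	 * conflicts with all lock modes used by modifications and therefore
	 * blocks concurrent writes to those shards.
	 */
	Assert(DoLockModesConflict(ExclusiveLock, RowExclusiveLock));
	Assert(DoLockModesConflict(ExclusiveLock, ShareUpdateExclusiveLock));
	Assert(DoLockModesConflict(ExclusiveLock, ExclusiveLock));

	/*
	 * ShareUpdateExclusiveLock, used for commutative multi-shard writes,
	 * conflicts with itself and with ExclusiveLock, so only one multi-shard
	 * command runs on a shard at a time ...
	 */
	Assert(DoLockModesConflict(ShareUpdateExclusiveLock, ShareUpdateExclusiveLock));
	Assert(DoLockModesConflict(ShareUpdateExclusiveLock, ExclusiveLock));

	/* ... but does not block single-shard, commutative writes. */
	Assert(!DoLockModesConflict(ShareUpdateExclusiveLock, RowExclusiveLock));
}
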
pull/896/head
Marco Slot 2016-10-20 11:31:12 +02:00 committed by Marco Slot
parent 2a6875d817
commit 44d1f7b950
5 changed files with 146 additions and 49 deletions

@@ -100,8 +100,8 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
TupleDesc tupleDescriptor);
static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
static bool IsReplicated(List *shardIntervalList);
static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresSelectIsolation(Task *task);
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination,
Tuplestorestate *tupleStore);
@@ -311,15 +311,38 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
{
LockShardResource(shardId, lockMode);
}
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas. In particular this prevents INSERT.. SELECT
* commands from having a different effect on different placements.
*/
if (RequiresSelectIsolation(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockShardListResources(task->selectShardList, ExclusiveLock);
}
}
/*
* AcquireExecutorMultiShardLocks acquires shard locks need for execution
* of writes on multiple shards.
* AcquireExecutorMultiShardLocks acquires shard locks needed for execution
* of writes on multiple shards. In addition to honouring commutativity
* rules, we currently only allow a single multi-shard command on a shard at
* a time. Otherwise, concurrent multi-shard commands may take row-level
* locks on the shard placements in a different order and create a distributed
* deadlock. This applies even when writes are commutative and/or there is
* no replication.
*
* 1. If citus.all_modifications_commutative is set to true, then all locks
* are acquired as ShareUpdateExclusiveLock.
*
* 2. If citus.all_modifications_commutative is false, then only the shards
* with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
* lock is acquired with ShareUpdateExclusiveLock.
@@ -327,65 +350,122 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
* ShareUpdateExclusiveLock conflicts with itself such that only one
* multi-shard modification at a time is allowed on a shard. It also conflicts
* with ExclusiveLock, which ensures that updates/deletes/upserts are applied
* in the same order on all placements. It does not conflict with ShareLock,
* which is normally obtained by single-shard commutative writes.
* in the same order on all placements. It does not conflict with
* RowExclusiveLock, which is normally obtained by single-shard, commutative
* writes.
*/
static void
AcquireExecutorMultiShardLocks(List *shardIntervalList)
AcquireExecutorMultiShardLocks(List *taskList)
{
LOCKMODE lockMode = NoLock;
ListCell *taskCell = NULL;
if (AllModificationsCommutative || !IsReplicated(shardIntervalList))
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
LOCKMODE lockMode = NoLock;
if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
{
/*
* When all writes are commutative then we only need to prevent multi-shard
* commands from running concurrently with each other and with commands
* that are explicitly non-commutative. When there is no replication then
* we only need to prevent concurrent multi-shard commands.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
*/
lockMode = ShareUpdateExclusiveLock;
}
else
{
/*
* When there is replication, prevent all concurrent writes to the same
* shards to ensure the writes are ordered.
*/
lockMode = ExclusiveLock;
}
LockShardResource(task->anchorShardId, lockMode);
/*
* When all writes are commutative then we only need to prevent multi-shard
* commands from running concurrently with each other and with commands
* that are explicitly non-commutative. When there is not replication then
* we only need to prevent concurrent multi-shard commands.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas. In particular this prevents INSERT..SELECT
* commands from having different effects on different placements.
*/
lockMode = ShareUpdateExclusiveLock;
}
else
{
/*
* When there is replication, prevent all concurrent writes to the same
* shards to ensure the writes are ordered.
*/
lockMode = ExclusiveLock;
}
if (RequiresSelectIsolation(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockShardListResources(shardIntervalList, lockMode);
LockShardListResources(task->selectShardList, ExclusiveLock);
}
}
}
/*
* IsReplicated checks whether any of the shards in the given list has more
* than one replica.
* RequiresSelectIsolation returns whether a given task requires SELECT
* isolation, meaning we should take the necessary locks to ensure that a
* subselect, which may join multiple shards, will return the same output
* for all task placements.
*/
static bool
IsReplicated(List *shardIntervalList)
RequiresSelectIsolation(Task *task)
{
ListCell *shardIntervalCell;
bool hasReplication = false;
bool requiresIsolation = false;
foreach(shardIntervalCell, shardIntervalList)
if (!task->insertSelectQuery)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList->length > 1)
{
hasReplication = true;
break;
}
/*
* Only INSERT/SELECT commands currently require SELECT isolation.
* Other commands do not read from other shards.
*/
requiresIsolation = false;
}
else if (list_length(task->taskPlacementList) == 1)
{
/*
* If there is only one replica then we fully rely on PostgreSQL to
* provide SELECT isolation. In this case, we do not provide isolation
* across the shards, but that was never our intention.
*/
requiresIsolation = false;
}
else if (AllModificationsCommutative)
{
/*
* An INSERT/SELECT is commutative with other writes if it excludes
* any ongoing writes based on the filter conditions. Without knowing
* whether this is true, we assume the user took this into account
* when enabling citus.all_modifications_commutative. This option
* gives users an escape from aggressive locking during INSERT/SELECT.
*/
requiresIsolation = false;
}
else
{
/*
* If this is a non-commutative write, then we need to block ongoing
* writes to make sure that the subselect returns the same result
* on all placements.
*/
requiresIsolation = true;
}
return hasReplication;
return requiresIsolation;
}
@@ -809,7 +889,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */
AcquireExecutorMultiShardLocks(shardIntervalList);
AcquireExecutorMultiShardLocks(taskList);
/* open connection to all relevant placements, if not already open */
OpenTransactionsToAllShardPlacements(shardIntervalList, userName);

@@ -105,7 +105,8 @@ static Task * RouterSelectTask(Query *originalQuery,
List **placementList);
static bool RouterSelectQuery(Query *originalQuery,
RelationRestrictionContext *restrictionContext,
List **placementList, uint64 *anchorShardId);
List **placementList, uint64 *anchorShardId,
List **selectShardList);
static List * TargetShardIntervalsForSelect(Query *query,
RelationRestrictionContext *restrictionContext);
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
@@ -264,6 +265,8 @@ CreateMultiTaskRouterPlan(Query *originalQuery,
/* add the task if it could be created */
if (modifyTask != NULL)
{
modifyTask->insertSelectQuery = true;
sqlTaskList = lappend(sqlTaskList, modifyTask);
}
@@ -323,6 +326,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
Task *modifyTask = NULL;
List *selectPlacementList = NIL;
uint64 selectAnchorShardId = INVALID_SHARD_ID;
List *selectShardList = NIL;
uint64 jobId = INVALID_JOB_ID;
List *insertShardPlacementList = NULL;
List *intersectedPlacementList = NULL;
@@ -363,7 +367,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
* updated to point to the relevant nodes and selectPlacementList is determined.
*/
routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext,
&selectPlacementList, &selectAnchorShardId);
&selectPlacementList, &selectAnchorShardId,
&selectShardList);
if (!routerPlannable)
{
@@ -417,6 +422,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->upsertQuery = upsertQuery;
modifyTask->selectShardList = selectShardList;
return modifyTask;
}
@@ -1762,9 +1768,10 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
StringInfo queryString = makeStringInfo();
bool upsertQuery = false;
uint64 shardId = INVALID_SHARD_ID;
List *selectShardList = NIL;
queryRoutable = RouterSelectQuery(originalQuery, restrictionContext,
placementList, &shardId);
placementList, &shardId, &selectShardList);
if (!queryRoutable)
@@ -1798,7 +1805,7 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
*/
static bool
RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
List **placementList, uint64 *anchorShardId)
List **placementList, uint64 *anchorShardId, List **selectShardList)
{
List *prunedRelationShardList = TargetShardIntervalsForSelect(originalQuery,
restrictionContext);
@@ -1833,12 +1840,15 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
/* all relations are now pruned down to 0 or 1 shards */
Assert(list_length(prunedShardList) <= 1);
shardInterval = (ShardInterval *) linitial(prunedShardList);
/* anchor shard id */
if (shardId == INVALID_SHARD_ID)
{
shardInterval = (ShardInterval *) linitial(prunedShardList);
shardId = shardInterval->shardId;
}
*selectShardList = lappend(*selectShardList, shardInterval);
}
/*

@@ -493,6 +493,8 @@ OutTask(OUTFUNC_ARGS)
WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_BOOL_FIELD(insertSelectQuery);
WRITE_NODE_FIELD(selectShardList);
}
#if (PG_VERSION_NUM < 90600)

@@ -289,6 +289,8 @@ ReadTask(READFUNC_ARGS)
READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_BOOL_FIELD(insertSelectQuery);
READ_NODE_FIELD(selectShardList);
READ_DONE();
}

@@ -169,6 +169,9 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
bool insertSelectQuery;
List *selectShardList; /* only applies to INSERT/SELECT tasks */
} Task;