From 44d1f7b9509d55dbc0f610241d469e3e8b698299 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Thu, 20 Oct 2016 11:31:12 +0200
Subject: [PATCH] Add read locks for making INSERT/SELECT consistent across placements

When an INSERT/SELECT command runs on a distributed table with replication
factor >1, we need to ensure that it sees the same result on each placement
of a shard. After this change, the router executor takes exclusive locks on
the shards from which the SELECT in an INSERT/SELECT reads, in order to
prevent concurrent changes. This is not an optimal solution, but it is simple
and correct. The citus.all_modifications_commutative setting can be used to
avoid the aggressive locking: an INSERT/SELECT whose filters are known to
exclude any ongoing writes can be marked as commutative.
---
 .../executor/multi_router_executor.c      | 168 +++++++++++++-----
 .../planner/multi_router_planner.c        |  20 ++-
 .../distributed/utils/citus_outfuncs.c    |   2 +
 .../distributed/utils/citus_readfuncs.c   |   2 +
 .../distributed/multi_physical_planner.h  |   3 +
 5 files changed, 146 insertions(+), 49 deletions(-)
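To make the locking rule concrete before the diff, here is a minimal, self-contained C sketch of the decision the executor makes for each task. The stand-in Task struct, the placementCount field, and the main() scenarios are hypothetical simplifications; the actual logic is the RequiresSelectIsolation() function added in multi_router_executor.c below.

#include <stdbool.h>
#include <stdio.h>

/* Simplified stand-in for the Citus Task structure touched by this patch. */
typedef struct Task
{
	bool insertSelectQuery;  /* true for INSERT INTO ... SELECT tasks */
	int placementCount;      /* number of placements of the target shard */
} Task;

/* stand-in for the citus.all_modifications_commutative setting */
static bool allModificationsCommutative = false;

/*
 * Illustrative model of the decision made by RequiresSelectIsolation() in
 * the patch: only a replicated INSERT/SELECT that is not marked commutative
 * needs ExclusiveLock on the shards its SELECT reads from.
 */
static bool
RequiresSelectIsolationSketch(const Task *task)
{
	if (!task->insertSelectQuery)
	{
		return false;           /* other commands do not read from other shards */
	}
	if (task->placementCount == 1)
	{
		return false;           /* a single replica already sees one consistent result */
	}
	if (allModificationsCommutative)
	{
		return false;           /* user opted out of the aggressive locking */
	}
	return true;
}

int
main(void)
{
	Task replicatedInsertSelect = { true, 2 };
	Task singlePlacementInsertSelect = { true, 1 };
	Task plainUpdate = { false, 2 };

	printf("replicated INSERT/SELECT needs select locks: %d\n",
		   RequiresSelectIsolationSketch(&replicatedInsertSelect));      /* 1 */
	printf("single-placement INSERT/SELECT needs select locks: %d\n",
		   RequiresSelectIsolationSketch(&singlePlacementInsertSelect)); /* 0 */
	printf("plain UPDATE needs select locks: %d\n",
		   RequiresSelectIsolationSketch(&plainUpdate));                 /* 0 */
	return 0;
}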
diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index cb9171bb4..70fd5fd7a 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -100,8 +100,8 @@ static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
                                 TupleDesc tupleDescriptor);
 static List * TaskShardIntervalList(List *taskList);
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
-static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
-static bool IsReplicated(List *shardIntervalList);
+static void AcquireExecutorMultiShardLocks(List *taskList);
+static bool RequiresSelectIsolation(Task *task);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
                                        DestReceiver *destination,
                                        Tuplestorestate *tupleStore);
@@ -311,15 +311,38 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
     {
         LockShardResource(shardId, lockMode);
     }
+
+    /*
+     * If the task has a subselect, then we may need to lock the shards from which
+     * the query selects as well to prevent the subselects from seeing different
+     * results on different replicas. In particular this prevents INSERT.. SELECT
+     * commands from having a different effect on different placements.
+     */
+    if (RequiresSelectIsolation(task))
+    {
+        /*
+         * ExclusiveLock conflicts with all lock types used by modifications
+         * and therefore prevents other modifications from running
+         * concurrently.
+         */
+
+        LockShardListResources(task->selectShardList, ExclusiveLock);
+    }
 }


 /*
- * AcquireExecutorMultiShardLocks acquires shard locks need for execution
- * of writes on multiple shards.
+ * AcquireExecutorMultiShardLocks acquires shard locks needed for execution
+ * of writes on multiple shards. In addition to honouring commutativity
+ * rules, we currently only allow a single multi-shard command on a shard at
+ * a time. Otherwise, concurrent multi-shard commands may take row-level
+ * locks on the shard placements in a different order and create a distributed
+ * deadlock. This applies even when writes are commutative and/or there is
+ * no replication.
  *
  * 1. If citus.all_modifications_commutative is set to true, then all locks
  * are acquired as ShareUpdateExclusiveLock.
+ *
  * 2. If citus.all_modifications_commutative is false, then only the shards
  * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
  * lock is acquired with ShareUpdateExclusiveLock.
@@ -327,65 +350,122 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
  * ShareUpdateExclusiveLock conflicts with itself such that only one
  * multi-shard modification at a time is allowed on a shard. It also conflicts
  * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
- * in the same order on all placements. It does not conflict with ShareLock,
- * which is normally obtained by single-shard commutative writes.
+ * in the same order on all placements. It does not conflict with
+ * RowExclusiveLock, which is normally obtained by single-shard, commutative
+ * writes.
  */
 static void
-AcquireExecutorMultiShardLocks(List *shardIntervalList)
+AcquireExecutorMultiShardLocks(List *taskList)
 {
-    LOCKMODE lockMode = NoLock;
+    ListCell *taskCell = NULL;

-    if (AllModificationsCommutative || !IsReplicated(shardIntervalList))
+    foreach(taskCell, taskList)
     {
+        Task *task = (Task *) lfirst(taskCell);
+        LOCKMODE lockMode = NoLock;
+
+        if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
+        {
+            /*
+             * When all writes are commutative then we only need to prevent multi-shard
+             * commands from running concurrently with each other and with commands
+             * that are explicitly non-commutative. When there is no replication then
+             * we only need to prevent concurrent multi-shard commands.
+             *
+             * In either case, ShareUpdateExclusive has the desired effect, since
+             * it conflicts with itself and ExclusiveLock (taken by non-commutative
+             * writes).
+             */
+
+            lockMode = ShareUpdateExclusiveLock;
+        }
+        else
+        {
+            /*
+             * When there is replication, prevent all concurrent writes to the same
+             * shards to ensure the writes are ordered.
+             */
+
+            lockMode = ExclusiveLock;
+        }
+
+        LockShardResource(task->anchorShardId, lockMode);
+
         /*
-         * When all writes are commutative then we only need to prevent multi-shard
-         * commands from running concurrently with each other and with commands
-         * that are explicitly non-commutative. When there is not replication then
-         * we only need to prevent concurrent multi-shard commands.
-         *
-         * In either case, ShareUpdateExclusive has the desired effect, since
-         * it conflicts with itself and ExclusiveLock (taken by non-commutative
-         * writes).
+         * If the task has a subselect, then we may need to lock the shards from which
+         * the query selects as well to prevent the subselects from seeing different
+         * results on different replicas. In particular this prevents INSERT..SELECT
+         * commands from having different effects on different placements.
          */
-        lockMode = ShareUpdateExclusiveLock;
-    }
-    else
-    {
-        /*
-         * When there is replication, prevent all concurrent writes to the same
-         * shards to ensure the writes are ordered.
-         */
-        lockMode = ExclusiveLock;
-    }
+        if (RequiresSelectIsolation(task))
+        {
+            /*
+             * ExclusiveLock conflicts with all lock types used by modifications
+             * and therefore prevents other modifications from running
+             * concurrently.
+             */

-    LockShardListResources(shardIntervalList, lockMode);
+            LockShardListResources(task->selectShardList, ExclusiveLock);
+        }
+    }
 }


 /*
- * IsReplicated checks whether any of the shards in the given list has more
- * than one replica.
+ * RequiresSelectIsolation returns whether a given task requires SELECT
+ * isolation, meaning we should take the necessary locks to ensure that a
+ * subselect, which may join multiple shards, will return the same output
+ * for all task placements.
  */
 static bool
-IsReplicated(List *shardIntervalList)
+RequiresSelectIsolation(Task *task)
 {
-    ListCell *shardIntervalCell;
-    bool hasReplication = false;
+    bool requiresIsolation = false;

-    foreach(shardIntervalCell, shardIntervalList)
+    if (!task->insertSelectQuery)
     {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
-        uint64 shardId = shardInterval->shardId;
-        List *shardPlacementList = FinalizedShardPlacementList(shardId);
-        if (shardPlacementList->length > 1)
-        {
-            hasReplication = true;
-            break;
-        }
+        /*
+         * Only INSERT/SELECT commands currently require SELECT isolation.
+         * Other commands do not read from other shards.
+         */
+
+        requiresIsolation = false;
+    }
+    else if (list_length(task->taskPlacementList) == 1)
+    {
+        /*
+         * If there is only one replica then we fully rely on PostgreSQL to
+         * provide SELECT isolation. In this case, we do not provide isolation
+         * across the shards, but that was never our intention.
+         */
+
+        requiresIsolation = false;
+    }
+    else if (AllModificationsCommutative)
+    {
+        /*
+         * An INSERT/SELECT is commutative with other writes if it excludes
+         * any ongoing writes based on the filter conditions. Without knowing
+         * whether this is true, we assume the user took this into account
+         * when enabling citus.all_modifications_commutative. This option
+         * gives users an escape from aggressive locking during INSERT/SELECT.
+         */
+
+        requiresIsolation = false;
+    }
+    else
+    {
+        /*
+         * If this is a non-commutative write, then we need to block ongoing
+         * writes to make sure that the subselect returns the same result
+         * on all placements.
+         */
+
+        requiresIsolation = true;
     }

-    return hasReplication;
+    return requiresIsolation;
 }


@@ -809,7 +889,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
     shardIntervalList = TaskShardIntervalList(taskList);

     /* ensure that there are no concurrent modifications on the same shards */
-    AcquireExecutorMultiShardLocks(shardIntervalList);
+    AcquireExecutorMultiShardLocks(taskList);

     /* open connection to all relevant placements, if not already open */
     OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
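The comments in the executor changes above rely on PostgreSQL's lock conflict rules: ShareUpdateExclusiveLock conflicts with itself and with ExclusiveLock but not with RowExclusiveLock, and ExclusiveLock conflicts with every lock mode taken by modifications. The sketch below encodes just those rows of the conflict table with hypothetical enum values and asserts the properties the comments depend on; it is an illustration, not PostgreSQL's lock manager or lockdefs.h.

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the three lock modes discussed in the patch. */
typedef enum SketchLockMode
{
	SKETCH_ROW_EXCLUSIVE = 0,          /* single-shard, commutative writes */
	SKETCH_SHARE_UPDATE_EXCLUSIVE = 1, /* multi-shard writes (commutative or unreplicated) */
	SKETCH_EXCLUSIVE = 2               /* non-commutative writes, INSERT/SELECT read locks */
} SketchLockMode;

/* conflictTable[a][b] is true when mode a blocks (and is blocked by) mode b */
static const bool conflictTable[3][3] = {
	/*                    RowExcl  ShareUpdExcl  Excl  */
	/* RowExclusive */   { false,  false,        true  },
	/* ShareUpdExcl */   { false,  true,         true  },
	/* Exclusive */      { true,   true,         true  }
};

static bool
LockModesConflict(SketchLockMode a, SketchLockMode b)
{
	return conflictTable[a][b];
}

int
main(void)
{
	/* only one multi-shard modification at a time per shard */
	assert(LockModesConflict(SKETCH_SHARE_UPDATE_EXCLUSIVE, SKETCH_SHARE_UPDATE_EXCLUSIVE));

	/* multi-shard writes and non-commutative writes wait for each other */
	assert(LockModesConflict(SKETCH_SHARE_UPDATE_EXCLUSIVE, SKETCH_EXCLUSIVE));

	/* commutative single-shard writes are not blocked by multi-shard writes */
	assert(!LockModesConflict(SKETCH_ROW_EXCLUSIVE, SKETCH_SHARE_UPDATE_EXCLUSIVE));

	/* the ExclusiveLock taken on select shards blocks every modification lock mode */
	assert(LockModesConflict(SKETCH_EXCLUSIVE, SKETCH_ROW_EXCLUSIVE));
	assert(LockModesConflict(SKETCH_EXCLUSIVE, SKETCH_SHARE_UPDATE_EXCLUSIVE));

	printf("conflict rules match the comments in multi_router_executor.c\n");
	return 0;
}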
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 7c67e11bc..f0063188a 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -105,7 +105,8 @@ static Task * RouterSelectTask(Query *originalQuery,
                                List **placementList);
 static bool RouterSelectQuery(Query *originalQuery,
                               RelationRestrictionContext *restrictionContext,
-                              List **placementList, uint64 *anchorShardId);
+                              List **placementList, uint64 *anchorShardId,
+                              List **selectShardList);
 static List * TargetShardIntervalsForSelect(Query *query,
                                             RelationRestrictionContext *restrictionContext);
 static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
@@ -264,6 +265,8 @@ CreateMultiTaskRouterPlan(Query *originalQuery,
         /* add the task if it could be created */
         if (modifyTask != NULL)
         {
+            modifyTask->insertSelectQuery = true;
+
             sqlTaskList = lappend(sqlTaskList, modifyTask);
         }

@@ -323,6 +326,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
     Task *modifyTask = NULL;
     List *selectPlacementList = NIL;
     uint64 selectAnchorShardId = INVALID_SHARD_ID;
+    List *selectShardList = NIL;
     uint64 jobId = INVALID_JOB_ID;
     List *insertShardPlacementList = NULL;
     List *intersectedPlacementList = NULL;
@@ -363,7 +367,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
      * updated to point to the relevant nodes and selectPlacementList is determined.
      */
     routerPlannable = RouterSelectQuery(copiedSubquery, copiedRestrictionContext,
-                                        &selectPlacementList, &selectAnchorShardId);
+                                        &selectPlacementList, &selectAnchorShardId,
+                                        &selectShardList);

     if (!routerPlannable)
     {
@@ -417,6 +422,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
     modifyTask->anchorShardId = shardId;
     modifyTask->taskPlacementList = insertShardPlacementList;
     modifyTask->upsertQuery = upsertQuery;
+    modifyTask->selectShardList = selectShardList;

     return modifyTask;
 }
@@ -1762,9 +1768,10 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
     StringInfo queryString = makeStringInfo();
     bool upsertQuery = false;
     uint64 shardId = INVALID_SHARD_ID;
+    List *selectShardList = NIL;

     queryRoutable = RouterSelectQuery(originalQuery, restrictionContext,
-                                      placementList, &shardId);
+                                      placementList, &shardId, &selectShardList);

     if (!queryRoutable)
     {
@@ -1798,7 +1805,7 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
  */
 static bool
 RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
-                  List **placementList, uint64 *anchorShardId)
+                  List **placementList, uint64 *anchorShardId, List **selectShardList)
 {
     List *prunedRelationShardList = TargetShardIntervalsForSelect(originalQuery,
                                                                   restrictionContext);
@@ -1833,12 +1840,15 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
         /* all relations are now pruned down to 0 or 1 shards */
         Assert(list_length(prunedShardList) <= 1);

+        shardInterval = (ShardInterval *) linitial(prunedShardList);
+
         /* anchor shard id */
         if (shardId == INVALID_SHARD_ID)
         {
-            shardInterval = (ShardInterval *) linitial(prunedShardList);
             shardId = shardInterval->shardId;
         }
+
+        *selectShardList = lappend(*selectShardList, shardInterval);
     }

     /*
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c
index 49b41db27..6242dffd3 100644
--- a/src/backend/distributed/utils/citus_outfuncs.c
+++ b/src/backend/distributed/utils/citus_outfuncs.c
@@ -493,6 +493,8 @@ OutTask(OUTFUNC_ARGS)
     WRITE_BOOL_FIELD(assignmentConstrained);
     WRITE_NODE_FIELD(taskExecution);
     WRITE_BOOL_FIELD(upsertQuery);
+    WRITE_BOOL_FIELD(insertSelectQuery);
+    WRITE_NODE_FIELD(selectShardList);
 }


 #if (PG_VERSION_NUM < 90600)
diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c
index 256dc20a2..1558b14fe 100644
--- a/src/backend/distributed/utils/citus_readfuncs.c
+++ b/src/backend/distributed/utils/citus_readfuncs.c
@@ -289,6 +289,8 @@ ReadTask(READFUNC_ARGS)
     READ_BOOL_FIELD(assignmentConstrained);
     READ_NODE_FIELD(taskExecution);
     READ_BOOL_FIELD(upsertQuery);
+    READ_BOOL_FIELD(insertSelectQuery);
+    READ_NODE_FIELD(selectShardList);

     READ_DONE();
 }
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index e1fc30aea..5a84159a1 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -169,6 +169,9 @@ typedef struct Task
     uint64 shardId;               /* only applies to shard fetch tasks */
     TaskExecution *taskExecution; /* used by task tracker executor */
     bool upsertQuery;             /* only applies to modify tasks */
+
+    bool insertSelectQuery;
+    List *selectShardList;        /* only applies to INSERT/SELECT tasks */
 } Task;
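Task is serialized through citus_outfuncs.c and citus_readfuncs.c, so the two new fields are written and read in matching positions; PostgreSQL-style read functions consume the label and value tokens in sequence rather than looking fields up by name. The toy round trip below illustrates that ordering requirement with a hypothetical writer and reader; it does not use the real WRITE_*/READ_* macros from the files above.

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Toy stand-in for the serialized fields this patch adds to Task. */
typedef struct ToyTask
{
	bool upsertQuery;
	bool insertSelectQuery;
	int selectShardCount;   /* stands in for the length of selectShardList */
} ToyTask;

/* Writer emits ":label value" pairs in a fixed order, as OutTask does. */
static void
WriteToyTask(char *buffer, size_t bufferSize, const ToyTask *task)
{
	snprintf(buffer, bufferSize,
			 ":upsertQuery %d :insertSelectQuery %d :selectShardCount %d",
			 task->upsertQuery, task->insertSelectQuery, task->selectShardCount);
}

/*
 * Reader consumes the pairs in the same fixed order, as ReadTask does; if
 * the write order and read order ever diverge, fields are silently misread.
 */
static bool
ReadToyTask(const char *buffer, ToyTask *task)
{
	int upsertValue = 0;
	int insertSelectValue = 0;

	int fieldCount = sscanf(buffer,
							":upsertQuery %d :insertSelectQuery %d :selectShardCount %d",
							&upsertValue, &insertSelectValue, &task->selectShardCount);

	task->upsertQuery = (upsertValue != 0);
	task->insertSelectQuery = (insertSelectValue != 0);

	return fieldCount == 3;
}

int
main(void)
{
	ToyTask original = { false, true, 3 };
	ToyTask restored = { false, false, 0 };
	char buffer[128];

	WriteToyTask(buffer, sizeof(buffer), &original);
	assert(ReadToyTask(buffer, &restored));

	assert(restored.insertSelectQuery == original.insertSelectQuery);
	assert(restored.selectShardCount == original.selectShardCount);

	printf("round trip preserved the new Task fields\n");
	return 0;
}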