From 213d8419c685d9207e2654c0a7d281836dac3b1d Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 17 Oct 2016 12:29:03 +0200 Subject: [PATCH 1/4] Refactor and redocument executor shard lock code --- .../executor/multi_router_executor.c | 121 ++++++++++-------- 1 file changed, 71 insertions(+), 50 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 3056bd9f9..9eb22d029 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -84,8 +84,7 @@ static bool subXactAbortAttempted = false; /* functions needed during start phase */ static void InitTransactionStateForTask(Task *task); -static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); -static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); +static void AcquireExecutorShardLock(Task *task, CmdType commandType); static HTAB * CreateXactParticipantHash(void); /* functions needed during run phase */ @@ -126,7 +125,6 @@ static void MarkRemainingInactivePlacements(void); void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) { - LOCKMODE lockMode = NoLock; EState *executorState = NULL; CmdType commandType = queryDesc->operation; @@ -174,13 +172,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) * work. */ queryDesc->planstate = (PlanState *) makeNode(MaterialState); - - lockMode = CommutativityRuleToLockMode(commandType, task->upsertQuery); - - if (lockMode != NoLock) - { - AcquireExecutorShardLock(task, lockMode); - } } @@ -241,68 +232,94 @@ InitTransactionStateForTask(Task *task) /* - * CommutativityRuleToLockMode determines the commutativity rule for the given - * command and returns the appropriate lock mode to enforce that rule. The - * function assumes a SELECT doesn't modify state and therefore is commutative - * with all other commands. The function also assumes that an INSERT commutes - * with another INSERT, but not with an UPDATE/DELETE/UPSERT; and an - * UPDATE/DELETE/UPSERT doesn't commute with an INSERT, UPDATE, DELETE or UPSERT. + * AcquireExecutorShardLock acquires a lock on the shard for the given task and + * command type if necessary to avoid divergence between multiple replicas of + * the same shard. No lock is obtained when there is only one replica. * - * Note that the above comment defines INSERT INTO ... ON CONFLICT type of queries - * as an UPSERT. Since UPSERT is not defined as a separate command type in postgres, - * we have to pass it as a second parameter to the function. + * The function determines the appropriate lock mode based on the commutativity + * rule of the command. In each case, it uses a lock mode that enforces the + * commutativity rule. * - * The above mapping is overridden entirely when all_modifications_commutative - * is set to true. In that case, all commands just claim a shared lock. This - * allows the shard repair logic to lock out modifications while permitting all - * commands to otherwise commute. + * The mapping is overridden when all_modifications_commutative is set to true. + * In that case, all modifications are treated as commutative, which can be used + * to communicate that the application is only generating commutative + * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. 
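+ *
+ * As an illustration of the rule (using a hypothetical table, not part of
+ * this change), consider two sessions concurrently running
+ *
+ *   UPDATE counters SET value = value + 1 WHERE key = 1;
+ *   UPDATE counters SET value = value * 2 WHERE key = 1;
+ *
+ * If one replica applies them in this order and another in the reverse
+ * order, the replicas diverge, which is why UPDATE/DELETE/UPSERT take
+ * ExclusiveLock. Two concurrent INSERTs produce the same table contents
+ * regardless of order, so they only take a weaker lock that conflicts
+ * with non-commutative writes.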
*/ -static LOCKMODE -CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) +static void +AcquireExecutorShardLock(Task *task, CmdType commandType) { LOCKMODE lockMode = NoLock; + int64 shardId = task->anchorShardId; - /* bypass commutativity checks when flag enabled */ - if (AllModificationsCommutative) + if (commandType == CMD_SELECT || list_length(task->taskPlacementList) == 1) { - return ShareLock; - } + /* + * The executor shard lock is used to maintain consistency between + * replicas and therefore no lock is required for read-only queries + * or in general when there is only one replica. + */ - if (commandType == CMD_SELECT) - { lockMode = NoLock; } - else if (upsertQuery) + else if (AllModificationsCommutative) { + /* + * Bypass commutativity checks when citus.all_modifications_commutative + * is enabled. + * + * A ShareLock does not conflict with itself and therefore allows + * multiple commutative commands to proceed concurrently. It does + * conflict with ExclusiveLock, which may still be obtained by another + * session that executes an UPDATE/DELETE/UPSERT command with + * citus.all_modifications_commutative disabled. + */ + + lockMode = ShareLock; + } + else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE) + { + /* + * UPDATE/DELETE/UPSERT commands do not commute with other modifications + * since the rows modified by one command may be affected by the outcome + * of another command. + * + * We need to handle upsert before INSERT, because PostgreSQL models + * upsert commands as INSERT with an ON CONFLICT section. + * + * ExclusiveLock conflicts with all lock types used by modifications + * and therefore prevents other modifications from running + * concurrently. + */ + lockMode = ExclusiveLock; } else if (commandType == CMD_INSERT) { + /* + * An INSERT commutes with other INSERT commands, since performing them + * out-of-order only affects the table order on disk, but not the + * contents. + * + * When a unique constraint exists, INSERTs are not strictly commutative, + * but whichever INSERT comes last will error out and thus has no effect. + * INSERT is not commutative with UPDATE/DELETE/UPSERT, since the + * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution + * order. + * + * A ShareLock does not conflict with itself and therefore allows + * multiple INSERT commands to proceed concurrently. It conflicts with + * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do + * not run concurrently with INSERT. + */ + lockMode = ShareLock; } - else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) - { - lockMode = ExclusiveLock; - } else { ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType))); } - return lockMode; -} - - -/* - * AcquireExecutorShardLock: acquire shard lock needed for execution of - * a single task within a distributed plan. 
 */
-static void
-AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
-{
-	int64 shardId = task->anchorShardId;
-
-	if (shardId != INVALID_SHARD_ID)
+	if (shardId != INVALID_SHARD_ID && lockMode != NoLock)
 	{
 		LockShardResource(shardId, lockMode);
 	}
@@ -476,6 +493,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 			   bool isModificationQuery,
 			   bool expectResults)
 {
+	CmdType commandType = queryDesc->operation;
 	TupleDesc tupleDescriptor = queryDesc->tupDesc;
 	EState *executorState = queryDesc->estate;
 	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
@@ -504,6 +522,9 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 		elog(DEBUG4, "query after master evaluation: %s", queryString);
 	}
 
+	/* prevent replicas of the same shard from diverging */
+	AcquireExecutorShardLock(task, commandType);
+
 	/*
 	 * Try to run the query to completion on one placement. If the query fails
 	 * attempt the query on the next placement.

From 9d98acfb6d4e660e38841a6ef169eff8dd8e0ac5 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Thu, 6 Oct 2016 00:17:48 +0200
Subject: [PATCH 2/4] Move requiresMasterEvaluation from Task to Job

---
 src/backend/distributed/executor/multi_router_executor.c | 9 +++++++--
 src/backend/distributed/planner/multi_physical_planner.c | 1 +
 src/backend/distributed/planner/multi_router_planner.c   | 5 ++---
 src/backend/distributed/utils/citus_outfuncs.c           | 2 +-
 src/backend/distributed/utils/citus_readfuncs.c          | 2 +-
 src/include/distributed/multi_physical_planner.h         | 2 +-
 6 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 9eb22d029..4476e058c 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -91,6 +91,7 @@ static HTAB * CreateXactParticipantHash(void);
 static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
									   Task *task,
									   bool isModificationQuery,
+									   bool requiresMasterEvaluation,
									   bool expectResults);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
									   DestReceiver *destination,
@@ -361,7 +362,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 {
 	PlannedStmt *planStatement = queryDesc->plannedstmt;
 	MultiPlan *multiPlan = GetMultiPlan(planStatement);
-	List *taskList = multiPlan->workerJob->taskList;
+	Job *workerJob = multiPlan->workerJob;
+	List *taskList = workerJob->taskList;
 	Task *task = NULL;
 	EState *estate = queryDesc->estate;
 	CmdType operation = queryDesc->operation;
@@ -415,6 +417,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	{
 		bool resultsOK = false;
 		bool isModificationQuery = false;
+		bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation;
 
 		if (operation == CMD_INSERT || operation == CMD_UPDATE ||
 			operation == CMD_DELETE)
@@ -429,6 +432,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 
 		resultsOK = ExecuteTaskAndStoreResults(queryDesc, task,
											   isModificationQuery,
+											   requiresMasterEvaluation,
											   sendTuples);
 		if (!resultsOK)
 		{
@@ -491,6 +495,7 @@ out:
 static bool
 ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
						   bool isModificationQuery,
+						   bool requiresMasterEvaluation,
						   bool expectResults)
 {
 	CmdType commandType = queryDesc->operation;
 	TupleDesc tupleDescriptor = queryDesc->tupDesc;
 	EState *executorState = queryDesc->estate;
 	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
@@ -506,7 +511,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 	bool gotResults = false;
 	char *queryString = task->queryString;
 
-	if
(isModificationQuery && task->requiresMasterEvaluation) + if (isModificationQuery && requiresMasterEvaluation) { PlannedStmt *planStatement = queryDesc->plannedstmt; MultiPlan *multiPlan = GetMultiPlan(planStatement); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 3251ac714..70e819d6d 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1677,6 +1677,7 @@ BuildJob(Query *jobQuery, List *dependedJobList) job->jobId = UniqueJobId(); job->jobQuery = jobQuery; job->dependedJobList = dependedJobList; + job->requiresMasterEvaluation = false; return job; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 3de008c60..56a7c357f 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -757,7 +757,6 @@ RouterModifyTask(Query *originalQuery, Query *query) StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; bool upsertQuery = false; - bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); /* grab shared metadata lock to stop concurrent placement additions */ LockShardDistributionMetadata(shardId, ShareLock); @@ -789,7 +788,6 @@ RouterModifyTask(Query *originalQuery, Query *query) modifyTask->anchorShardId = shardId; modifyTask->dependedTaskList = NIL; modifyTask->upsertQuery = upsertQuery; - modifyTask->requiresMasterEvaluation = requiresMasterEvaluation; return modifyTask; } @@ -1126,7 +1124,6 @@ RouterSelectTask(Query *originalQuery, Query *query, task->anchorShardId = shardId; task->dependedTaskList = NIL; task->upsertQuery = upsertQuery; - task->requiresMasterEvaluation = false; *placementList = workerList; @@ -1477,6 +1474,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList) Job *job = NULL; List *taskList = NIL; TaskType taskType = task->taskType; + bool requiresMasterEvaluation = RequiresMasterEvaluation(query); /* * We send modify task to the first replica, otherwise we choose the target shard @@ -1501,6 +1499,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList) job->subqueryPushdown = false; job->jobQuery = query; job->taskList = taskList; + job->requiresMasterEvaluation = requiresMasterEvaluation; return job; } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 3c4cd213b..49b41db27 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -389,6 +389,7 @@ OutJobFields(StringInfo str, const Job *node) WRITE_NODE_FIELD(taskList); WRITE_NODE_FIELD(dependedJobList); WRITE_BOOL_FIELD(subqueryPushdown); + WRITE_BOOL_FIELD(requiresMasterEvaluation); } @@ -492,7 +493,6 @@ OutTask(OUTFUNC_ARGS) WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); WRITE_BOOL_FIELD(upsertQuery); - WRITE_BOOL_FIELD(requiresMasterEvaluation); } #if (PG_VERSION_NUM < 90600) diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 1b85b0b93..256dc20a2 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -161,6 +161,7 @@ readJobInfo(Job *local_node) READ_NODE_FIELD(taskList); READ_NODE_FIELD(dependedJobList); READ_BOOL_FIELD(subqueryPushdown); + 
READ_BOOL_FIELD(requiresMasterEvaluation); } @@ -288,7 +289,6 @@ ReadTask(READFUNC_ARGS) READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); READ_BOOL_FIELD(upsertQuery); - READ_BOOL_FIELD(requiresMasterEvaluation); READ_DONE(); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 351ab9171..3d12fa3aa 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -120,6 +120,7 @@ typedef struct Job List *taskList; List *dependedJobList; bool subqueryPushdown; + bool requiresMasterEvaluation; /* only applies to modify jobs */ } Job; @@ -168,7 +169,6 @@ typedef struct Task uint64 shardId; /* only applies to shard fetch tasks */ TaskExecution *taskExecution; /* used by task tracker executor */ bool upsertQuery; /* only applies to modify tasks */ - bool requiresMasterEvaluation; /* only applies to modify tasks */ } Task; From a497e7178c07e37f7317961387d3cf92caccb8d7 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 7 Oct 2016 03:26:04 +0200 Subject: [PATCH 3/4] Parallelise master_modify_multiple_shards --- src/backend/distributed/commands/multi_copy.c | 3 +- .../executor/multi_router_executor.c | 357 ++++++++++++++---- .../distributed/executor/multi_utility.c | 7 +- .../master/master_modify_multiple_shards.c | 198 ++-------- .../transaction/multi_shard_transaction.c | 2 +- .../distributed/utils/connection_cache.c | 14 +- src/backend/distributed/utils/resource_lock.c | 35 +- src/include/distributed/connection_cache.h | 2 +- .../distributed/multi_router_executor.h | 4 + src/include/distributed/resource_lock.h | 5 +- .../expected/multi_modifying_xacts.out | 6 +- .../regress/expected/multi_shard_modify.out | 20 +- src/test/regress/expected/multi_truncate.out | 28 +- src/test/regress/sql/multi_shard_modify.sql | 6 +- src/test/regress/sql/multi_truncate.sql | 14 +- 15 files changed, 410 insertions(+), 291 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b1ed3d5cd..f8745d466 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -433,7 +433,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) } /* prevent concurrent placement changes and non-commutative DML statements */ - LockShards(shardIntervalList, ShareLock); + LockShardListMetadata(shardIntervalList, ShareLock); + LockShardListResources(shardIntervalList, ShareLock); /* initialize the shard interval cache */ shardCount = cacheEntry->shardIntervalArrayLength; diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4476e058c..6c0238a23 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -28,6 +28,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -35,6 +36,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" @@ 
-64,6 +66,7 @@
 /* controls use of locks to enforce safe commutativity */
 bool AllModificationsCommutative = false;
 
+
 /*
  * The following static variables are necessary to track the progression of
  * multi-statement transactions managed by the router executor. After the first
@@ -84,15 +87,15 @@ static bool subXactAbortAttempted = false;
 
 /* functions needed during start phase */
 static void InitTransactionStateForTask(Task *task);
-static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static HTAB * CreateXactParticipantHash(void);
 
 /* functions needed during run phase */
-static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
-									   Task *task,
-									   bool isModificationQuery,
-									   bool requiresMasterEvaluation,
-									   bool expectResults);
+static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
+							  bool isModificationQuery, bool expectResults);
+static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
+								 bool isModificationQuery, bool expectResults);
+static List * TaskShardIntervalList(List *taskList);
+static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
									   DestReceiver *destination,
									   Tuplestorestate *tupleStore);
@@ -106,8 +109,8 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
 static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
									 ParamListInfo paramListInfo);
 static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
-							 TupleDesc tupleDescriptor, int64 *rows);
-static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
+							 TupleDesc tupleDescriptor, bool failOnError, int64 *rows);
+static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows);
 static void RecordShardIdParticipant(uint64 affectedShardId,
									 NodeConnectionEntry *participantEntry);
@@ -136,25 +139,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
 	if (commandType != CMD_SELECT)
 	{
 		eflags |= EXEC_FLAG_SKIP_TRIGGERS;
-
-		if (XactModificationLevel == XACT_MODIFICATION_SCHEMA)
-		{
-			ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
-							errmsg("distributed data modifications must not appear in "
-								   "transaction blocks which contain distributed DDL "
-								   "commands")));
-		}
-
-		/*
-		 * We could naturally handle function-based transactions (i.e. those
-		 * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
-		 * but some customers already use functions that touch multiple shards
-		 * from within a function, so we'll ignore functions for now.
-		 */
-		if (IsTransactionBlock() && xactParticipantHash == NULL)
-		{
-			InitTransactionStateForTask(task);
-		}
 	}
 
 	/* signal that it is a router execution */
@@ -364,7 +348,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	MultiPlan *multiPlan = GetMultiPlan(planStatement);
 	Job *workerJob = multiPlan->workerJob;
 	List *taskList = workerJob->taskList;
-	Task *task = NULL;
 	EState *estate = queryDesc->estate;
 	CmdType operation = queryDesc->operation;
 	MemoryContext oldcontext = NULL;
@@ -372,14 +355,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
 	bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning;
 
-	/* router executor can only execute distributed plans with a single task */
-	Assert(list_length(taskList) == 1);
-	task = (Task *) linitial(taskList);
-
 	Assert(estate != NULL);
 	Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
-	Assert(task != NULL);
-
 	oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -415,7 +392,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	 */
 	if (!routerState->eof_underlying)
 	{
-		bool resultsOK = false;
 		bool isModificationQuery = false;
 		bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation;
 
@@ -430,13 +406,45 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
							   (int) operation)));
 		}
 
-		resultsOK = ExecuteTaskAndStoreResults(queryDesc, task,
-											   isModificationQuery,
-											   requiresMasterEvaluation,
-											   sendTuples);
-		if (!resultsOK)
+		if (requiresMasterEvaluation)
 		{
-			ereport(ERROR, (errmsg("could not receive query results")));
+			ListCell *taskCell = NULL;
+			Query *query = workerJob->jobQuery;
+			Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid;
+
+			ExecuteMasterEvaluableFunctions(query);
+
+			foreach(taskCell, taskList)
+			{
+				Task *task = (Task *) lfirst(taskCell);
+				StringInfo newQueryString = makeStringInfo();
+
+				deparse_shard_query(query, relationId, task->anchorShardId,
+									newQueryString);
+
+				elog(DEBUG4, "query before master evaluation: %s", task->queryString);
+				elog(DEBUG4, "query after master evaluation: %s", newQueryString->data);
+
+				task->queryString = newQueryString->data;
+			}
+		}
+
+		if (list_length(taskList) == 1)
+		{
+			Task *task = (Task *) linitial(taskList);
+			bool resultsOK = false;
+
+			resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery,
+										  sendTuples);
+			if (!resultsOK)
+			{
+				ereport(ERROR, (errmsg("could not receive query results")));
+			}
+		}
+		else
+		{
+			ExecuteMultipleTasks(queryDesc, taskList, isModificationQuery,
+								 sendTuples);
 		}
 
 		/* mark underlying query as having executed */
@@ -483,8 +491,8 @@ out:
 
 
 /*
- * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves
- * the results and stores them, if SELECT or RETURNING is used, in a tuple
+ * ExecuteSingleTask executes the task on the remote node, retrieves the
+ * results and stores them, if SELECT or RETURNING is used, in a tuple
  * store.
 *
 * If the task fails on one of the placements, the function retries it on
 * other placements (SELECT), marks the placements as inactive (DML on some
 * placements failed), or errors out (DML failed on all placements).
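+ *
+ * For example, with two placements per shard, a SELECT that hits a
+ * connection error on the first placement is silently retried on the
+ * second, while a modification that succeeds on one placement and fails
+ * on the other marks only the failed placement as inactive.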
 */
 static bool
-ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
-						   bool isModificationQuery,
-						   bool requiresMasterEvaluation,
-						   bool expectResults)
+ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
+				  bool isModificationQuery,
+				  bool expectResults)
 {
-	CmdType commandType = queryDesc->operation;
+	CmdType operation = queryDesc->operation;
 	TupleDesc tupleDescriptor = queryDesc->tupDesc;
 	EState *executorState = queryDesc->estate;
 	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
@@ -511,24 +518,27 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 	bool gotResults = false;
 	char *queryString = task->queryString;
 
-	if (isModificationQuery && requiresMasterEvaluation)
+	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
 	{
-		PlannedStmt *planStatement = queryDesc->plannedstmt;
-		MultiPlan *multiPlan = GetMultiPlan(planStatement);
-		Query *query = multiPlan->workerJob->jobQuery;
-		Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
-		StringInfo queryStringInfo = makeStringInfo();
+		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+						errmsg("single-shard DML commands must not appear in "
+							   "transaction blocks which contain multi-shard data "
+							   "modifications")));
+	}
 
-		ExecuteMasterEvaluableFunctions(query);
-		deparse_shard_query(query, relid, task->anchorShardId, queryStringInfo);
-		queryString = queryStringInfo->data;
-
-		elog(DEBUG4, "query before master evaluation: %s", task->queryString);
-		elog(DEBUG4, "query after master evaluation: %s", queryString);
+	/*
+	 * We could naturally handle function-based transactions (i.e. those
+	 * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
+	 * but some customers already use functions that touch multiple shards
+	 * from within a function, so we'll ignore functions for now.
+	 */
+	if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock())
+	{
+		InitTransactionStateForTask(task);
 	}
 
 	/* prevent replicas of the same shard from diverging */
-	AcquireExecutorShardLock(task, commandType);
+	AcquireExecutorShardLock(task, operation);
 
 	/*
	 * Try to run the query to completion on one placement. If the query fails
@@ -538,6 +548,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 	{
 		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
 		bool queryOK = false;
+		bool failOnError = false;
 		int64 currentAffectedTupleCount = 0;
 		PGconn *connection = GetConnectionForPlacement(taskPlacement,
													   isModificationQuery);
@@ -564,11 +575,12 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 		if (!gotResults && expectResults)
 		{
 			queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
-									   &currentAffectedTupleCount);
+									   failOnError, &currentAffectedTupleCount);
 		}
 		else
 		{
-			queryOK = ConsumeQueryResult(connection, &currentAffectedTupleCount);
+			queryOK = ConsumeQueryResult(connection, failOnError,
+										 &currentAffectedTupleCount);
 		}
 
 		if (queryOK)
@@ -642,6 +654,200 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 }
 
 
+/*
+ * ExecuteMultipleTasks executes a list of tasks on remote nodes, retrieves
+ * the results and, if RETURNING is used, stores them in a tuple store.
+ *
+ * If a task fails on one of the placements, the transaction rolls back.
+ * Otherwise, the changes are committed using 2PC when the local transaction
+ * commits.
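+ *
+ * For illustration (a hypothetical distributed table), a command such as
+ *
+ *   DELETE FROM orders WHERE placed_at < '2016-01-01';
+ *
+ * may be planned as one task per shard, in which case the tasks are
+ * executed here rather than in ExecuteSingleTask.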
+ */ +static void +ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList, + bool isModificationQuery, bool expectResults) +{ + TupleDesc tupleDescriptor = queryDesc->tupDesc; + EState *executorState = queryDesc->estate; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + ParamListInfo paramListInfo = queryDesc->params; + int64 affectedTupleCount = -1; + + /* can only support modifications right now */ + Assert(isModificationQuery); + + affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo, + routerState, tupleDescriptor); + + executorState->es_processed = affectedTupleCount; +} + + +/* + * ExecuteModifyTasks executes a list of tasks on remote nodes, and + * optionally retrieves the results and stores them in a tuple store. + * + * If a task fails on one of the placements, the transaction rolls back. + * Otherwise, the changes are committed using 2PC when the local transaction + * commits. + */ +int64 +ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, + MaterialState *routerState, TupleDesc tupleDescriptor) +{ + int64 totalAffectedTupleCount = 0; + ListCell *taskCell = NULL; + char *userName = CurrentUserName(); + List *shardIntervalList = NIL; + + if (XactModificationLevel == XACT_MODIFICATION_DATA) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("multi-shard data modifications must not appear in " + "transaction blocks which contain single-shard DML " + "commands"))); + } + + XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; + + shardIntervalList = TaskShardIntervalList(taskList); + + /* ensure that there are no concurrent modifications on the same shards */ + LockShardListResources(shardIntervalList, ExclusiveLock); + + /* open connection to all relevant placements, if not already open */ + OpenTransactionsToAllShardPlacements(shardIntervalList, userName); + + /* send command to all relevant shard placements */ + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + int64 shardId = task->anchorShardId; + char *queryString = task->queryString; + bool shardConnectionsFound = false; + ShardConnections *shardConnections = NULL; + List *connectionList = NIL; + ListCell *connectionCell = NULL; + + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); + Assert(shardConnectionsFound); + + connectionList = shardConnections->connectionList; + Assert(connectionList != NIL); + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + bool queryOK = false; + + queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); + if (!queryOK) + { + ReraiseRemoteError(connection, NULL); + } + } + } + + /* collects results from all relevant shard placements */ + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + int64 shardId = task->anchorShardId; + bool shardConnectionsFound = false; + ShardConnections *shardConnections = NULL; + List *connectionList = NIL; + ListCell *connectionCell = NULL; + int64 affectedTupleCount = 0; + bool gotResults = false; + + /* abort in case of cancellation */ + CHECK_FOR_INTERRUPTS(); + + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); + connectionList = shardConnections->connectionList; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) 
lfirst(connectionCell);
+			PGconn *connection = transactionConnection->connection;
+			int64 currentAffectedTupleCount = 0;
+			bool failOnError = true;
+			bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
+
+			/*
+			 * If caller is interested, store query results the first time
+			 * through. The output of the query's execution on other shards is
+			 * discarded if we run there (because it's a modification query).
+			 */
+			if (!gotResults && expectResults)
+			{
+				Assert(routerState != NULL && tupleDescriptor != NULL);
+
+				queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
+										   failOnError, &currentAffectedTupleCount);
+			}
+			else
+			{
+				queryOK = ConsumeQueryResult(connection, failOnError,
+											 &currentAffectedTupleCount);
+			}
+
+			/* should have rolled back on error */
+			Assert(queryOK);
+
+			if (!gotResults)
+			{
+				affectedTupleCount = currentAffectedTupleCount;
+				totalAffectedTupleCount += affectedTupleCount;
+			}
+			else if (currentAffectedTupleCount != affectedTupleCount)
+			{
+				char *nodeName = ConnectionGetOptionValue(connection, "host");
+				char *nodePort = ConnectionGetOptionValue(connection, "port");
+
+				ereport(WARNING,
+						(errmsg("modified "INT64_FORMAT " tuples, but expected "
+								"to modify "INT64_FORMAT,
+								currentAffectedTupleCount, affectedTupleCount),
+						 errdetail("modified placement on %s:%s", nodeName,
+								   nodePort)));
+			}
+
+			gotResults = true;
+		}
+	}
+
+	CHECK_FOR_INTERRUPTS();
+
+	return totalAffectedTupleCount;
+}
+
+
+/*
+ * TaskShardIntervalList returns a list of shard intervals for a given list of
+ * tasks.
+ */
+static List *
+TaskShardIntervalList(List *taskList)
+{
+	ListCell *taskCell = NULL;
+	List *shardIntervalList = NIL;
+
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+		int64 shardId = task->anchorShardId;
+		ShardInterval *shardInterval = LoadShardInterval(shardId);
+
+		shardIntervalList = lappend(shardIntervalList, shardInterval);
+	}
+
+	return shardIntervalList;
+}
+
+
 /*
  * ReturnRowsFromTuplestore moves rows from a given tuplestore into a
  * receiver. It performs the necessary limiting to support cursors.
@@ -929,7 +1135,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
 */
 static bool
 StoreQueryResult(MaterialState *routerState, PGconn *connection,
-				 TupleDesc tupleDescriptor, int64 *rows)
+				 TupleDesc tupleDescriptor, bool failOnError, int64 *rows)
 {
 	AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
 	Tuplestorestate *tupleStore = NULL;
@@ -947,7 +1153,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 	{
 		routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
 	}
-	else
+	else if (!failOnError)
 	{
 		/* might have failed query execution on another placement before */
 		tuplestore_clear(routerState->tuplestorestate);
 	}
@@ -974,7 +1180,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 		{
 			char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
 			int category = 0;
-			bool raiseError = false;
+			bool isConstraintViolation = false;
 
			/*
			 * If the error code is in constraint violation class, we want to
			 * fail fast and raise the error, since the same violation is likely
			 * to occur on other placements.
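			 * The same applies to any other error when failOnError is set,
			 * as is the case for multi-shard modifications.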
			 */
			category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
-			raiseError = SqlStateMatchesCategory(sqlStateString, category);
+			isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
 
-			if (raiseError)
+			if (isConstraintViolation || failOnError)
 			{
 				RemoveXactConnection(connection);
 				ReraiseRemoteError(connection, result);
@@ -1056,7 +1262,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 * has been an error.
 */
 static bool
-ConsumeQueryResult(PGconn *connection, int64 *rows)
+ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 {
 	bool commandFailed = false;
 	bool gotResponse = false;
@@ -1086,7 +1292,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
 		{
 			char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
 			int category = 0;
-			bool raiseError = false;
+			bool isConstraintViolation = false;
 
			/*
			 * If the error code is in constraint violation class, we want to
			 * fail fast and raise the error, since the same violation is likely
			 * to occur on other placements.
			 */
			category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
-			raiseError = SqlStateMatchesCategory(sqlStateString, category);
+			isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
 
-			if (raiseError)
+			if (isConstraintViolation || failOnError)
 			{
 				RemoveXactConnection(connection);
 				ReraiseRemoteError(connection, result);
@@ -1118,8 +1324,11 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
 			char *currentAffectedTupleString = PQcmdTuples(result);
 			int64 currentAffectedTupleCount = 0;
 
-			scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
-			Assert(currentAffectedTupleCount >= 0);
+			if (*currentAffectedTupleString != '\0')
+			{
+				scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
+				Assert(currentAffectedTupleCount >= 0);
+			}
 
 #if (PG_VERSION_NUM < 90600)

diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index 8864a7b16..05182dbba 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -1292,7 +1292,8 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 	{
 		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
						errmsg("distributed DDL commands must not appear within "
-							   "transaction blocks containing data modifications")));
+							   "transaction blocks containing single-shard data "
+							   "modifications")));
 	}
 
 	ShowNoticeIfNotUsing2PC();
@@ -1305,7 +1306,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 		ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
 	}
 
-	XactModificationLevel = XACT_MODIFICATION_SCHEMA;
+	XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
 }
 
@@ -1359,7 +1360,7 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
 	Oid schemaId = get_rel_namespace(relationId);
 	char *schemaName = get_namespace_name(schemaId);
 
-	LockShards(shardIntervalList, ShareLock);
+	LockShardListResources(shardIntervalList, ShareLock);
 	OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner);
 
 	foreach(shardIntervalCell, shardIntervalList)

diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index d54407341..169f22d91 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -41,6 +41,7 @@
 #include
"distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" #include "optimizer/clauses.h" #include "optimizer/predtest.h" @@ -55,11 +56,9 @@ #include "utils/memutils.h" -static void LockShardsForModify(List *shardIntervalList); -static bool HasReplication(List *shardIntervalList); -static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId); -static int SendQueryToPlacements(char *shardQueryString, - ShardConnections *shardConnections); +static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, + Oid relationId); + PG_FUNCTION_INFO_V1(master_modify_multiple_shards); @@ -83,14 +82,12 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) Query *modifyQuery = NULL; Node *queryTreeNode; List *restrictClauseList = NIL; - bool isTopLevel = true; bool failOK = false; List *shardIntervalList = NIL; List *prunedShardIntervalList = NIL; + List *taskList = NIL; int32 affectedTupleCount = 0; - PreventTransactionChain(isTopLevel, "master_modify_multiple_shards"); - queryTreeNode = ParseTreeNode(queryString); if (IsA(queryTreeNode, DeleteStmt)) { @@ -163,183 +160,50 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) CHECK_FOR_INTERRUPTS(); - LockShardsForModify(prunedShardIntervalList); - - affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList, - relationId); + taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, + relationId); + affectedTupleCount = ExecuteModifyTasks(taskList, false, NULL, NULL, NULL); PG_RETURN_INT32(affectedTupleCount); } /* - * LockShardsForModify command locks the replicas of given shard. The - * lock logic is slightly different from LockShards function. Basically, - * - * 1. If citus.all_modifications_commutative is set to true, then all locks - * are acquired as ShareLock. - * 2. If citus.all_modifications_commutative is false, then only the shards - * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the - * lock is acquired with ShareLock. + * ModifyMultipleShardsTaskList builds a list of tasks to execute a query on a + * given list of shards. */ -static void -LockShardsForModify(List *shardIntervalList) +static List * +ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relationId) { - LOCKMODE lockMode = NoLock; + List *taskList = NIL; + ListCell *shardIntervalCell = NULL; + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; - if (AllModificationsCommutative) - { - lockMode = ShareLock; - } - else if (!HasReplication(shardIntervalList)) - { - lockMode = ShareLock; - } - else - { - lockMode = ExclusiveLock; - } - - LockShards(shardIntervalList, lockMode); -} - - -/* - * HasReplication checks whether any of the shards in the given list has more - * than one replica. 
- */
-static bool
-HasReplication(List *shardIntervalList)
-{
-	ListCell *shardIntervalCell;
-	bool hasReplication = false;
+	/* lock metadata before getting placement lists */
+	LockShardListMetadata(shardIntervalList, ShareLock);
 
 	foreach(shardIntervalCell, shardIntervalList)
 	{
 		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
-		uint64 shardId = shardInterval->shardId;
-		List *shardPlacementList = FinalizedShardPlacementList(shardId);
-		if (shardPlacementList->length > 1)
-		{
-			hasReplication = true;
-		}
-	}
-
-	return hasReplication;
-}
-
-
-/*
- * SendQueryToShards executes the given query in all placements of the given
- * shard list and returns the total affected tuple count. The execution is done
- * in a distributed transaction and the commit protocol is decided according to
- * the value of citus.multi_shard_commit_protocol parameter. SendQueryToShards
- * does not acquire locks for the shards so it is advised to acquire locks to
- * the shards when necessary before calling SendQueryToShards.
- */
-static int
-SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
-{
-	int affectedTupleCount = 0;
-	char *relationOwner = TableOwner(relationId);
-	ListCell *shardIntervalCell = NULL;
-
-	OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
-
-	foreach(shardIntervalCell, shardIntervalList)
-	{
-		ShardInterval *shardInterval = (ShardInterval *) lfirst(
-			shardIntervalCell);
 		Oid relationId = shardInterval->relationId;
 		uint64 shardId = shardInterval->shardId;
-		bool shardConnectionsFound = false;
-		ShardConnections *shardConnections = NULL;
 		StringInfo shardQueryString = makeStringInfo();
-		char *shardQueryStringData = NULL;
-		int shardAffectedTupleCount = -1;
-
-		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
-		Assert(shardConnectionsFound);
+		Task *task = NULL;
 
 		deparse_shard_query(query, relationId, shardId, shardQueryString);
-		shardQueryStringData = shardQueryString->data;
-		shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
-														shardConnections);
-		affectedTupleCount += shardAffectedTupleCount;
+
+		task = CitusMakeNode(Task);
+		task->jobId = jobId;
+		task->taskId = taskId++;
+		task->taskType = SQL_TASK;
+		task->queryString = shardQueryString->data;
+		task->dependedTaskList = NULL;
+		task->anchorShardId = shardId;
+		task->taskPlacementList = FinalizedShardPlacementList(shardId);
+
+		taskList = lappend(taskList, task);
 	}
 
-	/* check for cancellation one last time before returning */
-	CHECK_FOR_INTERRUPTS();
-
-	return affectedTupleCount;
-}
-
-
-/*
- * SendQueryToPlacements sends the given query string to all given placement
- * connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions
- * should be called after all queries have been sent successfully.
- */ -static int -SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections) -{ - uint64 shardId = shardConnections->shardId; - List *connectionList = shardConnections->connectionList; - ListCell *connectionCell = NULL; - int32 shardAffectedTupleCount = -1; - - Assert(connectionList != NIL); - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - PGresult *result = NULL; - char *placementAffectedTupleString = NULL; - int32 placementAffectedTupleCount = -1; - - CHECK_FOR_INTERRUPTS(); - - /* send the query */ - result = PQexec(connection, shardQueryString); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - ereport(ERROR, (errmsg("could not send query to shard placement"))); - } - - placementAffectedTupleString = PQcmdTuples(result); - - /* returned tuple count is empty for utility commands, use 0 as affected count */ - if (*placementAffectedTupleString == '\0') - { - placementAffectedTupleCount = 0; - } - else - { - placementAffectedTupleCount = pg_atoi(placementAffectedTupleString, - sizeof(int32), 0); - } - - if ((shardAffectedTupleCount == -1) || - (shardAffectedTupleCount == placementAffectedTupleCount)) - { - shardAffectedTupleCount = placementAffectedTupleCount; - } - else - { - ereport(ERROR, - (errmsg("modified %d tuples, but expected to modify %d", - placementAffectedTupleCount, shardAffectedTupleCount), - errdetail("Affected tuple counts at placements of shard " - UINT64_FORMAT " are different.", shardId))); - } - - PQclear(result); - - transactionConnection->transactionState = TRANSACTION_STATE_OPEN; - } - - return shardAffectedTupleCount; + return taskList; } diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 09d251c0e..054900469 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -153,7 +153,7 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) transactionConnection->groupId = workerNode->groupId; transactionConnection->connectionId = shardConnections->shardId; - transactionConnection->transactionState = TRANSACTION_STATE_INVALID; + transactionConnection->transactionState = TRANSACTION_STATE_OPEN; transactionConnection->connection = connection; transactionConnection->nodeName = shardPlacement->nodeName; transactionConnection->nodePort = shardPlacement->nodePort; diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 8f852fdd4..28f97a8a7 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -125,21 +125,9 @@ void PurgeConnection(PGconn *connection) { NodeConnectionKey nodeConnectionKey; - PGconn *purgedConnection = NULL; BuildKeyForConnection(connection, &nodeConnectionKey); - purgedConnection = PurgeConnectionByKey(&nodeConnectionKey); - - /* - * It's possible the provided connection matches the host and port for - * an entry in the hash without being precisely the same connection. In - * that case, we will want to close the provided connection in addition - * to the one from the hash (which was closed by PurgeConnectionByKey). 
-	 */
-	if (purgedConnection != connection)
-	{
-		PQfinish(connection);
-	}
+	PurgeConnectionByKey(&nodeConnectionKey);
 }
 
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index f9048ccf9..e7af6f27a 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -20,6 +20,7 @@
 #include "distributed/listutils.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/multi_router_executor.h"
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
@@ -28,9 +29,8 @@
 
 /*
  * LockShardDistributionMetadata returns after grabbing a lock for distribution
- * metadata related to the specified shard, blocking if required. ExclusiveLock
- * and ShareLock modes are supported. Any locks acquired using this method are
- * released at transaction end.
+ * metadata related to the specified shard, blocking if required. Any locks
+ * acquired using this method are released at transaction end.
 */
 void
 LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
@@ -126,12 +126,11 @@ UnlockJobResource(uint64 jobId, LOCKMODE lockmode)
 
 
 /*
- * LockShards takes shared locks on the metadata and the data of all shards in
- * shardIntervalList. This prevents concurrent placement changes and concurrent
- * DML statements that require an exclusive lock.
+ * LockShardListMetadata takes shared locks on the metadata of all shards in
+ * shardIntervalList to prevent concurrent placement changes.
 */
 void
-LockShards(List *shardIntervalList, LOCKMODE lockMode)
+LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode)
 {
 	ListCell *shardIntervalCell = NULL;
 
@@ -143,10 +142,28 @@
 		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 		int64 shardId = shardInterval->shardId;
 
-		/* prevent concurrent changes to number of placements */
 		LockShardDistributionMetadata(shardId, lockMode);
+	}
+}
+
+
+/*
+ * LockShardListResources takes locks on all shards in shardIntervalList to
+ * prevent concurrent DML statements on those shards.
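+ *
+ * The function sorts the shard list and locks the shards in increasing
+ * order of shard id, so two backends that modify overlapping shard sets,
+ * say shards {102010, 102011} and {102011, 102010} (illustrative ids),
+ * request the locks in the same order and cannot deadlock on each other.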
+ */ +void +LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) +{ + ListCell *shardIntervalCell = NULL; + + /* lock shards in order of shard id to prevent deadlock */ + shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + int64 shardId = shardInterval->shardId; - /* prevent concurrent update/delete statements */ LockShardResource(shardId, lockMode); } } diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 54fb97a22..7623cafb2 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -58,7 +58,7 @@ typedef enum XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ XACT_MODIFICATION_NONE, /* no modifications have taken place */ XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ - XACT_MODIFICATION_SCHEMA /* schema modifications (DDL) have occurred */ + XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ } XactModificationType; diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 236222f5c..61c3e0c67 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -39,4 +39,8 @@ extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); extern void RegisterRouterExecutorXactCallbacks(void); +extern int64 ExecuteModifyTasks(List *taskList, bool expectResults, + ParamListInfo paramListInfo, MaterialState *routerState, + TupleDesc tupleDescriptor); + #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 17319a079..454e8085f 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -75,7 +75,10 @@ extern void UnlockShardResource(uint64 shardId, LOCKMODE lockmode); extern void LockJobResource(uint64 jobId, LOCKMODE lockmode); extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); -extern void LockShards(List *shardIntervalList, LOCKMODE lockMode); +/* Lock multiple shards for safe modification */ +extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); +extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); + extern void LockMetadataSnapshot(LOCKMODE lockMode); #endif /* RESOURCE_LOCK_H */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index a7f030076..28f338a1b 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -168,13 +168,13 @@ ABORT; BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications +ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications COMMIT; -- whether it occurs first or second BEGIN; ALTER TABLE labs ADD COLUMN motto text; INSERT INTO labs VALUES (6, 'Bell Labs'); -ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands +ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard 
data modifications COMMIT; -- but the DDL should correctly roll back \d labs @@ -244,7 +244,7 @@ SELECT * FROM labs WHERE id = 12; BEGIN; \copy labs from stdin delimiter ',' ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications +ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications COMMIT; -- the DDL fails, but copy persists \d labs diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index d722c3ad3..328b6ce33 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -22,11 +22,27 @@ SELECT master_create_worker_shards('multi_shard_modify_test', 4, 2); COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'csv'); -- Testing master_modify_multiple_shards --- Verify that master_modify_multiple_shards cannot be called in a transaction block +-- Verify that master_modify_multiple_shards can be rolled back BEGIN; SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13'); -ERROR: master_modify_multiple_shards cannot run inside a transaction block + master_modify_multiple_shards +------------------------------- + 3 +(1 row) + +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + ROLLBACK; +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 27 +(1 row) + -- Check that master_modify_multiple_shards cannot be called with non-distributed tables CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text); INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder'); diff --git a/src/test/regress/expected/multi_truncate.out b/src/test/regress/expected/multi_truncate.out index 09fb65e06..0e17df7de 100644 --- a/src/test/regress/expected/multi_truncate.out +++ b/src/test/regress/expected/multi_truncate.out @@ -138,11 +138,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::re 1210005 (3 rows) --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT; -ERROR: master_modify_multiple_shards cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)" -PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM +-- verify that truncate can be aborted +INSERT INTO test_truncate_range VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK; +SELECT count(*) FROM test_truncate_range; + count +------- + 1 +(1 row) + DROP TABLE test_truncate_range; -- -- truncate for hash distribution. 
@@ -226,11 +230,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::reg 1210009 (4 rows) --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT; -ERROR: master_modify_multiple_shards cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)" -PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM +-- verify that truncate can be aborted +INSERT INTO test_truncate_hash VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK; +SELECT count(*) FROM test_truncate_hash; + count +------- + 1 +(1 row) + DROP TABLE test_truncate_hash; -- test with table with spaces in it CREATE TABLE "a b hash" (a int, b int); diff --git a/src/test/regress/sql/multi_shard_modify.sql b/src/test/regress/sql/multi_shard_modify.sql index 9b1fabf26..6564e6f11 100644 --- a/src/test/regress/sql/multi_shard_modify.sql +++ b/src/test/regress/sql/multi_shard_modify.sql @@ -46,11 +46,15 @@ COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'c \. -- Testing master_modify_multiple_shards --- Verify that master_modify_multiple_shards cannot be called in a transaction block + +-- Verify that master_modify_multiple_shards can be rolled back BEGIN; SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13'); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202'); ROLLBACK; +SELECT count(*) FROM multi_shard_modify_test; + -- Check that master_modify_multiple_shards cannot be called with non-distributed tables CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text); INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder'); diff --git a/src/test/regress/sql/multi_truncate.sql b/src/test/regress/sql/multi_truncate.sql index 4abc9f27e..dcd5ea306 100644 --- a/src/test/regress/sql/multi_truncate.sql +++ b/src/test/regress/sql/multi_truncate.sql @@ -89,8 +89,10 @@ SELECT count(*) FROM test_truncate_range; -- verify 3 shards are still present SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT; +-- verify that truncate can be aborted +INSERT INTO test_truncate_range VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK; +SELECT count(*) FROM test_truncate_range; DROP TABLE test_truncate_range; @@ -136,8 +138,10 @@ SELECT count(*) FROM test_truncate_hash; -- verify 4 shards are still presents SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT; +-- verify that truncate can be aborted +INSERT INTO test_truncate_hash VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK; +SELECT count(*) FROM test_truncate_hash; DROP TABLE test_truncate_hash; @@ -173,4 +177,4 @@ TRUNCATE TABLE "a b append"; -- verify all shards are dropped SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass; -DROP TABLE "a b append"; \ No newline at end of file +DROP TABLE "a b append"; From 65f6d7c02aaea61b6c97b3490ef01327469616cc Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 18 Oct 2016 10:21:52 +0200 Subject: [PATCH 4/4] Follow consistent execution order in parallel commands --- .../executor/multi_router_executor.c | 240 +++++++++++++----- 1 file changed, 176 
diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 6c0238a23..bbce5d070 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -96,6 +96,8 @@ static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
 								 bool isModificationQuery, bool expectResults);
 static List * TaskShardIntervalList(List *taskList);
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
+static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
+static bool IsReplicated(List *shardIntervalList);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
 									   DestReceiver *destination,
 									   Tuplestorestate *tupleStore);
@@ -252,14 +254,14 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
 		 * Bypass commutativity checks when citus.all_modifications_commutative
 		 * is enabled.
 		 *
-		 * A ShareLock does not conflict with itself and therefore allows
+		 * A RowExclusiveLock does not conflict with itself and therefore allows
 		 * multiple commutative commands to proceed concurrently. It does
 		 * conflict with ExclusiveLock, which may still be obtained by another
 		 * session that executes an UPDATE/DELETE/UPSERT command with
 		 * citus.all_modifications_commutative disabled.
 		 */
 
-		lockMode = ShareLock;
+		lockMode = RowExclusiveLock;
 	}
 	else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE)
 	{
@@ -291,13 +293,13 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
 		 * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
 		 * order.
 		 *
-		 * A ShareLock does not conflict with itself and therefore allows
+		 * A RowExclusiveLock does not conflict with itself and therefore allows
 		 * multiple INSERT commands to proceed concurrently. It conflicts with
 		 * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
 		 * not run concurrently with INSERT.
 		 */
 
-		lockMode = ShareLock;
+		lockMode = RowExclusiveLock;
 	}
 	else
 	{
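
The comments above describe how citus.all_modifications_commutative downgrades
the executor shard lock from ExclusiveLock to RowExclusiveLock. A sketch of the
resulting interaction between two sessions, assuming a hypothetical distributed
table named items with more than one replica per shard:

-- session 1: with the setting disabled, the UPDATE takes ExclusiveLock
-- on the shard
BEGIN;
UPDATE items SET value = value + 1 WHERE key = 42;

-- session 2: with the setting enabled, the UPDATE only takes
-- RowExclusiveLock, so it can run concurrently with other commutative
-- writes, but it still blocks behind session 1's ExclusiveLock
SET citus.all_modifications_commutative TO on;
UPDATE items SET value = value + 1 WHERE key = 42;
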
@@ -311,6 +313,81 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
 }
 
 
+/*
+ * AcquireExecutorMultiShardLocks acquires the locks needed for the execution
+ * of writes on multiple shards.
+ *
+ * 1. If citus.all_modifications_commutative is set to true, then all locks
+ *    are acquired as ShareUpdateExclusiveLock.
+ * 2. If citus.all_modifications_commutative is false, then the shards are
+ *    locked with ExclusiveLock when any of them has 2 or more replicas.
+ *    Otherwise, the locks are acquired as ShareUpdateExclusiveLock.
+ *
+ * ShareUpdateExclusiveLock conflicts with itself such that only one
+ * multi-shard modification at a time is allowed on a shard. It also conflicts
+ * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
+ * in the same order on all placements. It does not conflict with
+ * RowExclusiveLock, which is normally obtained by single-shard commutative
+ * writes.
+ */
+static void
+AcquireExecutorMultiShardLocks(List *shardIntervalList)
+{
+	LOCKMODE lockMode = NoLock;
+
+	if (AllModificationsCommutative || !IsReplicated(shardIntervalList))
+	{
+		/*
+		 * When all writes are commutative, we only need to prevent multi-shard
+		 * commands from running concurrently with each other and with commands
+		 * that are explicitly non-commutative. When there is no replication, we
+		 * only need to prevent concurrent multi-shard commands.
+		 *
+		 * In either case, ShareUpdateExclusiveLock has the desired effect, since
+		 * it conflicts with itself and ExclusiveLock (taken by non-commutative
+		 * writes).
+		 */
+
+		lockMode = ShareUpdateExclusiveLock;
+	}
+	else
+	{
+		/*
+		 * When there is replication, prevent all concurrent writes to the same
+		 * shards to ensure the writes are ordered.
+		 */
+		lockMode = ExclusiveLock;
+	}
+
+	LockShardListResources(shardIntervalList, lockMode);
+}
+
+
+/*
+ * IsReplicated checks whether any of the shards in the given list has more
+ * than one replica.
+ */
+static bool
+IsReplicated(List *shardIntervalList)
+{
+	ListCell *shardIntervalCell = NULL;
+	bool hasReplication = false;
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		uint64 shardId = shardInterval->shardId;
+		List *shardPlacementList = FinalizedShardPlacementList(shardId);
+
+		/* use list_length(), which is safe on an empty (NIL) placement list */
+		if (list_length(shardPlacementList) > 1)
+		{
+			hasReplication = true;
+			break;
+		}
+	}
+
+	return hasReplication;
+}
+
+
 /*
  * CreateXactParticipantHash initializes the map used to store the connections
  * needed to process distributed transactions. Unlike the connection cache, we
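
Whether AcquireExecutorMultiShardLocks escalates to ExclusiveLock depends only
on whether any shard involved has multiple finalized placements. A sketch for
inspecting that from the coordinator, assuming the standard
pg_dist_shard_placement metadata table with shardstate 1 denoting a finalized
placement:

SELECT shardid, count(*) AS placement_count
FROM pg_dist_shard_placement
WHERE shardstate = 1
GROUP BY shardid
HAVING count(*) > 1;
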
@@ -422,8 +499,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 
 			deparse_shard_query(query, relationId, task->anchorShardId,
 								newQueryString);
 
-			elog(DEBUG4, "query before master evaluation: %s", task->queryString);
-			elog(DEBUG4, "query after master evaluation: %s", newQueryString->data);
+			ereport(DEBUG4, (errmsg("query before master evaluation: %s",
+									task->queryString)));
+			ereport(DEBUG4, (errmsg("query after master evaluation: %s",
+									newQueryString->data)));
 
 			task->queryString = newQueryString->data;
 		}
@@ -698,6 +777,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	ListCell *taskCell = NULL;
 	char *userName = CurrentUserName();
 	List *shardIntervalList = NIL;
+	List *affectedTupleCountList = NIL;
+	bool tasksPending = true;
+	int placementIndex = 0;
 
 	if (XactModificationLevel == XACT_MODIFICATION_DATA)
 	{
@@ -712,76 +794,87 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	shardIntervalList = TaskShardIntervalList(taskList);
 
 	/* ensure that there are no concurrent modifications on the same shards */
-	LockShardListResources(shardIntervalList, ExclusiveLock);
+	AcquireExecutorMultiShardLocks(shardIntervalList);
 
 	/* open connection to all relevant placements, if not already open */
 	OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
 
-	/* send command to all relevant shard placements */
-	foreach(taskCell, taskList)
+	/* iterate over placements in rounds, to ensure in-order execution */
+	while (tasksPending)
 	{
-		Task *task = (Task *) lfirst(taskCell);
-		int64 shardId = task->anchorShardId;
-		char *queryString = task->queryString;
-		bool shardConnectionsFound = false;
-		ShardConnections *shardConnections = NULL;
-		List *connectionList = NIL;
-		ListCell *connectionCell = NULL;
+		int taskIndex = 0;
 
-		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
-		Assert(shardConnectionsFound);
+		tasksPending = false;
 
-		connectionList = shardConnections->connectionList;
-		Assert(connectionList != NIL);
-
-		foreach(connectionCell, connectionList)
+		/* send command to all shard placements with the current index in parallel */
+		foreach(taskCell, taskList)
 		{
-			TransactionConnection *transactionConnection =
-				(TransactionConnection *) lfirst(connectionCell);
-			PGconn *connection = transactionConnection->connection;
+			Task *task = (Task *) lfirst(taskCell);
+			int64 shardId = task->anchorShardId;
+			char *queryString = task->queryString;
+			bool shardConnectionsFound = false;
+			ShardConnections *shardConnections = NULL;
+			List *connectionList = NIL;
+			TransactionConnection *transactionConnection = NULL;
+			PGconn *connection = NULL;
 			bool queryOK = false;
 
+			shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
+			connectionList = shardConnections->connectionList;
+
+			if (placementIndex >= list_length(connectionList))
+			{
+				/* no more active placements for this task */
+				continue;
+			}
+
+			transactionConnection =
+				(TransactionConnection *) list_nth(connectionList, placementIndex);
+			connection = transactionConnection->connection;
+
 			queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
 			if (!queryOK)
 			{
 				ReraiseRemoteError(connection, NULL);
 			}
 		}
-	}
 
-	/* collects results from all relevant shard placements */
-	foreach(taskCell, taskList)
-	{
-		Task *task = (Task *) lfirst(taskCell);
-		int64 shardId = task->anchorShardId;
-		bool shardConnectionsFound = false;
-		ShardConnections *shardConnections = NULL;
-		List *connectionList = NIL;
-		ListCell *connectionCell = NULL;
-		int64 affectedTupleCount = 0;
-		bool gotResults = false;
-
-		/* abort in case of cancellation */
-		CHECK_FOR_INTERRUPTS();
-
-		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
-		connectionList = shardConnections->connectionList;
-
-		foreach(connectionCell, connectionList)
+		/* collect results from all relevant shard placements */
+		foreach(taskCell, taskList)
 		{
-			TransactionConnection *transactionConnection =
-				(TransactionConnection *) lfirst(connectionCell);
-			PGconn *connection = transactionConnection->connection;
+			Task *task = (Task *) lfirst(taskCell);
+			int64 shardId = task->anchorShardId;
+			bool shardConnectionsFound = false;
+			ShardConnections *shardConnections = NULL;
+			List *connectionList = NIL;
+			TransactionConnection *transactionConnection = NULL;
+			PGconn *connection = NULL;
 			int64 currentAffectedTupleCount = 0;
 			bool failOnError = true;
 			bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
 
+			/* abort in case of cancellation */
+			CHECK_FOR_INTERRUPTS();
+
+			shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
+			connectionList = shardConnections->connectionList;
+
+			if (placementIndex >= list_length(connectionList))
+			{
+				/* no more active placements for this task */
+				continue;
+			}
+
+			transactionConnection =
+				(TransactionConnection *) list_nth(connectionList, placementIndex);
+			connection = transactionConnection->connection;
+
 			/*
 			 * If caller is interested, store query results the first time
 			 * through. The output of the query's execution on other shards is
 			 * discarded if we run there (because it's a modification query).
 			 */
-			if (!gotResults && expectResults)
+			if (placementIndex == 0 && expectResults)
 			{
 				Assert(routerState != NULL && tupleDescriptor != NULL);
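
The rounds exist because UPDATE/DELETE/UPSERT commands are not commutative:
placements that apply the same writes in different orders diverge. A
self-contained illustration in plain SQL, using a hypothetical placement_sim
table to stand in for two placements of one shard:

CREATE TABLE placement_sim (key int PRIMARY KEY, value int);
INSERT INTO placement_sim VALUES (1, 10);

-- placement A applies the doubling first, then the increment
UPDATE placement_sim SET value = value * 2 WHERE key = 1;
UPDATE placement_sim SET value = value + 1 WHERE key = 1;
SELECT value FROM placement_sim;  -- 21

-- placement B applies the same two writes in the opposite order
UPDATE placement_sim SET value = 10 WHERE key = 1;
UPDATE placement_sim SET value = value + 1 WHERE key = 1;
UPDATE placement_sim SET value = value * 2 WHERE key = 1;
SELECT value FROM placement_sim;  -- 22: the placements have diverged
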
@@ -797,26 +890,45 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 			/* should have rolled back on error */
 			Assert(queryOK);
 
-			if (!gotResults)
+			if (placementIndex == 0)
 			{
-				affectedTupleCount = currentAffectedTupleCount;
-				totalAffectedTupleCount += affectedTupleCount;
-			}
-			else if (currentAffectedTupleCount != affectedTupleCount)
-			{
-				char *nodeName = ConnectionGetOptionValue(connection, "host");
-				char *nodePort = ConnectionGetOptionValue(connection, "port");
+				totalAffectedTupleCount += currentAffectedTupleCount;
 
-				ereport(WARNING,
-						(errmsg("modified "INT64_FORMAT " tuples, but expected "
-								"to modify "INT64_FORMAT,
-								currentAffectedTupleCount, affectedTupleCount),
-						 errdetail("modified placement on %s:%s", nodeName,
-								   nodePort)));
+				/* keep track of the initial affected tuple count */
+				affectedTupleCountList = lappend_int(affectedTupleCountList,
+													 currentAffectedTupleCount);
+			}
+			else
+			{
+				/* warn the user if shard placements have diverged */
+				int64 previousAffectedTupleCount = list_nth_int(affectedTupleCountList,
+																taskIndex);
+
+				if (currentAffectedTupleCount != previousAffectedTupleCount)
+				{
+					char *nodeName = ConnectionGetOptionValue(connection, "host");
+					char *nodePort = ConnectionGetOptionValue(connection, "port");
+
+					ereport(WARNING,
+							(errmsg("modified " INT64_FORMAT " tuples of shard "
+									UINT64_FORMAT ", but expected to modify "
+									INT64_FORMAT,
+									currentAffectedTupleCount, shardId,
+									previousAffectedTupleCount),
+							 errdetail("modified placement on %s:%s", nodeName,
+									   nodePort)));
+				}
 			}
 
-			gotResults = true;
+			if (!tasksPending && placementIndex + 1 < list_length(connectionList))
+			{
+				/* at least one more placement remains for this task */
+				tasksPending = true;
+			}
+
+			taskIndex++;
 		}
+
+		placementIndex++;
 	}
 
 	CHECK_FOR_INTERRUPTS();
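
For reference, when placements do diverge, the errmsg format above produces a
warning along these lines (shard id, tuple counts, and node are hypothetical):

WARNING:  modified 3 tuples of shard 1210009, but expected to modify 4
DETAIL:  modified placement on localhost:57638
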