From ce4c4540c5a321c888fa64e1ca8c2fc73411e4ba Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 18 Oct 2021 12:33:39 +0200 Subject: [PATCH] Simplify 2PC decision in the executor It seems like the decision for 2PC is more complicated than it should be. With this change, we do one behavioral change. In essense, before this commit, when a SELECT task with replication factor > 1 is executed, the executor was triggering 2PC. And, in fact, the transaction manager (`ConnectionModifiedPlacement()`) was able to understand not to trigger 2PC when no modification happens. However, for transaction blocks like: BEGIN; -- a command that triggers 2PC -- A SELECT command on replication > 1 .. COMMIT; The SELECT was used to be qualified as required 2PC. And, as a side-effect the executor was setting `xactProperties.errorOnAnyFailure = true;` So, the commands was failing at the time of execution. Now, they fail at the end of the transaction. --- .../distributed/executor/adaptive_executor.c | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 90fa0e77b..b605ff719 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1407,12 +1407,7 @@ DistributedExecutionRequiresRollback(List *taskList) /* - * TaskListRequires2PC determines whether the given task list requires 2PC - * because the tasks provided operates on a reference table or there are multiple - * tasks and the commit protocol is 2PC. - * - * Note that we currently do not generate tasks lists that involves multiple different - * tables, thus we only check the first task in the list for reference tables. + * TaskListRequires2PC determines whether the given task list requires 2PC. */ static bool TaskListRequires2PC(List *taskList) @@ -1423,29 +1418,28 @@ TaskListRequires2PC(List *taskList) } Task *task = (Task *) linitial(taskList); - if (list_length(task->taskPlacementList) > 1) + if (ReadOnlyTask(task->taskType)) { - /* - * Even single DML/DDL tasks with replicated tables - * (including reference and non-reference tables) - * should require BEGIN/COMMIT/ROLLBACK. + /* we do not trigger 2PC for ReadOnly queries */ + return false; + } + + bool singleTask = list_length(taskList) == 1; + if (singleTask && list_length(task->taskPlacementList) == 1) + { + /* we do not trigger 2PC for modifications that are: + * - single task + * - single placement */ - return true; + return false; } - bool multipleTasks = list_length(taskList) > 1; - if (!ReadOnlyTask(task->taskType) && multipleTasks) - { - /* all multi-shard modifications use 2PC */ - return true; - } - - if (task->taskType == DDL_TASK) - { - return true; - } - - return false; + /* + * Otherwise, all modifications are done via 2PC. This includes: + * - Multi-shard commands irrespective of the replication factor + * - Single-shard commands that are targeting more than one replica + */ + return true; } @@ -3398,8 +3392,16 @@ ConnectionStateMachine(WorkerSession *session) /* * The execution may have failed as a result of WorkerSessionFailed * or WorkerPoolFailed. + * + * Even if this execution has not failed -- but just a single session is + * failed -- and an earlier execution in this transaction which marked + * the remote transaction as critical, we should fail right away as the + * transaction will fail anyway on PREPARE/COMMIT time. */ - if (execution->failed || + RemoteTransaction *transaction = &connection->remoteTransaction; + + if (transaction->transactionCritical || + execution->failed || (execution->transactionProperties->errorOnAnyFailure && workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL)) {