Merge pull request #5386 from citusdata/simplify_2pc_decision

Simplify 2PC decision in the executor
pull/5406/head
Önder Kalacı 2021-10-23 09:35:22 +02:00 committed by GitHub
commit fc00ddee4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 26 deletions

View File

@ -1407,12 +1407,7 @@ DistributedExecutionRequiresRollback(List *taskList)
/* /*
* TaskListRequires2PC determines whether the given task list requires 2PC * TaskListRequires2PC determines whether the given task list requires 2PC.
* because the tasks provided operates on a reference table or there are multiple
* tasks and the commit protocol is 2PC.
*
* Note that we currently do not generate tasks lists that involves multiple different
* tables, thus we only check the first task in the list for reference tables.
*/ */
static bool static bool
TaskListRequires2PC(List *taskList) TaskListRequires2PC(List *taskList)
@ -1423,29 +1418,28 @@ TaskListRequires2PC(List *taskList)
} }
Task *task = (Task *) linitial(taskList); Task *task = (Task *) linitial(taskList);
if (list_length(task->taskPlacementList) > 1) if (ReadOnlyTask(task->taskType))
{ {
/* /* we do not trigger 2PC for ReadOnly queries */
* Even single DML/DDL tasks with replicated tables return false;
* (including reference and non-reference tables) }
* should require BEGIN/COMMIT/ROLLBACK.
bool singleTask = list_length(taskList) == 1;
if (singleTask && list_length(task->taskPlacementList) == 1)
{
/* we do not trigger 2PC for modifications that are:
* - single task
* - single placement
*/ */
return true; return false;
} }
bool multipleTasks = list_length(taskList) > 1; /*
if (!ReadOnlyTask(task->taskType) && multipleTasks) * Otherwise, all modifications are done via 2PC. This includes:
{ * - Multi-shard commands irrespective of the replication factor
/* all multi-shard modifications use 2PC */ * - Single-shard commands that are targeting more than one replica
return true; */
} return true;
if (task->taskType == DDL_TASK)
{
return true;
}
return false;
} }
@ -3398,8 +3392,16 @@ ConnectionStateMachine(WorkerSession *session)
/* /*
* The execution may have failed as a result of WorkerSessionFailed * The execution may have failed as a result of WorkerSessionFailed
* or WorkerPoolFailed. * or WorkerPoolFailed.
*
* Even if this execution has not failed -- but just a single session is
* failed -- and an earlier execution in this transaction which marked
* the remote transaction as critical, we should fail right away as the
* transaction will fail anyway on PREPARE/COMMIT time.
*/ */
if (execution->failed || RemoteTransaction *transaction = &connection->remoteTransaction;
if (transaction->transactionCritical ||
execution->failed ||
(execution->transactionProperties->errorOnAnyFailure && (execution->transactionProperties->errorOnAnyFailure &&
workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL)) workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL))
{ {