From b4f2c92d1be447f0f3bc9498839e9f6cf71f01bd Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 7 Jan 2021 07:16:26 +0100 Subject: [PATCH] Fix transactions --- .../distributed/executor/adaptive_executor.c | 22 ++++++++++--------- .../transaction/transaction_management.c | 2 +- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 8ad9d7e11..4597cc121 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1167,18 +1167,9 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { - /* - * In case localExecutionHappened, we force the executor to use 2PC. - * The primary motivation is that at this point we're definitely expanding - * the nodes participated in the transaction. And, by re-generating the - * remote task lists during local query execution, we might prevent the adaptive - * executor to kick-in 2PC (or even start coordinated transaction, that's why - * we prefer adding this check here instead of - * Activate2PCIfModifyingTransactionExpandsToNewNode()). - */ xactProperties.errorOnAnyFailure = true; xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; - xactProperties.requires2PC = true; + xactProperties.requires2PC = false; return xactProperties; } @@ -1440,6 +1431,11 @@ TaskListRequires2PC(List *taskList) } Task *task = (Task *) linitial(taskList); + if (ReadOnlyTask(task->taskType)) + { + return false; + } + if (task->replicationModel == REPLICATION_MODEL_2PC) { return true; @@ -3187,6 +3183,12 @@ Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session) return; } + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) + { + /* we did local execution and are expanding to an additional node */ + CoordinatedTransactionUse2PC(); + } + DistributedExecution *execution = session->workerPool->distributedExecution; if (TransactionModifiedDistributedTable(execution) && DistributedExecutionModifiesDatabase(execution) && diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 96a4180a4..e5b7740fd 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -460,7 +460,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_PREPARE: { - if (InCoordinatedTransaction()) + if (CoordinatedTransactionUses2PC) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use 2PC in transactions involving "