Fix transactions

marcocitus/fix-pgazure
Marco Slot 2021-01-07 07:16:26 +01:00
parent 75c533ca02
commit b4f2c92d1b
2 changed files with 13 additions and 11 deletions

View File

@ -1167,18 +1167,9 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{ {
/*
* In case localExecutionHappened, we force the executor to use 2PC.
* The primary motivation is that at this point we're definitely expanding
* the nodes participated in the transaction. And, by re-generating the
* remote task lists during local query execution, we might prevent the adaptive
* executor to kick-in 2PC (or even start coordinated transaction, that's why
* we prefer adding this check here instead of
* Activate2PCIfModifyingTransactionExpandsToNewNode()).
*/
xactProperties.errorOnAnyFailure = true; xactProperties.errorOnAnyFailure = true;
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED;
xactProperties.requires2PC = true; xactProperties.requires2PC = false;
return xactProperties; return xactProperties;
} }
@ -1440,6 +1431,11 @@ TaskListRequires2PC(List *taskList)
} }
Task *task = (Task *) linitial(taskList); Task *task = (Task *) linitial(taskList);
if (ReadOnlyTask(task->taskType))
{
return false;
}
if (task->replicationModel == REPLICATION_MODEL_2PC) if (task->replicationModel == REPLICATION_MODEL_2PC)
{ {
return true; return true;
@ -3187,6 +3183,12 @@ Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session)
return; return;
} }
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{
/* we did local execution and are expanding to an additional node */
CoordinatedTransactionUse2PC();
}
DistributedExecution *execution = session->workerPool->distributedExecution; DistributedExecution *execution = session->workerPool->distributedExecution;
if (TransactionModifiedDistributedTable(execution) && if (TransactionModifiedDistributedTable(execution) &&
DistributedExecutionModifiesDatabase(execution) && DistributedExecutionModifiesDatabase(execution) &&

View File

@ -460,7 +460,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_PREPARE: case XACT_EVENT_PRE_PREPARE:
{ {
if (InCoordinatedTransaction()) if (CoordinatedTransactionUses2PC)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use 2PC in transactions involving " errmsg("cannot use 2PC in transactions involving "