Move EnsureCompatibleLocalExecutionState to local_executor.c

disable_normal_joins
Marco Slot 2022-02-22 14:06:02 +01:00
parent 360dd14d8a
commit 87b47c42c8
3 changed files with 28 additions and 0 deletions

View File

@ -1055,6 +1055,14 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
defaultTupleDest, &executionParams->xactProperties,
executionParams->jobIdList, executionParams->localExecutionSupported);
/*
* If current transaction accessed local placements and task list includes
* tasks that should be executed locally (accessing any of the local placements),
* then we should error out as it would cause inconsistencies across the
* remote connection and local execution.
*/
EnsureCompatibleLocalExecutionState(execution->remoteTaskList);
/* run the remote execution */
StartDistributedExecution(execution);
RunDistributedExecution(execution);

View File

@ -915,6 +915,25 @@ TaskAccessesLocalNode(Task *task)
}
/*
* EnsureCompatibleLocalExecutionState makes sure that the tasks won't have
* any visibility problems because of local execution.
*/
void
EnsureCompatibleLocalExecutionState(List *taskList)
{
/*
* We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily
* iterating the task list in AnyTaskAccessesLocalNode.
*/
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
AnyTaskAccessesLocalNode(taskList))
{
ErrorIfTransactionAccessedPlacementsLocally();
}
}
/*
* ErrorIfTransactionAccessedPlacementsLocally errors out if a local query
* on any shard has already been executed in the same transaction.

View File

@ -43,6 +43,7 @@ extern void ExecuteUtilityCommand(const char *utilityCommand);
extern bool ShouldExecuteTasksLocally(List *taskList);
extern bool AnyTaskAccessesLocalNode(List *taskList);
extern bool TaskAccessesLocalNode(Task *task);
extern void EnsureCompatibleLocalExecutionState(List *taskList);
extern void ErrorIfTransactionAccessedPlacementsLocally(void);
extern void DisableLocalExecution(void);
extern void SetLocalExecutionStatus(LocalExecutionStatus newStatus);