diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d0af329c4..eb47a7ae2 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2097,7 +2097,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) return false; } - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { /* * For various reasons, including the transaction visibility @@ -2120,7 +2120,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) } /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ - return CurrentLocalExecutionStatus != LOCAL_EXECUTION_DISABLED && + return GetCurrentLocalExecutionStatus() != LOCAL_EXECUTION_DISABLED && IsMultiStatementTransaction(); } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 2b05012c6..5ecdad20a 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1004,7 +1004,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) * then we should error out as it would cause inconsistencies across the * remote connection and local execution. */ - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED && AnyTaskAccessesLocalNode(remoteTaskList)) { ErrorIfTransactionAccessedPlacementsLocally(); @@ -1187,7 +1187,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { /* * In case localExecutionHappened, we force the executor to use 2PC. diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index c4e59a7ed..bcbdf823f 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -110,7 +110,7 @@ bool EnableLocalExecution = true; bool LogLocalCommands = false; -LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL; +static LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL; static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, @@ -131,6 +131,17 @@ static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); static void EnsureTransitionPossible(LocalExecutionStatus from, LocalExecutionStatus to); + +/* + * GetCurrentLocalExecutionStatus returns the current local execution status. + */ +LocalExecutionStatus +GetCurrentLocalExecutionStatus(void) +{ + return CurrentLocalExecutionStatus; +} + + /* * ExecuteLocalTasks executes the given tasks locally. * @@ -649,7 +660,7 @@ RecordNonDistTableAccessesForTask(Task *task) void SetLocalExecutionStatus(LocalExecutionStatus newStatus) { - EnsureTransitionPossible(CurrentLocalExecutionStatus, newStatus); + EnsureTransitionPossible(GetCurrentLocalExecutionStatus(), newStatus); CurrentLocalExecutionStatus = newStatus; } @@ -695,7 +706,7 @@ ShouldExecuteTasksLocally(List *taskList) return false; } - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) { /* * if the current transaction accessed the local node over a connection @@ -704,42 +715,21 @@ ShouldExecuteTasksLocally(List *taskList) return false; } - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { - bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false; - /* - * For various reasons, including the transaction visibility - * rules (e.g., read-your-own-writes), we have to use local - * execution again if it has already happened within this - * transaction block. + * If we already used local execution for a previous command + * we should stick to it for read-your-writes policy, this can be a + * case when we are inside a transaction block. Such as: + * + * BEGIN; + * some-command; -- executed via local execution + * another-command; -- this should be executed via local execution for visibility + * COMMIT; + * + * We may need to use local execution even if we are not inside a transaction block, + * however the state will go back to LOCAL_EXECUTION_OPTIONAL at the end of transaction. */ - isValidLocalExecutionPath = IsMultiStatementTransaction() || - InCoordinatedTransaction(); - - /* - * In some cases, such as when a single command leads to a local - * command execution followed by remote task (list) execution, we - * still expect the remote execution to first try local execution - * as TransactionAccessedLocalPlacement is set by the local execution. - * The remote execution shouldn't create any local tasks as the local - * execution should have executed all the local tasks. And, we are - * ensuring it here. - */ - isValidLocalExecutionPath |= !AnyTaskAccessesLocalNode(taskList); - - /* - * We might error out later in the execution if it is not suitable - * to execute the tasks locally. - */ - Assert(isValidLocalExecutionPath); - - /* - * TODO: A future improvement could be to keep track of which placements - * have been locally executed. At this point, only use local execution - * for those placements. That'd help to benefit more from parallelism. - */ - return true; } @@ -836,7 +826,7 @@ TaskAccessesLocalNode(Task *task) void ErrorIfTransactionAccessedPlacementsLocally(void) { - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { ereport(ERROR, (errmsg("cannot execute command because a local execution has " diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 3a633d5c9..ae55b14b3 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -86,7 +86,7 @@ EnsureCompatibleLocalExecutionState(List *taskList) * We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily * iterating the task list in AnyTaskAccessesLocalNode. */ - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED && AnyTaskAccessesLocalNode(taskList)) { ErrorIfTransactionAccessedPlacementsLocally(); diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index a9b139eea..ae5fa6f15 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -178,7 +178,7 @@ IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistribute return false; } - if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) { /* transaction already connected to localhost */ return false; diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 74c8a7a1d..5f2bcceb7 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -31,9 +31,8 @@ typedef enum LocalExecutionStatus LOCAL_EXECUTION_DISABLED } LocalExecutionStatus; -extern enum LocalExecutionStatus CurrentLocalExecutionStatus; - /* extern function declarations */ +extern LocalExecutionStatus GetCurrentLocalExecutionStatus(void); extern uint64 ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest); extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList); extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo