diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index f624d5e65..0f6f619e6 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -64,7 +64,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in * Since we are doing a local copy, the following statements should * use local execution to see the changes */ - TransactionAccessedLocalPlacement = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); bool isBinaryCopy = localCopyOutState->binary; if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy)) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 7724d2018..f769d69b3 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2029,7 +2029,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) return false; } - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { /* * For various reasons, including the transaction visibility @@ -2052,7 +2052,8 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) } /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ - return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction(); + return CurrentLocalExecutionStatus != LOCAL_EXECUTION_DISABLED && + IsMultiStatementTransaction(); } @@ -3294,7 +3295,7 @@ InitializeCopyShardState(CopyShardState *shardState, */ if (!isCopyToIntermediateFile) { - TransactionConnectedToLocalGroup = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); } } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index c79ef0f18..4f8863edc 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -975,7 +975,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, * then we should error out as it would cause inconsistencies across the * remote connection and local execution. */ - if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(remoteTaskList)) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && + AnyTaskAccessesLocalNode(remoteTaskList)) { ErrorIfTransactionAccessedPlacementsLocally(); } @@ -1111,7 +1112,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { /* * In case localExecutionHappened, we force the executor to use 2PC. @@ -1835,10 +1836,9 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) placementExecutionReady = false; } - if (!TransactionConnectedToLocalGroup && taskPlacement->groupId == - localGroupId) + if (taskPlacement->groupId == localGroupId) { - TransactionConnectedToLocalGroup = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); } } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 5ae73c76e..0cc6e70b5 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -107,8 +107,7 @@ bool EnableLocalExecution = true; bool LogLocalCommands = false; -bool TransactionAccessedLocalPlacement = false; -bool TransactionConnectedToLocalGroup = false; +LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL; static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, @@ -124,6 +123,8 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, const char ***parameterValues); static void LocallyExecuteUtilityTask(const char *utilityCommand); static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); +static void EnsureTransitionPossible(LocalExecutionStatus from, + LocalExecutionStatus to); /* * ExecuteLocalTasks executes the given tasks locally. @@ -193,7 +194,7 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, */ if (task->anchorShardId != INVALID_SHARD_ID) { - TransactionAccessedLocalPlacement = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); } LogLocalCommand(task); @@ -351,7 +352,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList) * We should register the access to local placement to force the local * execution of the following commands withing the current transaction. */ - TransactionAccessedLocalPlacement = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); LogLocalCommand(localTask); @@ -589,6 +590,47 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, } +/* + * SetLocalExecutionStatus sets the local execution status to + * the given status, it errors if the transition is not possible from the + * current status. + */ +void +SetLocalExecutionStatus(LocalExecutionStatus newStatus) +{ + EnsureTransitionPossible(CurrentLocalExecutionStatus, newStatus); + + CurrentLocalExecutionStatus = newStatus; +} + + +/* + * EnsureTransitionPossible errors if we cannot switch to the 'to' status + * from the 'from' status. + */ +static void +EnsureTransitionPossible(LocalExecutionStatus from, LocalExecutionStatus + to) +{ + if (from == LOCAL_EXECUTION_REQUIRED && to == LOCAL_EXECUTION_DISABLED) + { + ereport(ERROR, + (errmsg( + "cannot switch local execution status from local execution required " + "to local execution disabled since it can cause " + "visibility problems in the current transaction"))); + } + if (from == LOCAL_EXECUTION_DISABLED && to == LOCAL_EXECUTION_REQUIRED) + { + ereport(ERROR, + (errmsg( + "cannot switch local execution status from local execution disabled " + "to local execution enabled since it can cause " + "visibility problems in the current transaction"))); + } +} + + /* * ShouldExecuteTasksLocally gets a task list and returns true if the * any of the tasks should be executed locally. This function does not @@ -602,7 +644,7 @@ ShouldExecuteTasksLocally(List *taskList) return false; } - if (TransactionConnectedToLocalGroup) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) { /* * if the current transaction accessed the local node over a connection @@ -611,7 +653,7 @@ ShouldExecuteTasksLocally(List *taskList) return false; } - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false; @@ -743,7 +785,7 @@ TaskAccessesLocalNode(Task *task) void ErrorIfTransactionAccessedPlacementsLocally(void) { - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { ereport(ERROR, (errmsg("cannot execute command because a local execution has " diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 6ccd9db4e..242f5908a 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -83,10 +83,11 @@ static void EnsureCompatibleLocalExecutionState(List *taskList) { /* - * We have TransactionAccessedLocalPlacement check here to avoid unnecessarily + * We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily * iterating the task list in AnyTaskAccessesLocalNode. */ - if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList)) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && + AnyTaskAccessesLocalNode(taskList)) { ErrorIfTransactionAccessedPlacementsLocally(); } diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 7ea34954a..6afcf1098 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -458,7 +458,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, if (isLocalShardPlacement) { - TransactionConnectedToLocalGroup = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); } } diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index cfac1abf1..584ce2d51 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -179,7 +179,7 @@ IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistribute return false; } - if (TransactionConnectedToLocalGroup) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) { /* transaction already connected to localhost */ return false; diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index cc235ac32..1783d4a73 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -445,8 +445,7 @@ ResetGlobalVariables() { CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; - TransactionAccessedLocalPlacement = false; - TransactionConnectedToLocalGroup = false; + SetLocalExecutionStatus(LOCAL_EXECUTION_OPTIONAL); dlist_init(&InProgressTransactions); activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 6a3d1ed34..f42a7c47b 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -17,8 +17,14 @@ extern bool EnableLocalExecution; extern bool LogLocalCommands; -extern bool TransactionAccessedLocalPlacement; -extern bool TransactionConnectedToLocalGroup; +typedef enum LocalExecutionStatus +{ + LOCAL_EXECUTION_REQUIRED, + LOCAL_EXECUTION_OPTIONAL, + LOCAL_EXECUTION_DISABLED +} LocalExecutionStatus; + +extern enum LocalExecutionStatus CurrentLocalExecutionStatus; /* extern function declarations */ extern uint64 ExecuteLocalTaskList(List *taskList, @@ -35,5 +41,6 @@ extern bool AnyTaskAccessesLocalNode(List *taskList); extern bool TaskAccessesLocalNode(Task *task); extern void ErrorIfTransactionAccessedPlacementsLocally(void); extern void DisableLocalExecution(void); +extern void SetLocalExecutionStatus(LocalExecutionStatus newStatus); #endif /* LOCAL_EXECUTION_H */