From 3dc7cad754eb679edadc4696595aaacd3e0ef9a4 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 9 Apr 2020 19:11:04 +0300 Subject: [PATCH] use an enum for local execution status (#3733) We have two variables that are related to local execution status. TransactionAccessedLocalPlacement and TransactionConnectedToLocalGroup. Only one of these fields should be set, however we didn't have any check for this contraint and it was error prone. What those two variables are used is that we are trying to understand if we should use local execution, the current session, or if we should be using a connection to execute the current query, therefore the tasks. In the enum, now it is more clear what these variables mean. Also, now we have a method to change the local execution status. The method will error if we are trying to transition from a state to a wrong state. This will help us avoid problems. --- .../distributed/commands/local_multi_copy.c | 2 +- src/backend/distributed/commands/multi_copy.c | 7 ++- .../distributed/executor/adaptive_executor.c | 10 ++-- .../distributed/executor/local_executor.c | 56 ++++++++++++++++--- .../executor/repartition_join_execution.c | 5 +- .../master/master_delete_protocol.c | 2 +- .../distributed/planner/local_plan_cache.c | 2 +- .../transaction/transaction_management.c | 3 +- src/include/distributed/local_executor.h | 11 +++- 9 files changed, 74 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index f624d5e65..0f6f619e6 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -64,7 +64,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in * Since we are doing a local copy, the following statements should * use local execution to see the changes */ - TransactionAccessedLocalPlacement = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); bool isBinaryCopy = localCopyOutState->binary; if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy)) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 7724d2018..f769d69b3 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2029,7 +2029,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) return false; } - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { /* * For various reasons, including the transaction visibility @@ -2052,7 +2052,8 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) } /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ - return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction(); + return CurrentLocalExecutionStatus != LOCAL_EXECUTION_DISABLED && + IsMultiStatementTransaction(); } @@ -3294,7 +3295,7 @@ InitializeCopyShardState(CopyShardState *shardState, */ if (!isCopyToIntermediateFile) { - TransactionConnectedToLocalGroup = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); } } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index c79ef0f18..4f8863edc 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -975,7 +975,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, * then we should error out as it would cause inconsistencies across the * remote connection and local execution. */ - if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(remoteTaskList)) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && + AnyTaskAccessesLocalNode(remoteTaskList)) { ErrorIfTransactionAccessedPlacementsLocally(); } @@ -1111,7 +1112,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { /* * In case localExecutionHappened, we force the executor to use 2PC. @@ -1835,10 +1836,9 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) placementExecutionReady = false; } - if (!TransactionConnectedToLocalGroup && taskPlacement->groupId == - localGroupId) + if (taskPlacement->groupId == localGroupId) { - TransactionConnectedToLocalGroup = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); } } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 5ae73c76e..0cc6e70b5 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -107,8 +107,7 @@ bool EnableLocalExecution = true; bool LogLocalCommands = false; -bool TransactionAccessedLocalPlacement = false; -bool TransactionConnectedToLocalGroup = false; +LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL; static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, @@ -124,6 +123,8 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, const char ***parameterValues); static void LocallyExecuteUtilityTask(const char *utilityCommand); static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); +static void EnsureTransitionPossible(LocalExecutionStatus from, + LocalExecutionStatus to); /* * ExecuteLocalTasks executes the given tasks locally. @@ -193,7 +194,7 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, */ if (task->anchorShardId != INVALID_SHARD_ID) { - TransactionAccessedLocalPlacement = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); } LogLocalCommand(task); @@ -351,7 +352,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList) * We should register the access to local placement to force the local * execution of the following commands withing the current transaction. */ - TransactionAccessedLocalPlacement = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); LogLocalCommand(localTask); @@ -589,6 +590,47 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, } +/* + * SetLocalExecutionStatus sets the local execution status to + * the given status, it errors if the transition is not possible from the + * current status. + */ +void +SetLocalExecutionStatus(LocalExecutionStatus newStatus) +{ + EnsureTransitionPossible(CurrentLocalExecutionStatus, newStatus); + + CurrentLocalExecutionStatus = newStatus; +} + + +/* + * EnsureTransitionPossible errors if we cannot switch to the 'to' status + * from the 'from' status. + */ +static void +EnsureTransitionPossible(LocalExecutionStatus from, LocalExecutionStatus + to) +{ + if (from == LOCAL_EXECUTION_REQUIRED && to == LOCAL_EXECUTION_DISABLED) + { + ereport(ERROR, + (errmsg( + "cannot switch local execution status from local execution required " + "to local execution disabled since it can cause " + "visibility problems in the current transaction"))); + } + if (from == LOCAL_EXECUTION_DISABLED && to == LOCAL_EXECUTION_REQUIRED) + { + ereport(ERROR, + (errmsg( + "cannot switch local execution status from local execution disabled " + "to local execution enabled since it can cause " + "visibility problems in the current transaction"))); + } +} + + /* * ShouldExecuteTasksLocally gets a task list and returns true if the * any of the tasks should be executed locally. This function does not @@ -602,7 +644,7 @@ ShouldExecuteTasksLocally(List *taskList) return false; } - if (TransactionConnectedToLocalGroup) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) { /* * if the current transaction accessed the local node over a connection @@ -611,7 +653,7 @@ ShouldExecuteTasksLocally(List *taskList) return false; } - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false; @@ -743,7 +785,7 @@ TaskAccessesLocalNode(Task *task) void ErrorIfTransactionAccessedPlacementsLocally(void) { - if (TransactionAccessedLocalPlacement) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) { ereport(ERROR, (errmsg("cannot execute command because a local execution has " diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 6ccd9db4e..242f5908a 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -83,10 +83,11 @@ static void EnsureCompatibleLocalExecutionState(List *taskList) { /* - * We have TransactionAccessedLocalPlacement check here to avoid unnecessarily + * We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily * iterating the task list in AnyTaskAccessesLocalNode. */ - if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList)) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && + AnyTaskAccessesLocalNode(taskList)) { ErrorIfTransactionAccessedPlacementsLocally(); } diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 7ea34954a..6afcf1098 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -458,7 +458,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, if (isLocalShardPlacement) { - TransactionConnectedToLocalGroup = true; + SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); } } diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index cfac1abf1..584ce2d51 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -179,7 +179,7 @@ IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistribute return false; } - if (TransactionConnectedToLocalGroup) + if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) { /* transaction already connected to localhost */ return false; diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index cc235ac32..1783d4a73 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -445,8 +445,7 @@ ResetGlobalVariables() { CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; - TransactionAccessedLocalPlacement = false; - TransactionConnectedToLocalGroup = false; + SetLocalExecutionStatus(LOCAL_EXECUTION_OPTIONAL); dlist_init(&InProgressTransactions); activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 6a3d1ed34..f42a7c47b 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -17,8 +17,14 @@ extern bool EnableLocalExecution; extern bool LogLocalCommands; -extern bool TransactionAccessedLocalPlacement; -extern bool TransactionConnectedToLocalGroup; +typedef enum LocalExecutionStatus +{ + LOCAL_EXECUTION_REQUIRED, + LOCAL_EXECUTION_OPTIONAL, + LOCAL_EXECUTION_DISABLED +} LocalExecutionStatus; + +extern enum LocalExecutionStatus CurrentLocalExecutionStatus; /* extern function declarations */ extern uint64 ExecuteLocalTaskList(List *taskList, @@ -35,5 +41,6 @@ extern bool AnyTaskAccessesLocalNode(List *taskList); extern bool TaskAccessesLocalNode(Task *task); extern void ErrorIfTransactionAccessedPlacementsLocally(void); extern void DisableLocalExecution(void); +extern void SetLocalExecutionStatus(LocalExecutionStatus newStatus); #endif /* LOCAL_EXECUTION_H */