mirror of https://github.com/citusdata/citus.git
use an enum for local execution status (#3733)
We have two variables that are related to local execution status. TransactionAccessedLocalPlacement and TransactionConnectedToLocalGroup. Only one of these fields should be set, however we didn't have any check for this contraint and it was error prone. What those two variables are used is that we are trying to understand if we should use local execution, the current session, or if we should be using a connection to execute the current query, therefore the tasks. In the enum, now it is more clear what these variables mean. Also, now we have a method to change the local execution status. The method will error if we are trying to transition from a state to a wrong state. This will help us avoid problems.pull/3690/head
parent
24dcb02bca
commit
3dc7cad754
|
@ -64,7 +64,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
|
||||||
* Since we are doing a local copy, the following statements should
|
* Since we are doing a local copy, the following statements should
|
||||||
* use local execution to see the changes
|
* use local execution to see the changes
|
||||||
*/
|
*/
|
||||||
TransactionAccessedLocalPlacement = true;
|
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
|
||||||
|
|
||||||
bool isBinaryCopy = localCopyOutState->binary;
|
bool isBinaryCopy = localCopyOutState->binary;
|
||||||
if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy))
|
if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy))
|
||||||
|
|
|
@ -2029,7 +2029,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TransactionAccessedLocalPlacement)
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* For various reasons, including the transaction visibility
|
* For various reasons, including the transaction visibility
|
||||||
|
@ -2052,7 +2052,8 @@ ShouldExecuteCopyLocally(bool isIntermediateResult)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */
|
/* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */
|
||||||
return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction();
|
return CurrentLocalExecutionStatus != LOCAL_EXECUTION_DISABLED &&
|
||||||
|
IsMultiStatementTransaction();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3294,7 +3295,7 @@ InitializeCopyShardState(CopyShardState *shardState,
|
||||||
*/
|
*/
|
||||||
if (!isCopyToIntermediateFile)
|
if (!isCopyToIntermediateFile)
|
||||||
{
|
{
|
||||||
TransactionConnectedToLocalGroup = true;
|
SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -975,7 +975,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
||||||
* then we should error out as it would cause inconsistencies across the
|
* then we should error out as it would cause inconsistencies across the
|
||||||
* remote connection and local execution.
|
* remote connection and local execution.
|
||||||
*/
|
*/
|
||||||
if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(remoteTaskList))
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED &&
|
||||||
|
AnyTaskAccessesLocalNode(remoteTaskList))
|
||||||
{
|
{
|
||||||
ErrorIfTransactionAccessedPlacementsLocally();
|
ErrorIfTransactionAccessedPlacementsLocally();
|
||||||
}
|
}
|
||||||
|
@ -1111,7 +1112,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
|
||||||
return xactProperties;
|
return xactProperties;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TransactionAccessedLocalPlacement)
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* In case localExecutionHappened, we force the executor to use 2PC.
|
* In case localExecutionHappened, we force the executor to use 2PC.
|
||||||
|
@ -1835,10 +1836,9 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
||||||
placementExecutionReady = false;
|
placementExecutionReady = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TransactionConnectedToLocalGroup && taskPlacement->groupId ==
|
if (taskPlacement->groupId == localGroupId)
|
||||||
localGroupId)
|
|
||||||
{
|
{
|
||||||
TransactionConnectedToLocalGroup = true;
|
SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,8 +107,7 @@
|
||||||
bool EnableLocalExecution = true;
|
bool EnableLocalExecution = true;
|
||||||
bool LogLocalCommands = false;
|
bool LogLocalCommands = false;
|
||||||
|
|
||||||
bool TransactionAccessedLocalPlacement = false;
|
LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL;
|
||||||
bool TransactionConnectedToLocalGroup = false;
|
|
||||||
|
|
||||||
static void SplitLocalAndRemotePlacements(List *taskPlacementList,
|
static void SplitLocalAndRemotePlacements(List *taskPlacementList,
|
||||||
List **localTaskPlacementList,
|
List **localTaskPlacementList,
|
||||||
|
@ -124,6 +123,8 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
static void LocallyExecuteUtilityTask(const char *utilityCommand);
|
static void LocallyExecuteUtilityTask(const char *utilityCommand);
|
||||||
static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
|
static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
|
||||||
|
static void EnsureTransitionPossible(LocalExecutionStatus from,
|
||||||
|
LocalExecutionStatus to);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExecuteLocalTasks executes the given tasks locally.
|
* ExecuteLocalTasks executes the given tasks locally.
|
||||||
|
@ -193,7 +194,7 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
|
||||||
*/
|
*/
|
||||||
if (task->anchorShardId != INVALID_SHARD_ID)
|
if (task->anchorShardId != INVALID_SHARD_ID)
|
||||||
{
|
{
|
||||||
TransactionAccessedLocalPlacement = true;
|
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
|
||||||
}
|
}
|
||||||
LogLocalCommand(task);
|
LogLocalCommand(task);
|
||||||
|
|
||||||
|
@ -351,7 +352,7 @@ ExecuteLocalUtilityTaskList(List *localTaskList)
|
||||||
* We should register the access to local placement to force the local
|
* We should register the access to local placement to force the local
|
||||||
* execution of the following commands withing the current transaction.
|
* execution of the following commands withing the current transaction.
|
||||||
*/
|
*/
|
||||||
TransactionAccessedLocalPlacement = true;
|
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
|
||||||
|
|
||||||
LogLocalCommand(localTask);
|
LogLocalCommand(localTask);
|
||||||
|
|
||||||
|
@ -589,6 +590,47 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SetLocalExecutionStatus sets the local execution status to
|
||||||
|
* the given status, it errors if the transition is not possible from the
|
||||||
|
* current status.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SetLocalExecutionStatus(LocalExecutionStatus newStatus)
|
||||||
|
{
|
||||||
|
EnsureTransitionPossible(CurrentLocalExecutionStatus, newStatus);
|
||||||
|
|
||||||
|
CurrentLocalExecutionStatus = newStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* EnsureTransitionPossible errors if we cannot switch to the 'to' status
|
||||||
|
* from the 'from' status.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
EnsureTransitionPossible(LocalExecutionStatus from, LocalExecutionStatus
|
||||||
|
to)
|
||||||
|
{
|
||||||
|
if (from == LOCAL_EXECUTION_REQUIRED && to == LOCAL_EXECUTION_DISABLED)
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errmsg(
|
||||||
|
"cannot switch local execution status from local execution required "
|
||||||
|
"to local execution disabled since it can cause "
|
||||||
|
"visibility problems in the current transaction")));
|
||||||
|
}
|
||||||
|
if (from == LOCAL_EXECUTION_DISABLED && to == LOCAL_EXECUTION_REQUIRED)
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errmsg(
|
||||||
|
"cannot switch local execution status from local execution disabled "
|
||||||
|
"to local execution enabled since it can cause "
|
||||||
|
"visibility problems in the current transaction")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShouldExecuteTasksLocally gets a task list and returns true if the
|
* ShouldExecuteTasksLocally gets a task list and returns true if the
|
||||||
* any of the tasks should be executed locally. This function does not
|
* any of the tasks should be executed locally. This function does not
|
||||||
|
@ -602,7 +644,7 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TransactionConnectedToLocalGroup)
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* if the current transaction accessed the local node over a connection
|
* if the current transaction accessed the local node over a connection
|
||||||
|
@ -611,7 +653,7 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TransactionAccessedLocalPlacement)
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||||
{
|
{
|
||||||
bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
|
bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
|
||||||
|
|
||||||
|
@ -743,7 +785,7 @@ TaskAccessesLocalNode(Task *task)
|
||||||
void
|
void
|
||||||
ErrorIfTransactionAccessedPlacementsLocally(void)
|
ErrorIfTransactionAccessedPlacementsLocally(void)
|
||||||
{
|
{
|
||||||
if (TransactionAccessedLocalPlacement)
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||||
{
|
{
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errmsg("cannot execute command because a local execution has "
|
(errmsg("cannot execute command because a local execution has "
|
||||||
|
|
|
@ -83,10 +83,11 @@ static void
|
||||||
EnsureCompatibleLocalExecutionState(List *taskList)
|
EnsureCompatibleLocalExecutionState(List *taskList)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We have TransactionAccessedLocalPlacement check here to avoid unnecessarily
|
* We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily
|
||||||
* iterating the task list in AnyTaskAccessesLocalNode.
|
* iterating the task list in AnyTaskAccessesLocalNode.
|
||||||
*/
|
*/
|
||||||
if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList))
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED &&
|
||||||
|
AnyTaskAccessesLocalNode(taskList))
|
||||||
{
|
{
|
||||||
ErrorIfTransactionAccessedPlacementsLocally();
|
ErrorIfTransactionAccessedPlacementsLocally();
|
||||||
}
|
}
|
||||||
|
|
|
@ -458,7 +458,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
|
||||||
|
|
||||||
if (isLocalShardPlacement)
|
if (isLocalShardPlacement)
|
||||||
{
|
{
|
||||||
TransactionConnectedToLocalGroup = true;
|
SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -179,7 +179,7 @@ IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistribute
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TransactionConnectedToLocalGroup)
|
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED)
|
||||||
{
|
{
|
||||||
/* transaction already connected to localhost */
|
/* transaction already connected to localhost */
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -445,8 +445,7 @@ ResetGlobalVariables()
|
||||||
{
|
{
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
TransactionAccessedLocalPlacement = false;
|
SetLocalExecutionStatus(LOCAL_EXECUTION_OPTIONAL);
|
||||||
TransactionConnectedToLocalGroup = false;
|
|
||||||
dlist_init(&InProgressTransactions);
|
dlist_init(&InProgressTransactions);
|
||||||
activeSetStmts = NULL;
|
activeSetStmts = NULL;
|
||||||
CoordinatedTransactionUses2PC = false;
|
CoordinatedTransactionUses2PC = false;
|
||||||
|
|
|
@ -17,8 +17,14 @@
|
||||||
extern bool EnableLocalExecution;
|
extern bool EnableLocalExecution;
|
||||||
extern bool LogLocalCommands;
|
extern bool LogLocalCommands;
|
||||||
|
|
||||||
extern bool TransactionAccessedLocalPlacement;
|
typedef enum LocalExecutionStatus
|
||||||
extern bool TransactionConnectedToLocalGroup;
|
{
|
||||||
|
LOCAL_EXECUTION_REQUIRED,
|
||||||
|
LOCAL_EXECUTION_OPTIONAL,
|
||||||
|
LOCAL_EXECUTION_DISABLED
|
||||||
|
} LocalExecutionStatus;
|
||||||
|
|
||||||
|
extern enum LocalExecutionStatus CurrentLocalExecutionStatus;
|
||||||
|
|
||||||
/* extern function declarations */
|
/* extern function declarations */
|
||||||
extern uint64 ExecuteLocalTaskList(List *taskList,
|
extern uint64 ExecuteLocalTaskList(List *taskList,
|
||||||
|
@ -35,5 +41,6 @@ extern bool AnyTaskAccessesLocalNode(List *taskList);
|
||||||
extern bool TaskAccessesLocalNode(Task *task);
|
extern bool TaskAccessesLocalNode(Task *task);
|
||||||
extern void ErrorIfTransactionAccessedPlacementsLocally(void);
|
extern void ErrorIfTransactionAccessedPlacementsLocally(void);
|
||||||
extern void DisableLocalExecution(void);
|
extern void DisableLocalExecution(void);
|
||||||
|
extern void SetLocalExecutionStatus(LocalExecutionStatus newStatus);
|
||||||
|
|
||||||
#endif /* LOCAL_EXECUTION_H */
|
#endif /* LOCAL_EXECUTION_H */
|
||||||
|
|
Loading…
Reference in New Issue