mirror of https://github.com/citusdata/citus.git
Introduce GetCurrentLocalExecutionStatus wrapper
We should not access CurrentLocalExecutionStatus directly because that would mean that we could also set it directly, which we shouldn't because we have checks to see if the new state is possible, otherwise we error.pull/4233/head
parent
619b8b7654
commit
ecde6c6eef
|
@ -2097,7 +2097,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult)
|
|||
return false;
|
||||
}
|
||||
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
|
||||
{
|
||||
/*
|
||||
* For various reasons, including the transaction visibility
|
||||
|
@ -2120,7 +2120,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult)
|
|||
}
|
||||
|
||||
/* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */
|
||||
return CurrentLocalExecutionStatus != LOCAL_EXECUTION_DISABLED &&
|
||||
return GetCurrentLocalExecutionStatus() != LOCAL_EXECUTION_DISABLED &&
|
||||
IsMultiStatementTransaction();
|
||||
}
|
||||
|
||||
|
|
|
@ -1004,7 +1004,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
|
|||
* then we should error out as it would cause inconsistencies across the
|
||||
* remote connection and local execution.
|
||||
*/
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED &&
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
|
||||
AnyTaskAccessesLocalNode(remoteTaskList))
|
||||
{
|
||||
ErrorIfTransactionAccessedPlacementsLocally();
|
||||
|
@ -1187,7 +1187,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
|
|||
return xactProperties;
|
||||
}
|
||||
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
|
||||
{
|
||||
/*
|
||||
* In case localExecutionHappened, we force the executor to use 2PC.
|
||||
|
|
|
@ -110,7 +110,7 @@
|
|||
bool EnableLocalExecution = true;
|
||||
bool LogLocalCommands = false;
|
||||
|
||||
LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL;
|
||||
static LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL;
|
||||
|
||||
static void SplitLocalAndRemotePlacements(List *taskPlacementList,
|
||||
List **localTaskPlacementList,
|
||||
|
@ -131,6 +131,17 @@ static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
|
|||
static void EnsureTransitionPossible(LocalExecutionStatus from,
|
||||
LocalExecutionStatus to);
|
||||
|
||||
|
||||
/*
|
||||
* GetCurrentLocalExecutionStatus returns the current local execution status.
|
||||
*/
|
||||
LocalExecutionStatus
|
||||
GetCurrentLocalExecutionStatus(void)
|
||||
{
|
||||
return CurrentLocalExecutionStatus;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteLocalTasks executes the given tasks locally.
|
||||
*
|
||||
|
@ -649,7 +660,7 @@ RecordNonDistTableAccessesForTask(Task *task)
|
|||
void
|
||||
SetLocalExecutionStatus(LocalExecutionStatus newStatus)
|
||||
{
|
||||
EnsureTransitionPossible(CurrentLocalExecutionStatus, newStatus);
|
||||
EnsureTransitionPossible(GetCurrentLocalExecutionStatus(), newStatus);
|
||||
|
||||
CurrentLocalExecutionStatus = newStatus;
|
||||
}
|
||||
|
@ -695,7 +706,7 @@ ShouldExecuteTasksLocally(List *taskList)
|
|||
return false;
|
||||
}
|
||||
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED)
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
|
||||
{
|
||||
/*
|
||||
* if the current transaction accessed the local node over a connection
|
||||
|
@ -704,42 +715,21 @@ ShouldExecuteTasksLocally(List *taskList)
|
|||
return false;
|
||||
}
|
||||
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
|
||||
{
|
||||
bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
|
||||
|
||||
/*
|
||||
* For various reasons, including the transaction visibility
|
||||
* rules (e.g., read-your-own-writes), we have to use local
|
||||
* execution again if it has already happened within this
|
||||
* transaction block.
|
||||
* If we already used local execution for a previous command
|
||||
* we should stick to it for read-your-writes policy, this can be a
|
||||
* case when we are inside a transaction block. Such as:
|
||||
*
|
||||
* BEGIN;
|
||||
* some-command; -- executed via local execution
|
||||
* another-command; -- this should be executed via local execution for visibility
|
||||
* COMMIT;
|
||||
*
|
||||
* We may need to use local execution even if we are not inside a transaction block,
|
||||
* however the state will go back to LOCAL_EXECUTION_OPTIONAL at the end of transaction.
|
||||
*/
|
||||
isValidLocalExecutionPath = IsMultiStatementTransaction() ||
|
||||
InCoordinatedTransaction();
|
||||
|
||||
/*
|
||||
* In some cases, such as when a single command leads to a local
|
||||
* command execution followed by remote task (list) execution, we
|
||||
* still expect the remote execution to first try local execution
|
||||
* as TransactionAccessedLocalPlacement is set by the local execution.
|
||||
* The remote execution shouldn't create any local tasks as the local
|
||||
* execution should have executed all the local tasks. And, we are
|
||||
* ensuring it here.
|
||||
*/
|
||||
isValidLocalExecutionPath |= !AnyTaskAccessesLocalNode(taskList);
|
||||
|
||||
/*
|
||||
* We might error out later in the execution if it is not suitable
|
||||
* to execute the tasks locally.
|
||||
*/
|
||||
Assert(isValidLocalExecutionPath);
|
||||
|
||||
/*
|
||||
* TODO: A future improvement could be to keep track of which placements
|
||||
* have been locally executed. At this point, only use local execution
|
||||
* for those placements. That'd help to benefit more from parallelism.
|
||||
*/
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -836,7 +826,7 @@ TaskAccessesLocalNode(Task *task)
|
|||
void
|
||||
ErrorIfTransactionAccessedPlacementsLocally(void)
|
||||
{
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED)
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errmsg("cannot execute command because a local execution has "
|
||||
|
|
|
@ -86,7 +86,7 @@ EnsureCompatibleLocalExecutionState(List *taskList)
|
|||
* We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily
|
||||
* iterating the task list in AnyTaskAccessesLocalNode.
|
||||
*/
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED &&
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
|
||||
AnyTaskAccessesLocalNode(taskList))
|
||||
{
|
||||
ErrorIfTransactionAccessedPlacementsLocally();
|
||||
|
|
|
@ -178,7 +178,7 @@ IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistribute
|
|||
return false;
|
||||
}
|
||||
|
||||
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED)
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
|
||||
{
|
||||
/* transaction already connected to localhost */
|
||||
return false;
|
||||
|
|
|
@ -31,9 +31,8 @@ typedef enum LocalExecutionStatus
|
|||
LOCAL_EXECUTION_DISABLED
|
||||
} LocalExecutionStatus;
|
||||
|
||||
extern enum LocalExecutionStatus CurrentLocalExecutionStatus;
|
||||
|
||||
/* extern function declarations */
|
||||
extern LocalExecutionStatus GetCurrentLocalExecutionStatus(void);
|
||||
extern uint64 ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest);
|
||||
extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList);
|
||||
extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo
|
||||
|
|
Loading…
Reference in New Issue