Merge pull request #4233 from citusdata/introduce_get_local_execution_status

Introduce GetCurrentLocalExecutionStatus wrapper
pull/4246/head
SaitTalhaNisanci 2020-10-15 15:55:46 +03:00 committed by GitHub
commit b5a3526c07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 34 additions and 45 deletions

View File

@ -2097,7 +2097,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult)
return false; return false;
} }
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{ {
/* /*
* For various reasons, including the transaction visibility * For various reasons, including the transaction visibility
@ -2120,7 +2120,7 @@ ShouldExecuteCopyLocally(bool isIntermediateResult)
} }
/* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */
return CurrentLocalExecutionStatus != LOCAL_EXECUTION_DISABLED && return GetCurrentLocalExecutionStatus() != LOCAL_EXECUTION_DISABLED &&
IsMultiStatementTransaction(); IsMultiStatementTransaction();
} }

View File

@ -1004,7 +1004,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
* then we should error out as it would cause inconsistencies across the * then we should error out as it would cause inconsistencies across the
* remote connection and local execution. * remote connection and local execution.
*/ */
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
AnyTaskAccessesLocalNode(remoteTaskList)) AnyTaskAccessesLocalNode(remoteTaskList))
{ {
ErrorIfTransactionAccessedPlacementsLocally(); ErrorIfTransactionAccessedPlacementsLocally();
@ -1187,7 +1187,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
return xactProperties; return xactProperties;
} }
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{ {
/* /*
* In case localExecutionHappened, we force the executor to use 2PC. * In case localExecutionHappened, we force the executor to use 2PC.

View File

@ -110,7 +110,7 @@
bool EnableLocalExecution = true; bool EnableLocalExecution = true;
bool LogLocalCommands = false; bool LogLocalCommands = false;
LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL; static LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL;
static void SplitLocalAndRemotePlacements(List *taskPlacementList, static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList, List **localTaskPlacementList,
@ -131,6 +131,17 @@ static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
static void EnsureTransitionPossible(LocalExecutionStatus from, static void EnsureTransitionPossible(LocalExecutionStatus from,
LocalExecutionStatus to); LocalExecutionStatus to);
/*
* GetCurrentLocalExecutionStatus returns the current local execution status.
*/
LocalExecutionStatus
GetCurrentLocalExecutionStatus(void)
{
return CurrentLocalExecutionStatus;
}
/* /*
* ExecuteLocalTasks executes the given tasks locally. * ExecuteLocalTasks executes the given tasks locally.
* *
@ -649,7 +660,7 @@ RecordNonDistTableAccessesForTask(Task *task)
void void
SetLocalExecutionStatus(LocalExecutionStatus newStatus) SetLocalExecutionStatus(LocalExecutionStatus newStatus)
{ {
EnsureTransitionPossible(CurrentLocalExecutionStatus, newStatus); EnsureTransitionPossible(GetCurrentLocalExecutionStatus(), newStatus);
CurrentLocalExecutionStatus = newStatus; CurrentLocalExecutionStatus = newStatus;
} }
@ -695,7 +706,7 @@ ShouldExecuteTasksLocally(List *taskList)
return false; return false;
} }
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
{ {
/* /*
* if the current transaction accessed the local node over a connection * if the current transaction accessed the local node over a connection
@ -704,42 +715,21 @@ ShouldExecuteTasksLocally(List *taskList)
return false; return false;
} }
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{ {
bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
/* /*
* For various reasons, including the transaction visibility * If we already used local execution for a previous command
* rules (e.g., read-your-own-writes), we have to use local * we should stick to it for read-your-writes policy, this can be a
* execution again if it has already happened within this * case when we are inside a transaction block. Such as:
* transaction block. *
* BEGIN;
* some-command; -- executed via local execution
* another-command; -- this should be executed via local execution for visibility
* COMMIT;
*
* We may need to use local execution even if we are not inside a transaction block,
* however the state will go back to LOCAL_EXECUTION_OPTIONAL at the end of transaction.
*/ */
isValidLocalExecutionPath = IsMultiStatementTransaction() ||
InCoordinatedTransaction();
/*
* In some cases, such as when a single command leads to a local
* command execution followed by remote task (list) execution, we
* still expect the remote execution to first try local execution
* as TransactionAccessedLocalPlacement is set by the local execution.
* The remote execution shouldn't create any local tasks as the local
* execution should have executed all the local tasks. And, we are
* ensuring it here.
*/
isValidLocalExecutionPath |= !AnyTaskAccessesLocalNode(taskList);
/*
* We might error out later in the execution if it is not suitable
* to execute the tasks locally.
*/
Assert(isValidLocalExecutionPath);
/*
* TODO: A future improvement could be to keep track of which placements
* have been locally executed. At this point, only use local execution
* for those placements. That'd help to benefit more from parallelism.
*/
return true; return true;
} }
@ -836,7 +826,7 @@ TaskAccessesLocalNode(Task *task)
void void
ErrorIfTransactionAccessedPlacementsLocally(void) ErrorIfTransactionAccessedPlacementsLocally(void)
{ {
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED) if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{ {
ereport(ERROR, ereport(ERROR,
(errmsg("cannot execute command because a local execution has " (errmsg("cannot execute command because a local execution has "

View File

@ -86,7 +86,7 @@ EnsureCompatibleLocalExecutionState(List *taskList)
* We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily * We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily
* iterating the task list in AnyTaskAccessesLocalNode. * iterating the task list in AnyTaskAccessesLocalNode.
*/ */
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_REQUIRED && if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
AnyTaskAccessesLocalNode(taskList)) AnyTaskAccessesLocalNode(taskList))
{ {
ErrorIfTransactionAccessedPlacementsLocally(); ErrorIfTransactionAccessedPlacementsLocally();

View File

@ -178,7 +178,7 @@ IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistribute
return false; return false;
} }
if (CurrentLocalExecutionStatus == LOCAL_EXECUTION_DISABLED) if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
{ {
/* transaction already connected to localhost */ /* transaction already connected to localhost */
return false; return false;

View File

@ -31,9 +31,8 @@ typedef enum LocalExecutionStatus
LOCAL_EXECUTION_DISABLED LOCAL_EXECUTION_DISABLED
} LocalExecutionStatus; } LocalExecutionStatus;
extern enum LocalExecutionStatus CurrentLocalExecutionStatus;
/* extern function declarations */ /* extern function declarations */
extern LocalExecutionStatus GetCurrentLocalExecutionStatus(void);
extern uint64 ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest); extern uint64 ExecuteLocalTaskList(List *taskList, TupleDestination *defaultTupleDest);
extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList); extern uint64 ExecuteLocalUtilityTaskList(List *utilityTaskList);
extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo