From 6cd32b0db1b922c25d8f455765a313209a0e3700 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 31 Mar 2020 19:19:54 +0300 Subject: [PATCH] refactor ExecuteLocalTaskList (#3617) ExecuteLocalTaskList doesn't need scanState as it only uses paramListInfo, distributedPlan and tupleStoreState. It is better to pass only the variables that the function needs, so that we can call this function from other places when we dont have scanState. --- .../distributed/executor/adaptive_executor.c | 6 +- .../distributed/executor/local_executor.c | 58 +++++++++++++------ src/include/distributed/local_executor.h | 7 ++- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 79e181c7b..5640a31b6 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -785,7 +785,11 @@ HasDependentJobs(Job *mainJob) static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) { - uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList); + EState *estate = ScanStateGetExecutorState(scanState); + uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList, + estate->es_param_list_info, + scanState->distributedPlan, + scanState->tuplestorestate); /* * We're deliberately not setting execution->rowsProcessed here. The main reason diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index f90bd43f6..57f4f4975 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -100,7 +100,6 @@ #include "nodes/params.h" #include "utils/snapmgr.h" - /* controlled via a GUC */ bool EnableLocalExecution = true; bool LogLocalCommands = false; @@ -108,12 +107,12 @@ bool LogLocalCommands = false; bool TransactionAccessedLocalPlacement = false; bool TransactionConnectedToLocalGroup = false; - static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList); -static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, - char *queryString); +static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, + Tuplestorestate *tupleStoreState, ParamListInfo + paramListInfo); static void LogLocalCommand(Task *task); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -121,21 +120,40 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, static void LocallyExecuteUtilityTask(const char *utilityCommand); static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); - /* - * ExecuteLocalTasks gets a CitusScanState node and list of local tasks. + * ExecuteLocalTasks executes the given tasks locally. * * The function goes over the task list and executes them locally. - * The returning tuples (if any) is stored in the CitusScanState. + * The returning tuples (if any) is stored in the tupleStoreState. * * The function returns totalRowsProcessed. */ uint64 -ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) +ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) { - EState *executorState = ScanStateGetExecutorState(scanState); - DistributedPlan *distributedPlan = scanState->distributedPlan; - ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info); + DistributedPlan *distributedPlan = NULL; + ParamListInfo paramListInfo = NULL; + return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan, + tupleStoreState); +} + + +/* + * ExecuteLocalTaskListExtended executes the given tasks locally. + * + * The function goes over the task list and executes them locally. + * The returning tuples (if any) is stored in the tupleStoreState. + * + * It uses a cached plan if distributedPlan is found in cache. + * + * The function returns totalRowsProcessed. + */ +uint64 +ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, + DistributedPlan *distributedPlan, + Tuplestorestate *tupleStoreState) +{ + ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); int numParams = 0; Oid *parameterTypes = NULL; uint64 totalRowsProcessed = 0; @@ -232,7 +250,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } totalRowsProcessed += - ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); + ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState, + paramListInfo); } return totalRowsProcessed; @@ -260,7 +279,13 @@ ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, /* set local (if any) & remote tasks */ ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, &remoteTaskList); - processedRows += ExecuteLocalTaskList(scanState, localTaskList); + EState *estate = ScanStateGetExecutorState(scanState); + processedRows += ExecuteLocalTaskListExtended( + localTaskList, + estate->es_param_list_info, + scanState->distributedPlan, + scanState->tuplestorestate + ); } else { @@ -514,10 +539,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement * tupleStore if necessary. The function returns the */ static uint64 -ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString) +ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, + Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo) { - EState *executorState = ScanStateGetExecutorState(scanState); - ParamListInfo paramListInfo = executorState->es_param_list_info; DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore); ScanDirection scanDirection = ForwardScanDirection; QueryEnvironment *queryEnv = create_queryEnv(); @@ -529,7 +553,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que * the other task executions and the adaptive executor. */ SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, - scanState->tuplestorestate, + tupleStoreState, CurrentMemoryContext, false); /* Create a QueryDesc for the query */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 265434b89..f4301a888 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -24,7 +24,12 @@ extern bool TransactionConnectedToLocalGroup; extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, List *taskList, RowModifyLevel rowModifyLevel, bool hasReturning); -extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); +extern uint64 ExecuteLocalTaskList(List *taskList, + Tuplestorestate *tupleStoreState); +extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo + orig_paramListInfo, + DistributedPlan *distributedPlan, + Tuplestorestate *tupleStoreState); extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList);