From dfcf1d07b2b835ecb1e434722f933dc5feb8a177 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 17 Mar 2020 13:18:10 +0300 Subject: [PATCH] refactor ExecuteLocalTaskList --- .../distributed/executor/adaptive_executor.c | 6 +++- .../distributed/executor/local_executor.c | 31 ++++++++++--------- src/include/distributed/local_executor.h | 4 ++- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 503500560..7e5d4cfe9 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -785,7 +785,11 @@ HasDependentJobs(Job *mainJob) static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) { - uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList); + EState *estate = ScanStateGetExecutorState(scanState); + uint64 rowsProcessed = ExecuteLocalTaskList(execution->localTaskList, + estate->es_param_list_info, + scanState->distributedPlan, + scanState->tuplestorestate); /* * We're deliberately not setting execution->rowsProcessed here. The main reason diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 4930ad7af..eb795e0c0 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -99,7 +99,6 @@ #include "nodes/params.h" #include "utils/snapmgr.h" - /* controlled via a GUC */ bool EnableLocalExecution = true; bool LogLocalCommands = false; @@ -107,12 +106,12 @@ bool LogLocalCommands = false; bool TransactionAccessedLocalPlacement = false; bool TransactionConnectedToLocalGroup = false; - static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList); -static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, - char *queryString); +static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, + Tuplestorestate *tupleStoreState, ParamListInfo + paramListInfo); static void LogLocalCommand(Task *task); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -120,7 +119,6 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, static void LocallyExecuteUtilityTask(const char *utilityCommand); static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); - /* * ExecuteLocalTasks gets a CitusScanState node and list of local tasks. * @@ -130,11 +128,11 @@ static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); * The function returns totalRowsProcessed. */ uint64 -ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) +ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo, + DistributedPlan *distributedPlan, + Tuplestorestate *tupleStoreState) { - EState *executorState = ScanStateGetExecutorState(scanState); - DistributedPlan *distributedPlan = scanState->distributedPlan; - ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info); + ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); int numParams = 0; Oid *parameterTypes = NULL; uint64 totalRowsProcessed = 0; @@ -224,7 +222,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) : ""; totalRowsProcessed += - ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); + ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState, + paramListInfo); } return totalRowsProcessed; @@ -252,7 +251,10 @@ ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, /* set local (if any) & remote tasks */ ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, &remoteTaskList); - processedRows += ExecuteLocalTaskList(scanState, localTaskList); + EState *estate = ScanStateGetExecutorState(scanState); + processedRows += ExecuteLocalTaskList(localTaskList, estate->es_param_list_info, + scanState->distributedPlan, + scanState->tuplestorestate); } else { @@ -512,10 +514,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement * tupleStore if necessary. The function returns the */ static uint64 -ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString) +ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, + Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo) { - EState *executorState = ScanStateGetExecutorState(scanState); - ParamListInfo paramListInfo = executorState->es_param_list_info; DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore); ScanDirection scanDirection = ForwardScanDirection; QueryEnvironment *queryEnv = create_queryEnv(); @@ -527,7 +528,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que * the other task executions and the adaptive executor. */ SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, - scanState->tuplestorestate, + tupleStoreState, CurrentMemoryContext, false); /* Create a QueryDesc for the query */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 265434b89..a1a1d156b 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -24,7 +24,9 @@ extern bool TransactionConnectedToLocalGroup; extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, List *taskList, RowModifyLevel rowModifyLevel, bool hasReturning); -extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); +extern uint64 ExecuteLocalTaskList(List *taskList, ParamListInfo paramListInfo, + DistributedPlan *distributedPlan, + Tuplestorestate *tupleStoreState); extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList);