diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 79e181c7b..5640a31b6 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -785,7 +785,11 @@ HasDependentJobs(Job *mainJob) static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) { - uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList); + EState *estate = ScanStateGetExecutorState(scanState); + uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList, + estate->es_param_list_info, + scanState->distributedPlan, + scanState->tuplestorestate); /* * We're deliberately not setting execution->rowsProcessed here. The main reason diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index f90bd43f6..57f4f4975 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -100,7 +100,6 @@ #include "nodes/params.h" #include "utils/snapmgr.h" - /* controlled via a GUC */ bool EnableLocalExecution = true; bool LogLocalCommands = false; @@ -108,12 +107,12 @@ bool LogLocalCommands = false; bool TransactionAccessedLocalPlacement = false; bool TransactionConnectedToLocalGroup = false; - static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList); -static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, - char *queryString); +static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, + Tuplestorestate *tupleStoreState, ParamListInfo + paramListInfo); static void LogLocalCommand(Task *task); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -121,21 +120,40 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, static void LocallyExecuteUtilityTask(const char *utilityCommand); static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); - /* - * ExecuteLocalTasks gets a CitusScanState node and list of local tasks. + * ExecuteLocalTasks executes the given tasks locally. * * The function goes over the task list and executes them locally. - * The returning tuples (if any) is stored in the CitusScanState. + * The returning tuples (if any) is stored in the tupleStoreState. * * The function returns totalRowsProcessed. */ uint64 -ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) +ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState) { - EState *executorState = ScanStateGetExecutorState(scanState); - DistributedPlan *distributedPlan = scanState->distributedPlan; - ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info); + DistributedPlan *distributedPlan = NULL; + ParamListInfo paramListInfo = NULL; + return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan, + tupleStoreState); +} + + +/* + * ExecuteLocalTaskListExtended executes the given tasks locally. + * + * The function goes over the task list and executes them locally. + * The returning tuples (if any) is stored in the tupleStoreState. + * + * It uses a cached plan if distributedPlan is found in cache. + * + * The function returns totalRowsProcessed. + */ +uint64 +ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, + DistributedPlan *distributedPlan, + Tuplestorestate *tupleStoreState) +{ + ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); int numParams = 0; Oid *parameterTypes = NULL; uint64 totalRowsProcessed = 0; @@ -232,7 +250,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } totalRowsProcessed += - ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); + ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState, + paramListInfo); } return totalRowsProcessed; @@ -260,7 +279,13 @@ ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, /* set local (if any) & remote tasks */ ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, &remoteTaskList); - processedRows += ExecuteLocalTaskList(scanState, localTaskList); + EState *estate = ScanStateGetExecutorState(scanState); + processedRows += ExecuteLocalTaskListExtended( + localTaskList, + estate->es_param_list_info, + scanState->distributedPlan, + scanState->tuplestorestate + ); } else { @@ -514,10 +539,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement * tupleStore if necessary. The function returns the */ static uint64 -ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString) +ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, + Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo) { - EState *executorState = ScanStateGetExecutorState(scanState); - ParamListInfo paramListInfo = executorState->es_param_list_info; DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore); ScanDirection scanDirection = ForwardScanDirection; QueryEnvironment *queryEnv = create_queryEnv(); @@ -529,7 +553,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que * the other task executions and the adaptive executor. */ SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, - scanState->tuplestorestate, + tupleStoreState, CurrentMemoryContext, false); /* Create a QueryDesc for the query */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 265434b89..f4301a888 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -24,7 +24,12 @@ extern bool TransactionConnectedToLocalGroup; extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, List *taskList, RowModifyLevel rowModifyLevel, bool hasReturning); -extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); +extern uint64 ExecuteLocalTaskList(List *taskList, + Tuplestorestate *tupleStoreState); +extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo + orig_paramListInfo, + DistributedPlan *distributedPlan, + Tuplestorestate *tupleStoreState); extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList);