refactor ExecuteLocalTaskList

enh/localExecuteSelectInto
SaitTalhaNisanci 2020-03-17 13:18:10 +03:00
parent 3b7959a763
commit dfcf1d07b2
3 changed files with 24 additions and 17 deletions

View File

@ -785,7 +785,11 @@ HasDependentJobs(Job *mainJob)
static void static void
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
{ {
uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList); EState *estate = ScanStateGetExecutorState(scanState);
uint64 rowsProcessed = ExecuteLocalTaskList(execution->localTaskList,
estate->es_param_list_info,
scanState->distributedPlan,
scanState->tuplestorestate);
/* /*
* We're deliberately not setting execution->rowsProcessed here. The main reason * We're deliberately not setting execution->rowsProcessed here. The main reason

View File

@ -99,7 +99,6 @@
#include "nodes/params.h" #include "nodes/params.h"
#include "utils/snapmgr.h" #include "utils/snapmgr.h"
/* controlled via a GUC */ /* controlled via a GUC */
bool EnableLocalExecution = true; bool EnableLocalExecution = true;
bool LogLocalCommands = false; bool LogLocalCommands = false;
@ -107,12 +106,12 @@ bool LogLocalCommands = false;
bool TransactionAccessedLocalPlacement = false; bool TransactionAccessedLocalPlacement = false;
bool TransactionConnectedToLocalGroup = false; bool TransactionConnectedToLocalGroup = false;
static void SplitLocalAndRemotePlacements(List *taskPlacementList, static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList, List **localTaskPlacementList,
List **remoteTaskPlacementList); List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
char *queryString); Tuplestorestate *tupleStoreState, ParamListInfo
paramListInfo);
static void LogLocalCommand(Task *task); static void LogLocalCommand(Task *task);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
@ -120,7 +119,6 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
static void LocallyExecuteUtilityTask(const char *utilityCommand); static void LocallyExecuteUtilityTask(const char *utilityCommand);
static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
/* /*
* ExecuteLocalTasks gets a CitusScanState node and list of local tasks. * ExecuteLocalTasks gets a CitusScanState node and list of local tasks.
* *
@ -130,11 +128,11 @@ static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
* The function returns totalRowsProcessed. * The function returns totalRowsProcessed.
*/ */
uint64 uint64
ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState)
{ {
EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
DistributedPlan *distributedPlan = scanState->distributedPlan;
ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info);
int numParams = 0; int numParams = 0;
Oid *parameterTypes = NULL; Oid *parameterTypes = NULL;
uint64 totalRowsProcessed = 0; uint64 totalRowsProcessed = 0;
@ -224,7 +222,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
: "<optimized out by local execution>"; : "<optimized out by local execution>";
totalRowsProcessed += totalRowsProcessed +=
ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState,
paramListInfo);
} }
return totalRowsProcessed; return totalRowsProcessed;
@ -252,7 +251,10 @@ ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
/* set local (if any) & remote tasks */ /* set local (if any) & remote tasks */
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
&remoteTaskList); &remoteTaskList);
processedRows += ExecuteLocalTaskList(scanState, localTaskList); EState *estate = ScanStateGetExecutorState(scanState);
processedRows += ExecuteLocalTaskList(localTaskList, estate->es_param_list_info,
scanState->distributedPlan,
scanState->tuplestorestate);
} }
else else
{ {
@ -512,10 +514,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement
* tupleStore if necessary. The function returns the * tupleStore if necessary. The function returns the
*/ */
static uint64 static uint64
ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString) ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo)
{ {
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore); DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore);
ScanDirection scanDirection = ForwardScanDirection; ScanDirection scanDirection = ForwardScanDirection;
QueryEnvironment *queryEnv = create_queryEnv(); QueryEnvironment *queryEnv = create_queryEnv();
@ -527,7 +528,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que
* the other task executions and the adaptive executor. * the other task executions and the adaptive executor.
*/ */
SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, SetTuplestoreDestReceiverParams(tupleStoreDestReceiver,
scanState->tuplestorestate, tupleStoreState,
CurrentMemoryContext, false); CurrentMemoryContext, false);
/* Create a QueryDesc for the query */ /* Create a QueryDesc for the query */

View File

@ -24,7 +24,9 @@ extern bool TransactionConnectedToLocalGroup;
extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
List *taskList, RowModifyLevel List *taskList, RowModifyLevel
rowModifyLevel, bool hasReturning); rowModifyLevel, bool hasReturning);
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern uint64 ExecuteLocalTaskList(List *taskList, ParamListInfo paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState);
extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExecuteLocalUtilityTaskList(List *localTaskList);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList); List **localTaskList, List **remoteTaskList);