refactor ExecuteLocalTaskList (#3617)

ExecuteLocalTaskList doesn't need scanState as it only uses
paramListInfo, distributedPlan and tupleStoreState. It is better to pass
only the variables that the function needs, so that we can call this
function from other places when we dont have scanState.
pull/3682/head^2
SaitTalhaNisanci 2020-03-31 19:19:54 +03:00 committed by GitHub
parent 96358079ac
commit 6cd32b0db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 19 deletions

View File

@ -785,7 +785,11 @@ HasDependentJobs(Job *mainJob)
static void static void
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
{ {
uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList); EState *estate = ScanStateGetExecutorState(scanState);
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
estate->es_param_list_info,
scanState->distributedPlan,
scanState->tuplestorestate);
/* /*
* We're deliberately not setting execution->rowsProcessed here. The main reason * We're deliberately not setting execution->rowsProcessed here. The main reason

View File

@ -100,7 +100,6 @@
#include "nodes/params.h" #include "nodes/params.h"
#include "utils/snapmgr.h" #include "utils/snapmgr.h"
/* controlled via a GUC */ /* controlled via a GUC */
bool EnableLocalExecution = true; bool EnableLocalExecution = true;
bool LogLocalCommands = false; bool LogLocalCommands = false;
@ -108,12 +107,12 @@ bool LogLocalCommands = false;
bool TransactionAccessedLocalPlacement = false; bool TransactionAccessedLocalPlacement = false;
bool TransactionConnectedToLocalGroup = false; bool TransactionConnectedToLocalGroup = false;
static void SplitLocalAndRemotePlacements(List *taskPlacementList, static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList, List **localTaskPlacementList,
List **remoteTaskPlacementList); List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
char *queryString); Tuplestorestate *tupleStoreState, ParamListInfo
paramListInfo);
static void LogLocalCommand(Task *task); static void LogLocalCommand(Task *task);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
@ -121,21 +120,40 @@ static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
static void LocallyExecuteUtilityTask(const char *utilityCommand); static void LocallyExecuteUtilityTask(const char *utilityCommand);
static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery); static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
/* /*
* ExecuteLocalTasks gets a CitusScanState node and list of local tasks. * ExecuteLocalTasks executes the given tasks locally.
* *
* The function goes over the task list and executes them locally. * The function goes over the task list and executes them locally.
* The returning tuples (if any) is stored in the CitusScanState. * The returning tuples (if any) is stored in the tupleStoreState.
* *
* The function returns totalRowsProcessed. * The function returns totalRowsProcessed.
*/ */
uint64 uint64
ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState)
{ {
EState *executorState = ScanStateGetExecutorState(scanState); DistributedPlan *distributedPlan = NULL;
DistributedPlan *distributedPlan = scanState->distributedPlan; ParamListInfo paramListInfo = NULL;
ParamListInfo paramListInfo = copyParamList(executorState->es_param_list_info); return ExecuteLocalTaskListExtended(taskList, paramListInfo, distributedPlan,
tupleStoreState);
}
/*
* ExecuteLocalTaskListExtended executes the given tasks locally.
*
* The function goes over the task list and executes them locally.
* The returning tuples (if any) is stored in the tupleStoreState.
*
* It uses a cached plan if distributedPlan is found in cache.
*
* The function returns totalRowsProcessed.
*/
uint64
ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState)
{
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
int numParams = 0; int numParams = 0;
Oid *parameterTypes = NULL; Oid *parameterTypes = NULL;
uint64 totalRowsProcessed = 0; uint64 totalRowsProcessed = 0;
@ -232,7 +250,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
} }
totalRowsProcessed += totalRowsProcessed +=
ExecuteLocalTaskPlan(scanState, localPlan, shardQueryString); ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleStoreState,
paramListInfo);
} }
return totalRowsProcessed; return totalRowsProcessed;
@ -260,7 +279,13 @@ ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
/* set local (if any) & remote tasks */ /* set local (if any) & remote tasks */
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
&remoteTaskList); &remoteTaskList);
processedRows += ExecuteLocalTaskList(scanState, localTaskList); EState *estate = ScanStateGetExecutorState(scanState);
processedRows += ExecuteLocalTaskListExtended(
localTaskList,
estate->es_param_list_info,
scanState->distributedPlan,
scanState->tuplestorestate
);
} }
else else
{ {
@ -514,10 +539,9 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement
* tupleStore if necessary. The function returns the * tupleStore if necessary. The function returns the
*/ */
static uint64 static uint64
ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString) ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo)
{ {
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore); DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore);
ScanDirection scanDirection = ForwardScanDirection; ScanDirection scanDirection = ForwardScanDirection;
QueryEnvironment *queryEnv = create_queryEnv(); QueryEnvironment *queryEnv = create_queryEnv();
@ -529,7 +553,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que
* the other task executions and the adaptive executor. * the other task executions and the adaptive executor.
*/ */
SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, SetTuplestoreDestReceiverParams(tupleStoreDestReceiver,
scanState->tuplestorestate, tupleStoreState,
CurrentMemoryContext, false); CurrentMemoryContext, false);
/* Create a QueryDesc for the query */ /* Create a QueryDesc for the query */

View File

@ -24,7 +24,12 @@ extern bool TransactionConnectedToLocalGroup;
extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
List *taskList, RowModifyLevel List *taskList, RowModifyLevel
rowModifyLevel, bool hasReturning); rowModifyLevel, bool hasReturning);
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern uint64 ExecuteLocalTaskList(List *taskList,
Tuplestorestate *tupleStoreState);
extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo
orig_paramListInfo,
DistributedPlan *distributedPlan,
Tuplestorestate *tupleStoreState);
extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExecuteLocalUtilityTaskList(List *localTaskList);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList); List **localTaskList, List **remoteTaskList);