add execution params struct (#3747)

We had 9+ parameters in some of the functions related to execution.
Execution params is created to simplify this a bit so that we can set
only the fields that we are interested in and it is easier to read.
pull/3017/head
SaitTalhaNisanci 2020-04-14 14:32:40 +03:00 committed by GitHub
parent d58b5e67c1
commit 132efdbc56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 126 additions and 56 deletions

View File

@ -187,10 +187,15 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
};
bool localExecutionSupported = true;
ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task),
tupleDesc, tupleStore, hasReturning,
MaxAdaptiveExecutorPoolSize,
&xactProperties, NIL, localExecutionSupported);
ExecutionParams *executionParams = CreateBasicExecutionParams(
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
localExecutionSupported
);
executionParams->tupleStore = tupleStore;
executionParams->tupleDescriptor = tupleDesc;
executionParams->hasReturning = hasReturning;
executionParams->xactProperties = xactProperties;
ExecuteTaskListExtended(executionParams);
while (tuplestore_gettupleslot(tupleStore, true, false, slot))
{

View File

@ -881,13 +881,6 @@ uint64
ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
int targetPoolSize, List *jobIdList)
{
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
bool hasReturning = false;
TransactionProperties xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, taskList, true);
/*
* As we are going to run the tasks outside transaction, we shouldn't use local execution.
* However, there is some problem when using local execution related to
@ -895,9 +888,13 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
* coming to this path with local execution. See PR:3711
*/
bool localExecutionSupported = false;
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize,
&xactProperties, jobIdList, localExecutionSupported);
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, taskList, targetPoolSize, localExecutionSupported
);
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, true);
return ExecuteTaskListExtended(executionParams);
}
@ -906,19 +903,15 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
* for some of the arguments.
*/
uint64
ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool
localExecutionSupported)
ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
int targetPoolSize, bool localExecutionSupported)
{
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
bool hasReturning = false;
TransactionProperties xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, taskList, false);
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize,
&xactProperties, NIL, localExecutionSupported);
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, taskList, targetPoolSize, localExecutionSupported
);
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, false);
return ExecuteTaskListExtended(executionParams);
}
@ -932,13 +925,18 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
bool hasReturning)
{
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, false);
bool localExecutionSupported = true;
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize,
&xactProperties, NIL, localExecutionSupported);
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, taskList, targetPoolSize, localExecutionSupported
);
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, false);
executionParams->hasReturning = hasReturning;
executionParams->tupleStore = tupleStore;
executionParams->tupleDescriptor = tupleDescriptor;
return ExecuteTaskListExtended(executionParams);
}
@ -947,27 +945,24 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
* runs it.
*/
uint64
ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties,
List *jobIdList,
bool localExecutionSupported)
ExecuteTaskListExtended(ExecutionParams *executionParams)
{
ParamListInfo paramListInfo = NULL;
uint64 locallyProcessedRows = 0;
List *localTaskList = NIL;
List *remoteTaskList = NIL;
if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally(
executionParams->taskList))
{
bool readOnlyPlan = false;
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
ExtractLocalAndRemoteTasks(readOnlyPlan, executionParams->taskList,
&localTaskList,
&remoteTaskList);
}
else
{
remoteTaskList = taskList;
remoteTaskList = executionParams->taskList;
}
/*
@ -982,17 +977,21 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
ErrorIfTransactionAccessedPlacementsLocally();
}
locallyProcessedRows += ExecuteLocalTaskList(localTaskList, tupleStore);
locallyProcessedRows += ExecuteLocalTaskList(localTaskList,
executionParams->tupleStore);
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
targetPoolSize = 1;
executionParams->targetPoolSize = 1;
}
DistributedExecution *execution =
CreateDistributedExecution(modLevel, remoteTaskList, hasReturning, paramListInfo,
tupleDescriptor, tupleStore, targetPoolSize,
xactProperties, jobIdList);
CreateDistributedExecution(
executionParams->modLevel, remoteTaskList,
executionParams->hasReturning, paramListInfo,
executionParams->tupleDescriptor, executionParams->tupleStore,
executionParams->targetPoolSize, &executionParams->xactProperties,
executionParams->jobIdList);
StartDistributedExecution(execution);
RunDistributedExecution(execution);
@ -1002,6 +1001,31 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
}
/*
* CreateBasicExecutionParams creates basic execution parameters with some common
* fields.
*/
ExecutionParams *
CreateBasicExecutionParams(RowModifyLevel modLevel,
List *taskList,
int targetPoolSize,
bool localExecutionSupported)
{
ExecutionParams *executionParams = palloc0(sizeof(ExecutionParams));
executionParams->modLevel = modLevel;
executionParams->taskList = taskList;
executionParams->targetPoolSize = targetPoolSize;
executionParams->localExecutionSupported = localExecutionSupported;
executionParams->tupleStore = NULL;
executionParams->tupleDescriptor = NULL;
executionParams->hasReturning = false;
executionParams->jobIdList = NIL;
return executionParams;
}
/*
* CreateDistributedExecution creates a distributed execution data structure for
* a distributed plan.

View File

@ -422,9 +422,15 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
* query string for the local placement.
*/
bool localExecutionSupported = false;
ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor,
resultStore, hasReturning, targetPoolSize, &xactProperties,
NIL, localExecutionSupported);
ExecutionParams *executionParams = CreateBasicExecutionParams(
ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported
);
executionParams->tupleDescriptor = resultDescriptor;
executionParams->tupleStore = resultStore;
executionParams->xactProperties = xactProperties;
executionParams->hasReturning = hasReturning;
ExecuteTaskListExtended(executionParams);
return resultStore;
}

View File

@ -74,13 +74,48 @@ extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint
bool execute_once);
extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);
extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties,
List *jobIdList,
bool localExecutionSupported);
/*
* ExecutionParams contains parameters that are used during the execution.
* Some of these can be the zero value if it is not needed during the execution.
*/
typedef struct ExecutionParams
{
/* modLevel is the access level for rows.*/
RowModifyLevel modLevel;
/* taskList contains the tasks for the execution.*/
List *taskList;
/* tupleDescriptor contains the description for the result tuples.*/
TupleDesc tupleDescriptor;
/* tupleStore is where the results will be stored for this execution */
Tuplestorestate *tupleStore;
/* hasReturning is true if this execution will return some result. */
bool hasReturning;
/* targetPoolSize is the maximum amount of connections per worker */
int targetPoolSize;
/* xactProperties contains properties for transactions, such as if we should use 2pc. */
TransactionProperties xactProperties;
/* jobIdList contains all job ids for the execution */
List *jobIdList;
/* localExecutionSupported is true if we can use local execution, if it is false
* local execution will not be used. */
bool localExecutionSupported;
} ExecutionParams;
ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel,
List *taskList,
int targetPoolSize,
bool localExecutionSupported);
extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams);
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,