From 132efdbc56501f374a9831b01f30d7bf29083ede Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 14 Apr 2020 14:32:40 +0300 Subject: [PATCH] add execution params struct (#3747) We had 9+ parameters in some of the functions related to execution. Execution params is created to simplify this a bit so that we can set only the fields that we are interested in and it is easier to read. --- src/backend/distributed/commands/call.c | 13 ++- .../distributed/executor/adaptive_executor.c | 108 +++++++++++------- .../distributed_intermediate_results.c | 12 +- src/include/distributed/multi_executor.h | 49 ++++++-- 4 files changed, 126 insertions(+), 56 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 45f34bd7b..d50988f8e 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -187,10 +187,15 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, }; bool localExecutionSupported = true; - ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), - tupleDesc, tupleStore, hasReturning, - MaxAdaptiveExecutorPoolSize, - &xactProperties, NIL, localExecutionSupported); + ExecutionParams *executionParams = CreateBasicExecutionParams( + ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize, + localExecutionSupported + ); + executionParams->tupleStore = tupleStore; + executionParams->tupleDescriptor = tupleDesc; + executionParams->hasReturning = hasReturning; + executionParams->xactProperties = xactProperties; + ExecuteTaskListExtended(executionParams); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 10d411bea..479048bcf 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -881,13 +881,6 @@ uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int targetPoolSize, List *jobIdList) { - TupleDesc tupleDescriptor = NULL; - Tuplestorestate *tupleStore = NULL; - bool hasReturning = false; - - TransactionProperties xactProperties = - DecideTransactionPropertiesForTaskList(modLevel, taskList, true); - /* * As we are going to run the tasks outside transaction, we shouldn't use local execution. * However, there is some problem when using local execution related to @@ -895,9 +888,13 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, * coming to this path with local execution. See PR:3711 */ bool localExecutionSupported = false; - return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, - tupleStore, hasReturning, targetPoolSize, - &xactProperties, jobIdList, localExecutionSupported); + ExecutionParams *executionParams = CreateBasicExecutionParams( + modLevel, taskList, targetPoolSize, localExecutionSupported + ); + + executionParams->xactProperties = DecideTransactionPropertiesForTaskList( + modLevel, taskList, true); + return ExecuteTaskListExtended(executionParams); } @@ -906,19 +903,15 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, * for some of the arguments. */ uint64 -ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool - localExecutionSupported) +ExecuteTaskList(RowModifyLevel modLevel, List *taskList, + int targetPoolSize, bool localExecutionSupported) { - TupleDesc tupleDescriptor = NULL; - Tuplestorestate *tupleStore = NULL; - bool hasReturning = false; - - TransactionProperties xactProperties = - DecideTransactionPropertiesForTaskList(modLevel, taskList, false); - - return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, - tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL, localExecutionSupported); + ExecutionParams *executionParams = CreateBasicExecutionParams( + modLevel, taskList, targetPoolSize, localExecutionSupported + ); + executionParams->xactProperties = DecideTransactionPropertiesForTaskList( + modLevel, taskList, false); + return ExecuteTaskListExtended(executionParams); } @@ -932,13 +925,18 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, bool hasReturning) { int targetPoolSize = MaxAdaptiveExecutorPoolSize; - - TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( - modLevel, taskList, false); bool localExecutionSupported = true; - return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, - tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL, localExecutionSupported); + ExecutionParams *executionParams = CreateBasicExecutionParams( + modLevel, taskList, targetPoolSize, localExecutionSupported + ); + + executionParams->xactProperties = DecideTransactionPropertiesForTaskList( + modLevel, taskList, false); + executionParams->hasReturning = hasReturning; + executionParams->tupleStore = tupleStore; + executionParams->tupleDescriptor = tupleDescriptor; + + return ExecuteTaskListExtended(executionParams); } @@ -947,27 +945,24 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, * runs it. */ uint64 -ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, - TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, - bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties, - List *jobIdList, - bool localExecutionSupported) +ExecuteTaskListExtended(ExecutionParams *executionParams) { ParamListInfo paramListInfo = NULL; uint64 locallyProcessedRows = 0; List *localTaskList = NIL; List *remoteTaskList = NIL; - if (localExecutionSupported && ShouldExecuteTasksLocally(taskList)) + if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally( + executionParams->taskList)) { bool readOnlyPlan = false; - ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + ExtractLocalAndRemoteTasks(readOnlyPlan, executionParams->taskList, + &localTaskList, &remoteTaskList); } else { - remoteTaskList = taskList; + remoteTaskList = executionParams->taskList; } /* @@ -982,17 +977,21 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, ErrorIfTransactionAccessedPlacementsLocally(); } - locallyProcessedRows += ExecuteLocalTaskList(localTaskList, tupleStore); + locallyProcessedRows += ExecuteLocalTaskList(localTaskList, + executionParams->tupleStore); if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { - targetPoolSize = 1; + executionParams->targetPoolSize = 1; } DistributedExecution *execution = - CreateDistributedExecution(modLevel, remoteTaskList, hasReturning, paramListInfo, - tupleDescriptor, tupleStore, targetPoolSize, - xactProperties, jobIdList); + CreateDistributedExecution( + executionParams->modLevel, remoteTaskList, + executionParams->hasReturning, paramListInfo, + executionParams->tupleDescriptor, executionParams->tupleStore, + executionParams->targetPoolSize, &executionParams->xactProperties, + executionParams->jobIdList); StartDistributedExecution(execution); RunDistributedExecution(execution); @@ -1002,6 +1001,31 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, } +/* + * CreateBasicExecutionParams creates basic execution parameters with some common + * fields. + */ +ExecutionParams * +CreateBasicExecutionParams(RowModifyLevel modLevel, + List *taskList, + int targetPoolSize, + bool localExecutionSupported) +{ + ExecutionParams *executionParams = palloc0(sizeof(ExecutionParams)); + executionParams->modLevel = modLevel; + executionParams->taskList = taskList; + executionParams->targetPoolSize = targetPoolSize; + executionParams->localExecutionSupported = localExecutionSupported; + + executionParams->tupleStore = NULL; + executionParams->tupleDescriptor = NULL; + executionParams->hasReturning = false; + executionParams->jobIdList = NIL; + + return executionParams; +} + + /* * CreateDistributedExecution creates a distributed execution data structure for * a distributed plan. diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 44e4522a2..0392880b8 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -422,9 +422,15 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, * query string for the local placement. */ bool localExecutionSupported = false; - ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, - resultStore, hasReturning, targetPoolSize, &xactProperties, - NIL, localExecutionSupported); + ExecutionParams *executionParams = CreateBasicExecutionParams( + ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported + ); + executionParams->tupleDescriptor = resultDescriptor; + executionParams->tupleStore = resultStore; + executionParams->xactProperties = xactProperties; + executionParams->hasReturning = hasReturning; + + ExecuteTaskListExtended(executionParams); return resultStore; } diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 50bb8c53b..a2755e640 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -74,13 +74,48 @@ extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint bool execute_once); extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState); extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState); -extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, - TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore, - bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties, - List *jobIdList, - bool localExecutionSupported); + +/* + * ExecutionParams contains parameters that are used during the execution. + * Some of these can be the zero value if it is not needed during the execution. + */ +typedef struct ExecutionParams +{ + /* modLevel is the access level for rows.*/ + RowModifyLevel modLevel; + + /* taskList contains the tasks for the execution.*/ + List *taskList; + + /* tupleDescriptor contains the description for the result tuples.*/ + TupleDesc tupleDescriptor; + + /* tupleStore is where the results will be stored for this execution */ + Tuplestorestate *tupleStore; + + /* hasReturning is true if this execution will return some result. */ + bool hasReturning; + + /* targetPoolSize is the maximum amount of connections per worker */ + int targetPoolSize; + + /* xactProperties contains properties for transactions, such as if we should use 2pc. */ + TransactionProperties xactProperties; + + /* jobIdList contains all job ids for the execution */ + List *jobIdList; + + /* localExecutionSupported is true if we can use local execution, if it is false + * local execution will not be used. */ + bool localExecutionSupported; +} ExecutionParams; + +ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel, + List *taskList, + int targetPoolSize, + bool localExecutionSupported); + +extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams); extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,