Fill in jobIdList field of DistributedExecution

Pass down jobIdList from ExecuteTasksInDependencyOrder

Also clean up comment for ExecuteTaskListOutsideTransaction
pull/3461/head
Philip Dubé 2020-02-03 22:08:08 +00:00
parent c252811884
commit ecad4aa5e6
8 changed files with 33 additions and 27 deletions

View File

@ -189,7 +189,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task),
tupleDesc, tupleStore, hasReturning, tupleDesc, tupleStore, hasReturning,
MaxAdaptiveExecutorPoolSize, MaxAdaptiveExecutorPoolSize,
&xactProperties); &xactProperties, NIL);
while (tuplestore_gettupleslot(tupleStore, true, false, slot)) while (tuplestore_gettupleslot(tupleStore, true, false, slot))
{ {

View File

@ -543,14 +543,15 @@ typedef struct TaskPlacementExecution
/* local functions */ /* local functions */
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
List *taskList, bool List *taskList,
hasReturning, bool hasReturning,
ParamListInfo paramListInfo, ParamListInfo paramListInfo,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,
int targetPoolSize, int targetPoolSize,
TransactionProperties * TransactionProperties *
xactProperties); xactProperties,
List *jobIdList);
static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel
modLevel, modLevel,
List *taskList, List *taskList,
@ -680,7 +681,8 @@ AdaptiveExecutor(CitusScanState *scanState)
tupleDescriptor, tupleDescriptor,
scanState->tuplestorestate, scanState->tuplestorestate,
targetPoolSize, targetPoolSize,
&xactProperties); &xactProperties,
jobIdList);
/* /*
* Make sure that we acquire the appropriate locks even if the local tasks * Make sure that we acquire the appropriate locks even if the local tasks
@ -803,24 +805,24 @@ ExecuteUtilityTaskListWithoutResults(List *taskList)
/* /*
* ExecuteTaskListRepartiton is a proxy to ExecuteTaskListExtended() with defaults * ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended
* for some of the arguments for a repartition query. * with defaults for some of the arguments.
*/ */
uint64 uint64
ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
targetPoolSize) int targetPoolSize, List *jobIdList)
{ {
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL; Tuplestorestate *tupleStore = NULL;
bool hasReturning = false; bool hasReturning = false;
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( TransactionProperties xactProperties =
modLevel, taskList, true); DecideTransactionPropertiesForTaskList(modLevel, taskList, true);
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize, tupleStore, hasReturning, targetPoolSize,
&xactProperties); &xactProperties, jobIdList);
} }
@ -840,7 +842,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize, tupleStore, hasReturning, targetPoolSize,
&xactProperties); &xactProperties, NIL);
} }
@ -860,7 +862,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize, tupleStore, hasReturning, targetPoolSize,
&xactProperties); &xactProperties, NIL);
} }
@ -872,7 +874,7 @@ uint64
ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize, bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties) TransactionProperties *xactProperties, List *jobIdList)
{ {
ParamListInfo paramListInfo = NULL; ParamListInfo paramListInfo = NULL;
@ -890,7 +892,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
DistributedExecution *execution = DistributedExecution *execution =
CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo,
tupleDescriptor, tupleStore, targetPoolSize, tupleDescriptor, tupleStore, targetPoolSize,
xactProperties); xactProperties, jobIdList);
StartDistributedExecution(execution); StartDistributedExecution(execution);
RunDistributedExecution(execution); RunDistributedExecution(execution);
@ -909,7 +911,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
bool hasReturning, bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor, ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize, Tuplestorestate *tupleStore, int targetPoolSize,
TransactionProperties *xactProperties) TransactionProperties *xactProperties, List *jobIdList)
{ {
DistributedExecution *execution = DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution)); (DistributedExecution *) palloc0(sizeof(DistributedExecution));
@ -941,6 +943,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->connectionSetChanged = false; execution->connectionSetChanged = false;
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
execution->jobIdList = jobIdList;
/* allocate execution specific data once, on the ExecutorState memory context */ /* allocate execution specific data once, on the ExecutorState memory context */
if (tupleDescriptor != NULL) if (tupleDescriptor != NULL)
{ {

View File

@ -51,7 +51,7 @@ static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks);
* execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize. * execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize.
*/ */
void void
ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks) ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, List *jobIds)
{ {
HTAB *completedTasks = CreateTaskHashTable(); HTAB *completedTasks = CreateTaskHashTable();
@ -66,7 +66,7 @@ ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks)
break; break;
} }
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks, ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks,
MaxAdaptiveExecutorPoolSize); MaxAdaptiveExecutorPoolSize, jobIds);
AddCompletedTasks(curTasks, completedTasks); AddCompletedTasks(curTasks, completedTasks);
curTasks = NIL; curTasks = NIL;

View File

@ -413,7 +413,8 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
work_mem); work_mem);
ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor,
resultStore, hasReturning, targetPoolSize, &xactProperties); resultStore, hasReturning, targetPoolSize, &xactProperties,
NIL);
return resultStore; return resultStore;
} }

View File

@ -68,7 +68,7 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob);
ExecuteTasksInDependencyOrder(allTasks, topLevelTasks); ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds);
return jobIds; return jobIds;
} }

View File

@ -10,11 +10,10 @@ extern int MaxAdaptiveExecutorPoolSize;
/* GUC, number of ms to wait between opening connections to the same worker */ /* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval; extern int ExecutorSlowStartInterval;
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
targetPoolSize); int targetPoolSize);
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
int int targetPoolSize, List *jobIdList);
targetPoolSize);
#endif /* ADAPTIVE_EXECUTOR_H */ #endif /* ADAPTIVE_EXECUTOR_H */

View File

@ -14,7 +14,8 @@
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks); extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks,
List *jobIds);
#endif /* DIRECTED_ACYCLIC_GRAPH_EXECUTION_H */ #endif /* DIRECTED_ACYCLIC_GRAPH_EXECUTION_H */

View File

@ -77,7 +77,8 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize, bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties); TransactionProperties *xactProperties,
List *jobIdList);
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,