From ecad4aa5e6fab8077df16dda4c794b1be93c05bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 3 Feb 2020 22:08:08 +0000 Subject: [PATCH] Fill in jobIdList field of DistributedExecution Pass down jobIdList from ExecuteTasksInDependencyOrder Also clean up comment for ExecuteTaskListOutsideTransaction --- src/backend/distributed/commands/call.c | 2 +- .../distributed/executor/adaptive_executor.c | 36 ++++++++++--------- .../directed_acyclic_graph_execution.c | 4 +-- .../distributed_intermediate_results.c | 3 +- .../executor/repartition_join_execution.c | 2 +- src/include/distributed/adaptive_executor.h | 7 ++-- .../directed_acyclic_graph_execution.h | 3 +- src/include/distributed/multi_executor.h | 3 +- 8 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index d9d25c4c0..c6187da75 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -189,7 +189,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDesc, tupleStore, hasReturning, MaxAdaptiveExecutorPoolSize, - &xactProperties); + &xactProperties, NIL); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 16f5fc15a..415c26d2a 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -543,14 +543,15 @@ typedef struct TaskPlacementExecution /* local functions */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, - List *taskList, bool - hasReturning, + List *taskList, + bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize, TransactionProperties * - xactProperties); + xactProperties, + List *jobIdList); static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, @@ -680,7 +681,8 @@ AdaptiveExecutor(CitusScanState *scanState) tupleDescriptor, scanState->tuplestorestate, targetPoolSize, - &xactProperties); + &xactProperties, + jobIdList); /* * Make sure that we acquire the appropriate locks even if the local tasks @@ -803,24 +805,24 @@ ExecuteUtilityTaskListWithoutResults(List *taskList) /* - * ExecuteTaskListRepartiton is a proxy to ExecuteTaskListExtended() with defaults - * for some of the arguments for a repartition query. + * ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended + * with defaults for some of the arguments. */ uint64 -ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int - targetPoolSize) +ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, + int targetPoolSize, List *jobIdList) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; bool hasReturning = false; - TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( - modLevel, taskList, true); + TransactionProperties xactProperties = + DecideTransactionPropertiesForTaskList(modLevel, taskList, true); return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties); + &xactProperties, jobIdList); } @@ -840,7 +842,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties); + &xactProperties, NIL); } @@ -860,7 +862,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties); + &xactProperties, NIL); } @@ -872,7 +874,7 @@ uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties) + TransactionProperties *xactProperties, List *jobIdList) { ParamListInfo paramListInfo = NULL; @@ -890,7 +892,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, DistributedExecution *execution = CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize, - xactProperties); + xactProperties, jobIdList); StartDistributedExecution(execution); RunDistributedExecution(execution); @@ -909,7 +911,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize, - TransactionProperties *xactProperties) + TransactionProperties *xactProperties, List *jobIdList) { DistributedExecution *execution = (DistributedExecution *) palloc0(sizeof(DistributedExecution)); @@ -941,6 +943,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->connectionSetChanged = false; execution->waitFlagsChanged = false; + execution->jobIdList = jobIdList; + /* allocate execution specific data once, on the ExecutorState memory context */ if (tupleDescriptor != NULL) { diff --git a/src/backend/distributed/executor/directed_acyclic_graph_execution.c b/src/backend/distributed/executor/directed_acyclic_graph_execution.c index d4aeb6659..9c177fa95 100644 --- a/src/backend/distributed/executor/directed_acyclic_graph_execution.c +++ b/src/backend/distributed/executor/directed_acyclic_graph_execution.c @@ -51,7 +51,7 @@ static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks); * execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize. */ void -ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks) +ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, List *jobIds) { HTAB *completedTasks = CreateTaskHashTable(); @@ -66,7 +66,7 @@ ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks) break; } ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks, - MaxAdaptiveExecutorPoolSize); + MaxAdaptiveExecutorPoolSize, jobIds); AddCompletedTasks(curTasks, completedTasks); curTasks = NIL; diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index addef3b2a..777e2436c 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -413,7 +413,8 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, work_mem); ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, - resultStore, hasReturning, targetPoolSize, &xactProperties); + resultStore, hasReturning, targetPoolSize, &xactProperties, + NIL); return resultStore; } diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 556acba7b..66a15baa9 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -68,7 +68,7 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); - ExecuteTasksInDependencyOrder(allTasks, topLevelTasks); + ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds); return jobIds; } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index c9d871839..b55271c4d 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -10,11 +10,10 @@ extern int MaxAdaptiveExecutorPoolSize; /* GUC, number of ms to wait between opening connections to the same worker */ extern int ExecutorSlowStartInterval; -extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int - targetPoolSize); +extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, + int targetPoolSize); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, - int - targetPoolSize); + int targetPoolSize, List *jobIdList); #endif /* ADAPTIVE_EXECUTOR_H */ diff --git a/src/include/distributed/directed_acyclic_graph_execution.h b/src/include/distributed/directed_acyclic_graph_execution.h index bf6b74e6c..a071baf16 100644 --- a/src/include/distributed/directed_acyclic_graph_execution.h +++ b/src/include/distributed/directed_acyclic_graph_execution.h @@ -14,7 +14,8 @@ #include "nodes/pg_list.h" -extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks); +extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, + List *jobIds); #endif /* DIRECTED_ACYCLIC_GRAPH_EXECUTION_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index ce1fc51b3..6099b8356 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -77,7 +77,8 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties); + TransactionProperties *xactProperties, + List *jobIdList); extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,