diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index a6482c1b8..a099e3232 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -185,7 +185,9 @@ MultiTaskTrackerExecute(Job *job) * We walk over the task tree, and create a task execution struct for each * task. We then associate the task with its execution and get back a list. */ - List *taskAndExecutionList = TaskAndExecutionList(jobTaskList); + bool createTaskExecution = true; + List *taskAndExecutionList = CreateTaskListForJobTree(jobTaskList, + createTaskExecution); /* * We now count the number of "top level" tasks in the query tree. Once they @@ -446,13 +448,14 @@ MultiTaskTrackerExecute(Job *job) /* - * TaskAndExecutionList visits all tasks in the job tree, starting with the given - * job's task list. For each visited task, the function creates a task execution - * struct, associates the task execution with the task, and adds the task and its - * execution to a list. The function then returns the list. + * CreateTaskListForJobTree visits all tasks in the job tree (by following dependentTaskList), + * starting with the given job's task list, and adds each visited task to a list. + * If createTaskExecution is set to true, for each visited task, + * the function also creates a task execution struct and associates it with the task. + * The function then returns the list.
*/ List * -TaskAndExecutionList(List *jobTaskList) +CreateTaskListForJobTree(List *jobTaskList, bool createTaskExecution) { List *taskAndExecutionList = NIL; const int topLevelTaskHashSize = 32; @@ -470,9 +473,15 @@ TaskAndExecutionList(List *jobTaskList) Task *task = (Task *) linitial(taskQueue); taskQueue = list_delete_first(taskQueue); - /* create task execution and associate it with task */ - TaskExecution *taskExecution = InitTaskExecution(task, EXEC_TASK_UNASSIGNED); - task->taskExecution = taskExecution; + if (createTaskExecution) + { + MemoryContext oldContext = MemoryContextSwitchTo(GetMemoryChunkContext(task)); + + /* create task execution and associate it with task */ + TaskExecution *taskExecution = InitTaskExecution(task, EXEC_TASK_UNASSIGNED); + MemoryContextSwitchTo(oldContext); + task->taskExecution = taskExecution; + } taskAndExecutionList = lappend(taskAndExecutionList, task); diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 242f5908a..0c5753213 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -63,7 +63,8 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) { EnsureNoModificationsHaveBeenDone(); - List *allTasks = TaskAndExecutionList(topLevelTasks); + bool createTaskExecution = false; + List *allTasks = CreateTaskListForJobTree(topLevelTasks, createTaskExecution); EnsureCompatibleLocalExecutionState(allTasks); diff --git a/src/include/distributed/multi_task_tracker_executor.h b/src/include/distributed/multi_task_tracker_executor.h index 4f4d412ed..f496c48eb 100644 --- a/src/include/distributed/multi_task_tracker_executor.h +++ b/src/include/distributed/multi_task_tracker_executor.h @@ -3,6 +3,6 @@ #ifndef MULTI_TASK_TRACKER_EXECUTOR_H #define MULTI_TASK_TRACKER_EXECUTOR_H -extern List * TaskAndExecutionList(List *jobTaskList); +extern List * 
CreateTaskListForJobTree(List *jobTaskList, bool createTaskExecution); #endif /* MULTI_TASK_TRACKER_EXECUTOR_H */ diff --git a/src/test/regress/expected/chbenchmark_all_queries.out b/src/test/regress/expected/chbenchmark_all_queries.out index 32e60b324..b0a1a775f 100644 --- a/src/test/regress/expected/chbenchmark_all_queries.out +++ b/src/test/regress/expected/chbenchmark_all_queries.out @@ -864,6 +864,70 @@ LOG: join order: [ "stock" ][ reference join "item" ][ dual partition join "ord abc | def (1 row) +\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' +EXPLAIN :default_analyze_flags SELECT + su_name, + su_address +FROM + supplier, + nation +WHERE su_suppkey in + (SELECT + mod(s_i_id * s_w_id, 10000) + FROM + stock, + order_line + WHERE s_i_id IN + (SELECT i_id + FROM item + WHERE i_data LIKE 'co%') + AND ol_i_id = s_i_id + AND ol_delivery_d > '2008-05-23 12:00:00' -- was 2010, but our order is in 2008 + GROUP BY s_i_id, s_w_id, s_quantity + HAVING 2*s_quantity > sum(ol_quantity)) + AND su_nationkey = n_nationkey + AND n_name = 'Germany' +ORDER BY su_name; +LOG: join order: [ "stock" ][ reference join "item" ][ dual partition join "order_line" ] + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 30 bytes + Result destination: Send to 2 nodes + -> Custom Scan (Citus Adaptive) (actual rows=3 loops=1) + Task Count: 4 + Tuple data received from nodes: 12 bytes + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 4 + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 4 + Merge Task Count: 4 + Task Count: 1 + Tuple data received from nodes: 28 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 28 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Sort (actual rows=1 loops=1) + Sort Key: supplier.su_name + Sort Method: quicksort
Memory: 25kB + -> Nested Loop Semi Join (actual rows=1 loops=1) + Join Filter: (supplier.su_suppkey = intermediate_result.mod) + Rows Removed by Join Filter: 1 + -> Hash Join (actual rows=1 loops=1) + Hash Cond: (supplier.su_nationkey = nation.n_nationkey) + -> Seq Scan on supplier_1650035 supplier (actual rows=11 loops=1) + -> Hash (actual rows=1 loops=1) + Buckets: 1024 Batches: 1 Memory Usage: 9kB + -> Seq Scan on nation_1650034 nation (actual rows=1 loops=1) + Filter: (n_name = 'Germany'::bpchar) + Rows Removed by Filter: 3 + -> Function Scan on read_intermediate_result intermediate_result (actual rows=2 loops=1) +(35 rows) + -- Query 21 -- DATA SET DOES NOT COVER THIS QUERY SELECT diff --git a/src/test/regress/sql/chbenchmark_all_queries.sql b/src/test/regress/sql/chbenchmark_all_queries.sql index bed8308a9..82ce7eb68 100644 --- a/src/test/regress/sql/chbenchmark_all_queries.sql +++ b/src/test/regress/sql/chbenchmark_all_queries.sql @@ -673,6 +673,32 @@ WHERE su_suppkey in AND n_name = 'Germany' ORDER BY su_name; +\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' +EXPLAIN :default_analyze_flags SELECT + su_name, + su_address +FROM + supplier, + nation +WHERE su_suppkey in + (SELECT + mod(s_i_id * s_w_id, 10000) + FROM + stock, + order_line + WHERE s_i_id IN + (SELECT i_id + FROM item + WHERE i_data LIKE 'co%') + AND ol_i_id = s_i_id + AND ol_delivery_d > '2008-05-23 12:00:00' -- was 2010, but our order is in 2008 + GROUP BY s_i_id, s_w_id, s_quantity + HAVING 2*s_quantity > sum(ol_quantity)) + AND su_nationkey = n_nationkey + AND n_name = 'Germany' +ORDER BY su_name; + + -- Query 21 -- DATA SET DOES NOT COVER THIS QUERY SELECT