From f458d1fd1c70724a586936e178dbed2717d53d9c Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 24 Jun 2020 12:10:00 +0300 Subject: [PATCH] Fix/task execution (#3941) * Not set TaskExecution with adaptive executor Adaptive executor is using a utility method from task tracker for repartition joins, however adaptive executor doesn't need taskExecution. It is only used by task tracker. This causes a problem when explain analyze is used because what taskExecution is pointing to might be random. We solve this by not setting taskExecution from adaptive executor. So it will stay NULL as set by CreateTask. * use same memory context as task for taskExecution Co-authored-by: Jelte Fennema --- .../executor/multi_task_tracker_executor.c | 27 +++++--- .../executor/repartition_join_execution.c | 3 +- .../distributed/multi_task_tracker_executor.h | 2 +- .../expected/chbenchmark_all_queries.out | 64 +++++++++++++++++++ .../regress/sql/chbenchmark_all_queries.sql | 26 ++++++++ 5 files changed, 111 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index a6482c1b8..a099e3232 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -185,7 +185,9 @@ MultiTaskTrackerExecute(Job *job) * We walk over the task tree, and create a task execution struct for each * task. We then associate the task with its execution and get back a list. */ - List *taskAndExecutionList = TaskAndExecutionList(jobTaskList); + bool createTaskExecution = true; + List *taskAndExecutionList = CreateTaskListForJobTree(jobTaskList, + createTaskExecution); /* * We now count the number of "top level" tasks in the query tree. Once they @@ -446,13 +448,14 @@ MultiTaskTrackerExecute(Job *job) /* - * TaskAndExecutionList visits all tasks in the job tree, starting with the given - * job's task list. 
For each visited task, the function creates a task execution - * struct, associates the task execution with the task, and adds the task and its - * execution to a list. The function then returns the list. + * CreateTaskListForJobTree visits all tasks in the job tree (by following dependentTaskList), + * starting with the given job's task list. + * If createTaskExecution is set to true, for each visited task, + * the function creates a task execution struct and associates it with the task. + * Every visited task is added to a list, and the function then returns the list. */ List * -TaskAndExecutionList(List *jobTaskList) +CreateTaskListForJobTree(List *jobTaskList, bool createTaskExecution) { List *taskAndExecutionList = NIL; const int topLevelTaskHashSize = 32; @@ -470,9 +473,15 @@ TaskAndExecutionList(List *jobTaskList) Task *task = (Task *) linitial(taskQueue); taskQueue = list_delete_first(taskQueue); - /* create task execution and associate it with task */ - TaskExecution *taskExecution = InitTaskExecution(task, EXEC_TASK_UNASSIGNED); - task->taskExecution = taskExecution; + if (createTaskExecution) + { + MemoryContext oldContext = MemoryContextSwitchTo(GetMemoryChunkContext(task)); + + /* create task execution and associate it with task */ + TaskExecution *taskExecution = InitTaskExecution(task, EXEC_TASK_UNASSIGNED); + MemoryContextSwitchTo(oldContext); + task->taskExecution = taskExecution; + } taskAndExecutionList = lappend(taskAndExecutionList, task); diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 242f5908a..0c5753213 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -63,7 +63,8 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) { EnsureNoModificationsHaveBeenDone(); - List *allTasks = TaskAndExecutionList(topLevelTasks); + bool
createTaskExecution = false; + List *allTasks = CreateTaskListForJobTree(topLevelTasks, createTaskExecution); EnsureCompatibleLocalExecutionState(allTasks); diff --git a/src/include/distributed/multi_task_tracker_executor.h b/src/include/distributed/multi_task_tracker_executor.h index 4f4d412ed..f496c48eb 100644 --- a/src/include/distributed/multi_task_tracker_executor.h +++ b/src/include/distributed/multi_task_tracker_executor.h @@ -3,6 +3,6 @@ #ifndef MULTI_TASK_TRACKER_EXECUTOR_H #define MULTI_TASK_TRACKER_EXECUTOR_H -extern List * TaskAndExecutionList(List *jobTaskList); +extern List * CreateTaskListForJobTree(List *jobTaskList, bool viaTaskTracker); #endif /* MULTI_TASK_TRACKER_EXECUTOR_H */ diff --git a/src/test/regress/expected/chbenchmark_all_queries.out b/src/test/regress/expected/chbenchmark_all_queries.out index 32e60b324..b0a1a775f 100644 --- a/src/test/regress/expected/chbenchmark_all_queries.out +++ b/src/test/regress/expected/chbenchmark_all_queries.out @@ -864,6 +864,70 @@ LOG: join order: [ "stock" ][ reference join "item" ][ dual partition join "ord abc | def (1 row) +\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' +EXPLAIN :default_analyze_flags SELECT + su_name, + su_address +FROM + supplier, + nation +WHERE su_suppkey in + (SELECT + mod(s_i_id * s_w_id, 10000) + FROM + stock, + order_line + WHERE s_i_id IN + (SELECT i_id + FROM item + WHERE i_data LIKE 'co%') + AND ol_i_id = s_i_id + AND ol_delivery_d > '2008-05-23 12:00:00' -- was 2010, but our order is in 2008 + GROUP BY s_i_id, s_w_id, s_quantity + HAVING 2*s_quantity > sum(ol_quantity)) + AND su_nationkey = n_nationkey + AND n_name = 'Germany' +ORDER BY su_name; +LOG: join order: [ "stock" ][ reference join "item" ][ dual partition join "order_line" ] + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 30 bytes + Result 
destination: Send to 2 nodes + -> Custom Scan (Citus Adaptive) (actual rows=3 loops=1) + Task Count: 4 + Tuple data received from nodes: 12 bytes + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 4 + Merge Task Count: 4 + -> MapMergeJob + Map Task Count: 4 + Merge Task Count: 4 + Task Count: 1 + Tuple data received from nodes: 28 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 28 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Sort (actual rows=1 loops=1) + Sort Key: supplier.su_name + Sort Method: quicksort Memory: 25kB + -> Nested Loop Semi Join (actual rows=1 loops=1) + Join Filter: (supplier.su_suppkey = intermediate_result.mod) + Rows Removed by Join Filter: 1 + -> Hash Join (actual rows=1 loops=1) + Hash Cond: (supplier.su_nationkey = nation.n_nationkey) + -> Seq Scan on supplier_1650035 supplier (actual rows=11 loops=1) + -> Hash (actual rows=1 loops=1) + Buckets: 1024 Batches: 1 Memory Usage: 9kB + -> Seq Scan on nation_1650034 nation (actual rows=1 loops=1) + Filter: (n_name = 'Germany'::bpchar) + Rows Removed by Filter: 3 + -> Function Scan on read_intermediate_result intermediate_result (actual rows=2 loops=1) +(35 rows) + -- Query 21 -- DATA SET DOES NOT COVER THIS QUERY SELECT diff --git a/src/test/regress/sql/chbenchmark_all_queries.sql b/src/test/regress/sql/chbenchmark_all_queries.sql index bed8308a9..82ce7eb68 100644 --- a/src/test/regress/sql/chbenchmark_all_queries.sql +++ b/src/test/regress/sql/chbenchmark_all_queries.sql @@ -673,6 +673,32 @@ WHERE su_suppkey in AND n_name = 'Germany' ORDER BY su_name; +\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' +EXPLAIN :default_analyze_flags SELECT + su_name, + su_address +FROM + supplier, + nation +WHERE su_suppkey in + (SELECT + mod(s_i_id * s_w_id, 10000) + FROM + stock, + order_line + WHERE s_i_id IN + (SELECT i_id + FROM item + WHERE i_data LIKE 'co%') + AND ol_i_id = s_i_id + AND 
ol_delivery_d > '2008-05-23 12:00:00' -- was 2010, but our order is in 2008 + GROUP BY s_i_id, s_w_id, s_quantity + HAVING 2*s_quantity > sum(ol_quantity)) + AND su_nationkey = n_nationkey + AND n_name = 'Germany' +ORDER BY su_name; + + -- Query 21 -- DATA SET DOES NOT COVER THIS QUERY SELECT