From 577d651778bdd6ea2dc19cafc2a530d0326836b0 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 24 Nov 2022 13:12:34 +0300 Subject: [PATCH] Phase - III: recursively plan non-recurring sub join trees too --- .../distributed/planner/recursive_planning.c | 38 +++-- .../regress/expected/multi_outer_join.out | 40 +++++- .../expected/multi_outer_join_reference.out | 43 +++++- .../regress/expected/sqlancer_failures.out | 134 +++++++++++++----- src/test/regress/sql/multi_outer_join.sql | 2 +- .../sql/multi_outer_join_reference.sql | 5 +- src/test/regress/sql/sqlancer_failures.sql | 4 +- 7 files changed, 207 insertions(+), 59 deletions(-) diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 6371d99ad..4d9bb895f 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -844,19 +844,36 @@ RecursivelyPlanDistributedJoinNode(Node *distributedNode, Query *query, if (IsA(distributedNode, JoinExpr)) { /* - * XXX: This, for example, means that RecursivelyPlanRecurringTupleOuterJoins - * needs to plan inner side, i.e., INNER JOIN , - * of the following join: + * This, for example, means that RecursivelyPlanRecurringTupleOuterJoins + * needs to plan inner side, i.e., " INNER JOIN ", + * of the following join: + * LEFT JOIN ( INNER JOIN ) * - * LEFT JOIN ( INNER JOIN ) + * XXX: Ideally we should handle such a sub join tree by moving + * it into a subquery "as a whole" but this implies that we need to + * rebuild the rtable and re-point all the Vars to the new rtable + * indexes, so we've not implemented that yet. * - * However, this would require moving part of the join tree into a - * subquery but this implies that we need to rebuild the rtable and - * re-point all the Vars to the new rtable indexes. We have not - * implemented that yet. + * Instead, we recursively plan all the distributed tables in that + * sub join tree. This is much more inefficient than the other + * approach (since we lose the opportunity to push-down the whole + * sub join tree into the workers) but is easier to implement. */ - ereport(DEBUG4, (errmsg("recursive planner cannot plan distributed sub " - "join nodes yet"))); + + Node *larg = ((JoinExpr *) distributedNode)->larg; + if (!IsJoinNodeRecurring(larg, query)) + { + RecursivelyPlanDistributedJoinNode(larg, query, + recursivePlanningContext); + } + + Node *rarg = ((JoinExpr *) distributedNode)->rarg; + if (!IsJoinNodeRecurring(rarg, query)) + { + RecursivelyPlanDistributedJoinNode(rarg, query, + recursivePlanningContext); + } + return; } @@ -882,7 +899,6 @@ RecursivelyPlanDistributedJoinNode(Node *distributedNode, Query *query, else if (distributedRte->rtekind == RTE_SUBQUERY) { RecursivelyPlanSubquery(distributedRte->subquery, recursivePlanningContext); - return; } else { diff --git a/src/test/regress/expected/multi_outer_join.out b/src/test/regress/expected/multi_outer_join.out index 1748a0dfc..4194efe52 100644 --- a/src/test/regress/expected/multi_outer_join.out +++ b/src/test/regress/expected/multi_outer_join.out @@ -406,7 +406,7 @@ ORDER BY l_custkey, r_custkey, t_custkey; 30 | 30 | 30 (17 rows) --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM @@ -414,8 +414,42 @@ FROM LEFT JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY l_custkey, r_custkey, t_custkey; -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join +LOG: join order: [ "multi_outer_join_left" ] +LOG: join order: [ "multi_outer_join_right" ] + l_custkey | r_custkey | t_custkey +--------------------------------------------------------------------- + 11 | 11 | 11 + 12 | 12 | 12 + 14 | 14 | 14 + 16 | 16 | 16 + 17 | 17 | 17 + 18 | 18 | 18 + 20 | 20 | 20 + 21 | 21 | 21 + 22 | 22 | 22 + 24 | 24 | 24 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 30 | 30 | 30 + | | 1 + | | 2 + | | 3 + | | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | | 9 + | | 10 + | | 13 + | | 15 + | | 19 + | | 23 + | | 25 + | | 29 +(30 rows) + -- Right join with single shard left most table should work SELECT t_custkey, r_custkey, l_custkey diff --git a/src/test/regress/expected/multi_outer_join_reference.out b/src/test/regress/expected/multi_outer_join_reference.out index 1e705d14d..aca91bda7 100644 --- a/src/test/regress/expected/multi_outer_join_reference.out +++ b/src/test/regress/expected/multi_outer_join_reference.out @@ -401,15 +401,50 @@ ORDER BY 1; 30 | 30 | 30 (25 rows) --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM multi_outer_join_left_hash l1 LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey) - RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) +ORDER BY 1,2,3; +LOG: join order: [ "multi_outer_join_left_hash" ] +LOG: join order: [ "multi_outer_join_right_hash" ] + l_custkey | r_custkey | t_custkey +--------------------------------------------------------------------- + 11 | 11 | 11 + 12 | 12 | 12 + 13 | 13 | 13 + 14 | 14 | 14 + 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 + | | 1 + | | 2 + | | 3 + | | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | | 9 + | | 10 + | | 16 + | | 17 + | | 18 + | | 19 + | | 20 +(30 rows) + -- Right join with single shard left most table should work SELECT t_custkey, r_custkey, l_custkey diff --git a/src/test/regress/expected/sqlancer_failures.out b/src/test/regress/expected/sqlancer_failures.out index 207e71d56..bd519c39a 100644 --- a/src/test/regress/expected/sqlancer_failures.out +++ b/src/test/regress/expected/sqlancer_failures.out @@ -174,27 +174,38 @@ SELECT create_distributed_table('distributed_table', 'user_id'); -- ref L -- / \ -- dist ref --- so we should error out as reference table is in the outer part of the top level (left) outer join SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + SELECT count(*) FROM distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 0 +(1 row) + SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (c.id > 0); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + SELECT count(*) FROM distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (c.id > 0); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 0 +(1 row) + -- drop existing sqlancer tables before next tests DROP TABLE t0, t1, t2, t3, t4 CASCADE; CREATE TABLE tbl1(a REAL, b FLOAT, c money); @@ -258,14 +269,16 @@ SELECT create_reference_table('t4'); -- t1(ref) L -- / \ -- t0(dist) t4(ref) --- -- so we should error out SELECT count(*) FROM ( SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 LEFT OUTER JOIN t4 ON CAST(masklen('142.158.96.44') AS BOOLEAN) RIGHT OUTER JOIN t1 ON ((0.024767844)::MONEY) BETWEEN (t1.c1) AND (CAST(0.0602135 AS MONEY)) ) AS foo; -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + -- first subquery has the same join tree as above, so we should error out SELECT count(*) FROM ( SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 @@ -281,21 +294,29 @@ UNION ALL SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 RIGHT OUTER JOIN t1 ON ((0.024767844)::MONEY) BETWEEN (t1.c1) AND ((0.0602135)::MONEY) WHERE (NOT (((t0.c0)LIKE((t4.c0))))) ISNULL ) AS foo; -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + -- unsupported outer JOIN inside a subquery in WHERE clause SELECT * FROM distributed_table WHERE buy_count > ( SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (false)); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + user_id | item_id | buy_count +--------------------------------------------------------------------- +(0 rows) + -- unsupported outer JOIN via subqueries SELECT count(*) FROM (SELECT *, random() FROM distributed_table) AS a LEFT JOIN (SELECT *, random() FROM reference_table) AS b ON (true) RIGHT JOIN (SELECT *, random() FROM reference_table) AS c ON (false); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + -- unsupported outer JOIN in a sublevel subquery SELECT count(*) @@ -310,8 +331,11 @@ JOIN RIGHT JOIN reference_table c ON (true) ) AS unsupported_join ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + SELECT count(*) FROM @@ -325,8 +349,11 @@ JOIN RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (true) ) AS unsupported_join ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 0 +(1 row) + -- unsupported outer JOIN in a sublevel INNER JOIN SELECT unsupported_join.* @@ -336,8 +363,10 @@ FROM RIGHT JOIN reference_table c ON (true)) as unsupported_join (x,y,z,t,e,f,q) JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + x | y | z | t | e | f | q | it_name | k_no +--------------------------------------------------------------------- +(0 rows) + -- unsupported outer JOIN in a sublevel LEFT JOIN SELECT unsupported_join.* @@ -347,8 +376,10 @@ FROM RIGHT JOIN reference_table c ON (true)) as unsupported_join LEFT JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + user_id | item_id | buy_count | id | it_name | k_no | id | it_name | k_no +--------------------------------------------------------------------- +(0 rows) + SELECT unsupported_join.* FROM @@ -363,8 +394,10 @@ LEFT JOIN ON(true) ) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + user_id | item_id | buy_count | id | it_name | k_no | id | it_name | k_no +--------------------------------------------------------------------- +(0 rows) + -- unsupported outer JOIN in a sublevel RIGHT JOIN SELECT unsupported_join.* @@ -374,8 +407,10 @@ FROM RIGHT JOIN reference_table c ON (false)) as unsupported_join RIGHT JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + user_id | item_id | buy_count | id | it_name | k_no | id | it_name | k_no +--------------------------------------------------------------------- +(0 rows) + SELECT unsupported_join.* FROM @@ -390,9 +425,11 @@ RIGHT JOIN ON(true) ) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table -EXPLAIN SELECT + user_id | item_id | buy_count | id | it_name | k_no | id | it_name | k_no +--------------------------------------------------------------------- +(0 rows) + +EXPLAIN (COSTS OFF) SELECT unsupported_join.* FROM (distributed_table a @@ -400,7 +437,34 @@ FROM RIGHT JOIN reference_table c ON (true)) as unsupported_join (x,y,z,t,e,f,q) JOIN (reference_table d JOIN reference_table e ON(true)) ON (d.id > 0); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on distributed_table_92862439 a + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Nested Loop + -> Nested Loop + -> Nested Loop Left Join + -> Seq Scan on reference_table_92862438 c + -> Nested Loop Left Join + -> Function Scan on read_intermediate_result intermediate_result + -> Materialize + -> Seq Scan on reference_table_92862438 b + -> Materialize + -> Seq Scan on reference_table_92862438 d + Filter: (id > 0) + -> Materialize + -> Seq Scan on reference_table_92862438 e +(25 rows) + SET client_min_messages TO WARNING; DROP SCHEMA sqlancer_failures CASCADE; diff --git a/src/test/regress/sql/multi_outer_join.sql b/src/test/regress/sql/multi_outer_join.sql index 280f4e211..c3e01c944 100644 --- a/src/test/regress/sql/multi_outer_join.sql +++ b/src/test/regress/sql/multi_outer_join.sql @@ -302,7 +302,7 @@ FROM LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY l_custkey, r_custkey, t_custkey; --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM diff --git a/src/test/regress/sql/multi_outer_join_reference.sql b/src/test/regress/sql/multi_outer_join_reference.sql index 9c824736b..04a9c23e2 100644 --- a/src/test/regress/sql/multi_outer_join_reference.sql +++ b/src/test/regress/sql/multi_outer_join_reference.sql @@ -302,13 +302,14 @@ FROM LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY 1; --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM multi_outer_join_left_hash l1 LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey) - RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey); + RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) +ORDER BY 1,2,3; -- Right join with single shard left most table should work SELECT diff --git a/src/test/regress/sql/sqlancer_failures.sql b/src/test/regress/sql/sqlancer_failures.sql index afb7b909f..d44ed04d9 100644 --- a/src/test/regress/sql/sqlancer_failures.sql +++ b/src/test/regress/sql/sqlancer_failures.sql @@ -70,7 +70,6 @@ SELECT create_distributed_table('distributed_table', 'user_id'); -- ref L -- / \ -- dist ref --- so we should error out as reference table is in the outer part of the top level (left) outer join SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) @@ -123,7 +122,6 @@ SELECT create_reference_table('t4'); -- t1(ref) L -- / \ -- t0(dist) t4(ref) --- -- so we should error out SELECT count(*) FROM ( SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 LEFT OUTER JOIN t4 ON CAST(masklen('142.158.96.44') AS BOOLEAN) @@ -247,7 +245,7 @@ RIGHT JOIN ) ON (true); -EXPLAIN SELECT +EXPLAIN (COSTS OFF) SELECT unsupported_join.* FROM (distributed_table a