From e7e48812896742c22787b2edc0c485a302f19690 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 29 Nov 2022 17:38:06 +0300 Subject: [PATCH] Phase - III: recursively plan non-recurring sub join trees too --- .../distributed/planner/recursive_planning.c | 29 ++-- .../regress/expected/multi_outer_join.out | 40 ++++- .../expected/multi_outer_join_reference.out | 43 ++++- .../regress/expected/sqlancer_failures.out | 160 +++++++++++++----- src/test/regress/sql/multi_outer_join.sql | 2 +- .../sql/multi_outer_join_reference.sql | 5 +- src/test/regress/sql/sqlancer_failures.sql | 27 ++- 7 files changed, 238 insertions(+), 68 deletions(-) diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 779ee79fa..b74b59de3 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -826,19 +826,28 @@ RecursivelyPlanDistributedJoinNode(Node *node, Query *query, if (IsA(node, JoinExpr)) { /* - * XXX: This, for example, means that RecursivelyPlanRecurringTupleOuterJoins - * needs to plan inner side, i.e., INNER JOIN , - * of the following join: + * This, for example, means that RecursivelyPlanRecurringTupleOuterJoinWalker + * needs to plan inner side, i.e., " INNER JOIN ", + * of the following join: + * LEFT JOIN ( JOIN ) * - * LEFT JOIN ( INNER JOIN ) + * XXX: Ideally, we should handle such a sub join tree by moving + * it into a subquery "as a whole" but this implies that we need to + * rebuild the rtable and re-point all the Vars to the new rtable + * indexes, so we've not implemented that yet. * - * However, this would require moving part of the join tree into a - * subquery but this implies that we need to rebuild the rtable and - * re-point all the Vars to the new rtable indexes. We have not - * implemented that yet. + * Instead, we recursively plan all the distributed tables in that + * sub join tree. This is much more inefficient than the other + * approach (since we lose the opportunity to push-down the whole + * sub join tree into the workers) but is easier to implement. */ - ereport(DEBUG4, (errmsg("recursive planner cannot plan distributed sub " - "join nodes yet"))); + + RecursivelyPlanDistributedJoinNode(((JoinExpr *) node)->larg, + query, recursivePlanningContext); + + RecursivelyPlanDistributedJoinNode(((JoinExpr *) node)->rarg, + query, recursivePlanningContext); + return; } diff --git a/src/test/regress/expected/multi_outer_join.out b/src/test/regress/expected/multi_outer_join.out index 8884c876f..17b13773d 100644 --- a/src/test/regress/expected/multi_outer_join.out +++ b/src/test/regress/expected/multi_outer_join.out @@ -406,7 +406,7 @@ ORDER BY l_custkey, r_custkey, t_custkey; 30 | 30 | 30 (17 rows) --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM @@ -414,8 +414,42 @@ FROM LEFT JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey) RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY l_custkey, r_custkey, t_custkey; -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join +LOG: join order: [ "multi_outer_join_left" ] +LOG: join order: [ "multi_outer_join_right" ] + l_custkey | r_custkey | t_custkey +--------------------------------------------------------------------- + 11 | 11 | 11 + 12 | 12 | 12 + 14 | 14 | 14 + 16 | 16 | 16 + 17 | 17 | 17 + 18 | 18 | 18 + 20 | 20 | 20 + 21 | 21 | 21 + 22 | 22 | 22 + 24 | 24 | 24 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 30 | 30 | 30 + | | 1 + | | 2 + | | 3 + | | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | | 9 + | | 10 + | | 13 + | | 15 + | | 19 + | | 23 + | | 25 + | | 29 +(30 rows) + -- Right join with single shard left most table should work SELECT t_custkey, r_custkey, l_custkey diff --git a/src/test/regress/expected/multi_outer_join_reference.out b/src/test/regress/expected/multi_outer_join_reference.out index 1e705d14d..aca91bda7 100644 --- a/src/test/regress/expected/multi_outer_join_reference.out +++ b/src/test/regress/expected/multi_outer_join_reference.out @@ -401,15 +401,50 @@ ORDER BY 1; 30 | 30 | 30 (25 rows) --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM multi_outer_join_left_hash l1 LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey) - RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) +ORDER BY 1,2,3; +LOG: join order: [ "multi_outer_join_left_hash" ] +LOG: join order: [ "multi_outer_join_right_hash" ] + l_custkey | r_custkey | t_custkey +--------------------------------------------------------------------- + 11 | 11 | 11 + 12 | 12 | 12 + 13 | 13 | 13 + 14 | 14 | 14 + 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 + | | 1 + | | 2 + | | 3 + | | 4 + | | 5 + | | 6 + | | 7 + | | 8 + | | 9 + | | 10 + | | 16 + | | 17 + | | 18 + | | 19 + | | 20 +(30 rows) + -- Right join with single shard left most table should work SELECT t_custkey, r_custkey, l_custkey diff --git a/src/test/regress/expected/sqlancer_failures.out b/src/test/regress/expected/sqlancer_failures.out index 207e71d56..d4d06a4e4 100644 --- a/src/test/regress/expected/sqlancer_failures.out +++ b/src/test/regress/expected/sqlancer_failures.out @@ -168,33 +168,55 @@ SELECT create_distributed_table('distributed_table', 'user_id'); (1 row) +INSERT INTO distributed_table VALUES +(1, 10), +(2, 22), +(3, 34), +(7, 40); +INSERT INTO reference_table VALUES +(1, '100'), +(null, '202'), +(4, '300'), +(null, '401'), +(null, '402'); -- postgres plans below queries by evaluating joins as below: -- L -- / \ -- ref L -- / \ -- dist ref --- so we should error out as reference table is in the outer part of the top level (left) outer join SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 100 +(1 row) + SELECT count(*) FROM distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 100 +(1 row) + SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (c.id > 0); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 43 +(1 row) + SELECT count(*) FROM distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (c.id > 0); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 43 +(1 row) + -- drop existing sqlancer tables before next tests DROP TABLE t0, t1, t2, t3, t4 CASCADE; CREATE TABLE tbl1(a REAL, b FLOAT, c money); @@ -258,14 +280,16 @@ SELECT create_reference_table('t4'); -- t1(ref) L -- / \ -- t0(dist) t4(ref) --- -- so we should error out SELECT count(*) FROM ( SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 LEFT OUTER JOIN t4 ON CAST(masklen('142.158.96.44') AS BOOLEAN) RIGHT OUTER JOIN t1 ON ((0.024767844)::MONEY) BETWEEN (t1.c1) AND (CAST(0.0602135 AS MONEY)) ) AS foo; -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + -- first subquery has the same join tree as above, so we should error out SELECT count(*) FROM ( SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 @@ -281,21 +305,29 @@ UNION ALL SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 RIGHT OUTER JOIN t1 ON ((0.024767844)::MONEY) BETWEEN (t1.c1) AND ((0.0602135)::MONEY) WHERE (NOT (((t0.c0)LIKE((t4.c0))))) ISNULL ) AS foo; -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 0 +(1 row) + -- unsupported outer JOIN inside a subquery in WHERE clause SELECT * FROM distributed_table WHERE buy_count > ( SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (false)); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + user_id | item_id | buy_count +--------------------------------------------------------------------- +(0 rows) + -- unsupported outer JOIN via subqueries SELECT count(*) FROM (SELECT *, random() FROM distributed_table) AS a LEFT JOIN (SELECT *, random() FROM reference_table) AS b ON (true) RIGHT JOIN (SELECT *, random() FROM reference_table) AS c ON (false); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 5 +(1 row) + -- unsupported outer JOIN in a sublevel subquery SELECT count(*) @@ -310,8 +342,11 @@ JOIN RIGHT JOIN reference_table c ON (true) ) AS unsupported_join ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 400 +(1 row) + SELECT count(*) FROM @@ -325,32 +360,41 @@ JOIN RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (true) ) AS unsupported_join ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 400 +(1 row) + -- unsupported outer JOIN in a sublevel INNER JOIN SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (true)) as unsupported_join (x,y,z,t,e,f,q) JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 2500 +(1 row) + -- unsupported outer JOIN in a sublevel LEFT JOIN SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (true)) as unsupported_join LEFT JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 2500 +(1 row) + SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) @@ -363,21 +407,27 @@ LEFT JOIN ON(true) ) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 2500 +(1 row) + -- unsupported outer JOIN in a sublevel RIGHT JOIN SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN reference_table b ON (true) RIGHT JOIN reference_table c ON (false)) as unsupported_join RIGHT JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + count +--------------------------------------------------------------------- + 125 +(1 row) + SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) @@ -390,9 +440,12 @@ RIGHT JOIN ON(true) ) ON (true); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table -EXPLAIN SELECT + count +--------------------------------------------------------------------- + 125 +(1 row) + +EXPLAIN (COSTS OFF) SELECT unsupported_join.* FROM (distributed_table a @@ -400,7 +453,34 @@ FROM RIGHT JOIN reference_table c ON (true)) as unsupported_join (x,y,z,t,e,f,q) JOIN (reference_table d JOIN reference_table e ON(true)) ON (d.id > 0); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on distributed_table_92862439 a + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Nested Loop + -> Nested Loop + -> Nested Loop Left Join + -> Seq Scan on reference_table_92862438 c + -> Nested Loop Left Join + -> Function Scan on read_intermediate_result intermediate_result + -> Materialize + -> Seq Scan on reference_table_92862438 b + -> Materialize + -> Seq Scan on reference_table_92862438 d + Filter: (id > 0) + -> Materialize + -> Seq Scan on reference_table_92862438 e +(25 rows) + SET client_min_messages TO WARNING; DROP SCHEMA sqlancer_failures CASCADE; diff --git a/src/test/regress/sql/multi_outer_join.sql b/src/test/regress/sql/multi_outer_join.sql index 549e0ae2c..5f911dc3b 100644 --- a/src/test/regress/sql/multi_outer_join.sql +++ b/src/test/regress/sql/multi_outer_join.sql @@ -302,7 +302,7 @@ FROM LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY l_custkey, r_custkey, t_custkey; --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM diff --git a/src/test/regress/sql/multi_outer_join_reference.sql b/src/test/regress/sql/multi_outer_join_reference.sql index 9c824736b..04a9c23e2 100644 --- a/src/test/regress/sql/multi_outer_join_reference.sql +++ b/src/test/regress/sql/multi_outer_join_reference.sql @@ -302,13 +302,14 @@ FROM LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) ORDER BY 1; --- Right join with single shard right most table should error out +-- Right join with single shard right most table should work SELECT l_custkey, r_custkey, t_custkey FROM multi_outer_join_left_hash l1 LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey) - RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey); + RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey) +ORDER BY 1,2,3; -- Right join with single shard left most table should work SELECT diff --git a/src/test/regress/sql/sqlancer_failures.sql b/src/test/regress/sql/sqlancer_failures.sql index afb7b909f..d003d58be 100644 --- a/src/test/regress/sql/sqlancer_failures.sql +++ b/src/test/regress/sql/sqlancer_failures.sql @@ -64,13 +64,25 @@ SELECT create_reference_table('reference_table'); CREATE TABLE distributed_table(user_id int, item_id int, buy_count int); SELECT create_distributed_table('distributed_table', 'user_id'); +INSERT INTO distributed_table VALUES +(1, 10), +(2, 22), +(3, 34), +(7, 40); + +INSERT INTO reference_table VALUES +(1, '100'), +(null, '202'), +(4, '300'), +(null, '401'), +(null, '402'); + -- postgres plans below queries by evaluating joins as below: -- L -- / \ -- ref L -- / \ -- dist ref --- so we should error out as reference table is in the outer part of the top level (left) outer join SELECT count(*) FROM distributed_table a LEFT JOIN reference_table b ON (true) @@ -123,7 +135,6 @@ SELECT create_reference_table('t4'); -- t1(ref) L -- / \ -- t0(dist) t4(ref) --- -- so we should error out SELECT count(*) FROM ( SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0 LEFT OUTER JOIN t4 ON CAST(masklen('142.158.96.44') AS BOOLEAN) @@ -189,7 +200,7 @@ ON (true); -- unsupported outer JOIN in a sublevel INNER JOIN SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN reference_table b ON (true) @@ -199,7 +210,7 @@ JOIN -- unsupported outer JOIN in a sublevel LEFT JOIN SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN reference_table b ON (true) @@ -208,7 +219,7 @@ LEFT JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) @@ -224,7 +235,7 @@ ON (true); -- unsupported outer JOIN in a sublevel RIGHT JOIN SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN reference_table b ON (true) @@ -233,7 +244,7 @@ RIGHT JOIN (reference_table d JOIN reference_table e ON(true)) ON (true); SELECT - unsupported_join.* + COUNT(unsupported_join.*) FROM (distributed_table a LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true) @@ -247,7 +258,7 @@ RIGHT JOIN ) ON (true); -EXPLAIN SELECT +EXPLAIN (COSTS OFF) SELECT unsupported_join.* FROM (distributed_table a