Phase - III: recursively plan non-recurring sub join trees too

pull/6512/head
Onur Tirtir 2022-11-29 17:38:06 +03:00
parent f52381387e
commit e7e4881289
7 changed files with 238 additions and 68 deletions

View File

@ -826,19 +826,28 @@ RecursivelyPlanDistributedJoinNode(Node *node, Query *query,
if (IsA(node, JoinExpr))
{
/*
* XXX: This, for example, means that RecursivelyPlanRecurringTupleOuterJoins
* needs to plan inner side, i.e., <distributed> INNER JOIN <distributed>,
* of the following join:
* This, for example, means that RecursivelyPlanRecurringTupleOuterJoinWalker
* needs to plan inner side, i.e., "<distributed> INNER JOIN <distributed>",
* of the following join:
* <recurring> LEFT JOIN (<distributed> JOIN <distributed>)
*
* <recurring> LEFT JOIN (<distributed> INNER JOIN <distributed>)
* XXX: Ideally, we should handle such a sub join tree by moving
* it into a subquery "as a whole" but this implies that we need to
* rebuild the rtable and re-point all the Vars to the new rtable
* indexes, so we've not implemented that yet.
*
* However, this would require moving part of the join tree into a
* subquery but this implies that we need to rebuild the rtable and
* re-point all the Vars to the new rtable indexes. We have not
* implemented that yet.
* Instead, we recursively plan all the distributed tables in that
* sub join tree. This is much more inefficient than the other
* approach (since we lose the opportunity to push-down the whole
* sub join tree into the workers) but is easier to implement.
*/
ereport(DEBUG4, (errmsg("recursive planner cannot plan distributed sub "
"join nodes yet")));
RecursivelyPlanDistributedJoinNode(((JoinExpr *) node)->larg,
query, recursivePlanningContext);
RecursivelyPlanDistributedJoinNode(((JoinExpr *) node)->rarg,
query, recursivePlanningContext);
return;
}

View File

@ -406,7 +406,7 @@ ORDER BY l_custkey, r_custkey, t_custkey;
30 | 30 | 30
(17 rows)
-- Right join with single shard right most table should error out
-- Right join with single shard right most table should work
SELECT
l_custkey, r_custkey, t_custkey
FROM
@ -414,8 +414,42 @@ FROM
LEFT JOIN multi_outer_join_right r1 ON (l1.l_custkey = r1.r_custkey)
RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY l_custkey, r_custkey, t_custkey;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
LOG: join order: [ "multi_outer_join_left" ]
LOG: join order: [ "multi_outer_join_right" ]
l_custkey | r_custkey | t_custkey
---------------------------------------------------------------------
11 | 11 | 11
12 | 12 | 12
14 | 14 | 14
16 | 16 | 16
17 | 17 | 17
18 | 18 | 18
20 | 20 | 20
21 | 21 | 21
22 | 22 | 22
24 | 24 | 24
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
30 | 30 | 30
| | 1
| | 2
| | 3
| | 4
| | 5
| | 6
| | 7
| | 8
| | 9
| | 10
| | 13
| | 15
| | 19
| | 23
| | 25
| | 29
(30 rows)
-- Right join with single shard left most table should work
SELECT
t_custkey, r_custkey, l_custkey

View File

@ -401,15 +401,50 @@ ORDER BY 1;
30 | 30 | 30
(25 rows)
-- Right join with single shard right most table should error out
-- Right join with single shard right most table should work
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY 1,2,3;
LOG: join order: [ "multi_outer_join_left_hash" ]
LOG: join order: [ "multi_outer_join_right_hash" ]
l_custkey | r_custkey | t_custkey
---------------------------------------------------------------------
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
| | 1
| | 2
| | 3
| | 4
| | 5
| | 6
| | 7
| | 8
| | 9
| | 10
| | 16
| | 17
| | 18
| | 19
| | 20
(30 rows)
-- Right join with single shard left most table should work
SELECT
t_custkey, r_custkey, l_custkey

View File

@ -168,33 +168,55 @@ SELECT create_distributed_table('distributed_table', 'user_id');
(1 row)
INSERT INTO distributed_table VALUES
(1, 10),
(2, 22),
(3, 34),
(7, 40);
INSERT INTO reference_table VALUES
(1, '100'),
(null, '202'),
(4, '300'),
(null, '401'),
(null, '402');
-- postgres plans below queries by evaluating joins as below:
-- L
-- / \
-- ref L
-- / \
-- dist ref
-- so we should error out as reference table is in the outer part of the top level (left) outer join
SELECT count(*) FROM distributed_table a
LEFT JOIN reference_table b ON (true)
RIGHT JOIN reference_table c ON (true);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_table a
LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true)
RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (true);
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
count
---------------------------------------------------------------------
100
(1 row)
SELECT count(*) FROM distributed_table a
LEFT JOIN reference_table b ON (true)
RIGHT JOIN reference_table c ON (c.id > 0);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
43
(1 row)
SELECT count(*) FROM distributed_table a
LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true)
RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (c.id > 0);
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
count
---------------------------------------------------------------------
43
(1 row)
-- drop existing sqlancer tables before next tests
DROP TABLE t0, t1, t2, t3, t4 CASCADE;
CREATE TABLE tbl1(a REAL, b FLOAT, c money);
@ -258,14 +280,16 @@ SELECT create_reference_table('t4');
-- t1(ref) L
-- / \
-- t0(dist) t4(ref)
-- -- so we should error out
SELECT count(*) FROM (
SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0
LEFT OUTER JOIN t4 ON CAST(masklen('142.158.96.44') AS BOOLEAN)
RIGHT OUTER JOIN t1 ON ((0.024767844)::MONEY) BETWEEN (t1.c1) AND (CAST(0.0602135 AS MONEY))
) AS foo;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
0
(1 row)
-- first subquery has the same join tree as above, so we should error out
SELECT count(*) FROM (
SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0
@ -281,21 +305,29 @@ UNION ALL SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0
RIGHT OUTER JOIN t1 ON ((0.024767844)::MONEY) BETWEEN (t1.c1) AND ((0.0602135)::MONEY)
WHERE (NOT (((t0.c0)LIKE((t4.c0))))) ISNULL
) AS foo;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
0
(1 row)
-- unsupported outer JOIN inside a subquery in WHERE clause
SELECT * FROM distributed_table WHERE buy_count > (
SELECT count(*) FROM distributed_table a
LEFT JOIN reference_table b ON (true)
RIGHT JOIN reference_table c ON (false));
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
user_id | item_id | buy_count
---------------------------------------------------------------------
(0 rows)
-- unsupported outer JOIN via subqueries
SELECT count(*) FROM (SELECT *, random() FROM distributed_table) AS a
LEFT JOIN (SELECT *, random() FROM reference_table) AS b ON (true)
RIGHT JOIN (SELECT *, random() FROM reference_table) AS c ON (false);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
5
(1 row)
-- unsupported outer JOIN in a sublevel subquery
SELECT
count(*)
@ -310,8 +342,11 @@ JOIN
RIGHT JOIN reference_table c ON (true)
) AS unsupported_join
ON (true);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
400
(1 row)
SELECT
count(*)
FROM
@ -325,32 +360,41 @@ JOIN
RIGHT JOIN (SELECT * FROM reference_table OFFSET 0) c ON (true)
) AS unsupported_join
ON (true);
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
count
---------------------------------------------------------------------
400
(1 row)
-- unsupported outer JOIN in a sublevel INNER JOIN
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN reference_table b ON (true)
RIGHT JOIN reference_table c ON (true)) as unsupported_join (x,y,z,t,e,f,q)
JOIN
(reference_table d JOIN reference_table e ON(true)) ON (true);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
2500
(1 row)
-- unsupported outer JOIN in a sublevel LEFT JOIN
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN reference_table b ON (true)
RIGHT JOIN reference_table c ON (true)) as unsupported_join
LEFT JOIN
(reference_table d JOIN reference_table e ON(true)) ON (true);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
2500
(1 row)
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true)
@ -363,21 +407,27 @@ LEFT JOIN
ON(true)
)
ON (true);
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
count
---------------------------------------------------------------------
2500
(1 row)
-- unsupported outer JOIN in a sublevel RIGHT JOIN
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN reference_table b ON (true)
RIGHT JOIN reference_table c ON (false)) as unsupported_join
RIGHT JOIN
(reference_table d JOIN reference_table e ON(true)) ON (true);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
count
---------------------------------------------------------------------
125
(1 row)
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true)
@ -390,9 +440,12 @@ RIGHT JOIN
ON(true)
)
ON (true);
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
EXPLAIN SELECT
count
---------------------------------------------------------------------
125
(1 row)
EXPLAIN (COSTS OFF) SELECT
unsupported_join.*
FROM
(distributed_table a
@ -400,7 +453,34 @@ FROM
RIGHT JOIN reference_table c ON (true)) as unsupported_join (x,y,z,t,e,f,q)
JOIN
(reference_table d JOIN reference_table e ON(true)) ON (d.id > 0);
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on distributed_table_92862439 a
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Nested Loop
-> Nested Loop
-> Nested Loop Left Join
-> Seq Scan on reference_table_92862438 c
-> Nested Loop Left Join
-> Function Scan on read_intermediate_result intermediate_result
-> Materialize
-> Seq Scan on reference_table_92862438 b
-> Materialize
-> Seq Scan on reference_table_92862438 d
Filter: (id > 0)
-> Materialize
-> Seq Scan on reference_table_92862438 e
(25 rows)
SET client_min_messages TO WARNING;
DROP SCHEMA sqlancer_failures CASCADE;

View File

@ -302,7 +302,7 @@ FROM
LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY l_custkey, r_custkey, t_custkey;
-- Right join with single shard right most table should error out
-- Right join with single shard right most table should work
SELECT
l_custkey, r_custkey, t_custkey
FROM

View File

@ -302,13 +302,14 @@ FROM
LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY 1;
-- Right join with single shard right most table should error out
-- Right join with single shard right most table should work
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey);
RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY 1,2,3;
-- Right join with single shard left most table should work
SELECT

View File

@ -64,13 +64,25 @@ SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table(user_id int, item_id int, buy_count int);
SELECT create_distributed_table('distributed_table', 'user_id');
INSERT INTO distributed_table VALUES
(1, 10),
(2, 22),
(3, 34),
(7, 40);
INSERT INTO reference_table VALUES
(1, '100'),
(null, '202'),
(4, '300'),
(null, '401'),
(null, '402');
-- postgres plans below queries by evaluating joins as below:
-- L
-- / \
-- ref L
-- / \
-- dist ref
-- so we should error out as reference table is in the outer part of the top level (left) outer join
SELECT count(*) FROM distributed_table a
LEFT JOIN reference_table b ON (true)
@ -123,7 +135,6 @@ SELECT create_reference_table('t4');
-- t1(ref) L
-- / \
-- t0(dist) t4(ref)
-- -- so we should error out
SELECT count(*) FROM (
SELECT ALL t4.c1, t0.c0, t0.c1 FROM ONLY t0
LEFT OUTER JOIN t4 ON CAST(masklen('142.158.96.44') AS BOOLEAN)
@ -189,7 +200,7 @@ ON (true);
-- unsupported outer JOIN in a sublevel INNER JOIN
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN reference_table b ON (true)
@ -199,7 +210,7 @@ JOIN
-- unsupported outer JOIN in a sublevel LEFT JOIN
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN reference_table b ON (true)
@ -208,7 +219,7 @@ LEFT JOIN
(reference_table d JOIN reference_table e ON(true)) ON (true);
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true)
@ -224,7 +235,7 @@ ON (true);
-- unsupported outer JOIN in a sublevel RIGHT JOIN
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN reference_table b ON (true)
@ -233,7 +244,7 @@ RIGHT JOIN
(reference_table d JOIN reference_table e ON(true)) ON (true);
SELECT
unsupported_join.*
COUNT(unsupported_join.*)
FROM
(distributed_table a
LEFT JOIN (SELECT * FROM reference_table OFFSET 0) b ON (true)
@ -247,7 +258,7 @@ RIGHT JOIN
)
ON (true);
EXPLAIN SELECT
EXPLAIN (COSTS OFF) SELECT
unsupported_join.*
FROM
(distributed_table a