wip: Phase - II: recursively plan non-recurring subqueries too

onder_rec_plan_join
Onur Tirtir 2022-11-22 18:35:33 +03:00
parent 279cf5e276
commit 6ad7352c1a
4 changed files with 85 additions and 30 deletions

View File

@ -873,12 +873,7 @@ RecursivelyPlanNonRecurringJoinNode(Node *distributedNode, Query *query,
}
else if (distributedRte->rtekind == RTE_SUBQUERY)
{
/*
* XXX: Similar to JoinExpr, we don't know how to recursively plan distributed
* subqueries within join expressions yet.
*/
ereport(DEBUG4, (errmsg("recursive planner cannot plan distributed "
"subqueries within join expressions yet")));
RecursivelyPlanSubquery(distributedRte->subquery, recursivePlanningContext);
return;
}
else

View File

@ -107,7 +107,7 @@ FROM (
ERROR: the query contains a join that requires repartitioning
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
-- the LEFT JOIN condition is not on the partition column (i.e., it is part_key divided by 2)
-- still, recursive planning will kick in to plan some part of the query
-- but, we can plan the query thanks to recursive planning
SET client_min_messages TO DEBUG1;
INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg )
SELECT user_id, sum(array_length(events_table, 1)), length(hasdone_event)
@ -145,14 +145,20 @@ FROM (
AND e.event_type IN (106, 107, 108)
) t2 ON (t1.user_id = (t2.user_id)/2)
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
) t GROUP BY user_id, hasdone_event
RETURNING user_id, value_1_agg, value_2_agg;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102])))
DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105])))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)
DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105])))
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT user_id, 'Has done event'::text AS hasdone_event FROM public.events_table e WHERE ((user_id OPERATOR(pg_catalog.>=) 10) AND (user_id OPERATOR(pg_catalog.<=) 25) AND (event_type OPERATOR(pg_catalog.=) ANY (ARRAY[106, 107, 108])))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, int4(sum(array_length(events_table, 1))) AS value_1_agg, length(hasdone_event) AS value_2_agg FROM (SELECT t1.user_id, array_agg(t1.event ORDER BY t1."time") AS events_table, COALESCE(t2.hasdone_event, 'Has not done event'::text) AS hasdone_event FROM ((SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)) t1 LEFT JOIN (SELECT intermediate_result.user_id, intermediate_result.hasdone_event FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, hasdone_event text)) t2 ON ((t1.user_id OPERATOR(pg_catalog.=) (t2.user_id OPERATOR(pg_catalog./) 2)))) GROUP BY t1.user_id, t2.hasdone_event) t GROUP BY user_id, hasdone_event
DEBUG: Collecting INSERT ... SELECT results on coordinator
user_id | value_1_agg | value_2_agg
---------------------------------------------------------------------
(0 rows)
RESET client_min_messages;
---------------------------------------------------------------------
---------------------------------------------------------------------
@ -229,7 +235,7 @@ ORDER BY
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- not pushable since the JOIN condition is not equi JOIN
-- (subquery_1 JOIN subquery_2)
-- still, recursive planning will kick in
-- but, we can plan the query thanks to recursive planning
SET client_min_messages TO DEBUG1;
INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg)
SELECT
@ -295,14 +301,20 @@ WHERE
GROUP BY
count_pay, user_id
ORDER BY
count_pay;
count_pay
RETURNING user_id, value_1_agg, value_2_agg;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12))
DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)
DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14))
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
DEBUG: generating subplan XXX_2 for subquery SELECT user_id, count(*) AS count_pay FROM public.users_table WHERE ((user_id OPERATOR(pg_catalog.>=) 10) AND (user_id OPERATOR(pg_catalog.<=) 70) AND (value_1 OPERATOR(pg_catalog.>) 15) AND (value_1 OPERATOR(pg_catalog.<) 17)) GROUP BY user_id HAVING (count(*) OPERATOR(pg_catalog.>) 1)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, int4(avg(array_length(events_table, 1))) AS value_1_agg, int4(count_pay) AS value_2_agg FROM (SELECT subquery_1.user_id, array_agg(subquery_1.event ORDER BY subquery_1."time") AS events_table, COALESCE(subquery_2.count_pay, (0)::bigint) AS count_pay FROM ((SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)) subquery_1 LEFT JOIN (SELECT intermediate_result.user_id, intermediate_result.count_pay FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, count_pay bigint)) subquery_2 ON ((subquery_1.user_id OPERATOR(pg_catalog.>) subquery_2.user_id))) GROUP BY subquery_1.user_id, subquery_2.count_pay) subquery_top WHERE (array_ndims(events_table) OPERATOR(pg_catalog.>) 0) GROUP BY count_pay, user_id ORDER BY count_pay
DEBUG: Collecting INSERT ... SELECT results on coordinator
user_id | value_1_agg | value_2_agg
---------------------------------------------------------------------
(0 rows)
RESET client_min_messages;
---------------------------------------------------------------------
---------------------------------------------------------------------

View File

@ -840,8 +840,18 @@ LEFT JOIN
WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed
ON distinct_undistribured.user_id = exsist_in_distributed.user_id
ORDER BY 2 DESC, 1 DESC;
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
user_id | user_id
---------------------------------------------------------------------
7 |
6 |
4 |
2 |
8 | 8
5 | 5
3 | 3
1 | 1
(8 rows)
-- similar query to the one above, but this time
-- use NOT EXISTS, which is a pretty common construct
WITH distinct_undistribured AS
@ -856,8 +866,18 @@ LEFT JOIN
(SELECT NULL
FROM distinct_undistribured
WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed ON distinct_undistribured.user_id = exsist_in_distributed.user_id;
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
user_id | user_id
---------------------------------------------------------------------
2 |
4 |
7 |
3 |
6 |
5 |
1 |
8 |
(8 rows)
-- same NOT EXISTS struct, but with CTE
-- so should work
WITH distinct_undistribured AS (
@ -905,8 +925,11 @@ LEFT JOIN
(SELECT NULL
FROM distinct_undistribured d2
WHERE d1.user_id = d2.user_id )) AS bar USING (user_id);
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
count
---------------------------------------------------------------------
8
(1 row)
-- should work fine with materialized ctes
WITH distinct_undistribured AS MATERIALIZED (
SELECT DISTINCT user_id
@ -951,8 +974,18 @@ LEFT JOIN
WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed
ON distinct_undistribured.user_id = exsist_in_distributed.user_id
ORDER BY 2 DESC, 1 DESC;
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
user_id | user_id
---------------------------------------------------------------------
7 |
6 |
4 |
2 |
8 | 8
5 | 5
3 | 3
1 | 1
(8 rows)
WITH distinct_undistribured AS MATERIALIZED
(SELECT DISTINCT user_id
FROM test_cte)
@ -965,8 +998,18 @@ LEFT JOIN
(SELECT NULL
FROM distinct_undistribured
WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed ON distinct_undistribured.user_id = exsist_in_distributed.user_id;
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
user_id | user_id
---------------------------------------------------------------------
2 |
4 |
7 |
3 |
6 |
5 |
1 |
8 |
(8 rows)
-- NOT EXISTS struct, with cte inlining disabled
WITH distinct_undistribured AS MATERIALIZED(
SELECT DISTINCT user_id
@ -1013,8 +1056,11 @@ LEFT JOIN
(SELECT NULL
FROM distinct_undistribured d2
WHERE d1.user_id = d2.user_id )) AS bar USING (user_id);
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table
count
---------------------------------------------------------------------
8
(1 row)
-- some test with failures
WITH a AS MATERIALIZED (SELECT * FROM users_table LIMIT 10)
SELECT user_id/0 FROM users_table JOIN a USING (user_id);

View File

@ -112,7 +112,7 @@ FROM (
) t GROUP BY user_id, hasdone_event;
-- the LEFT JOIN condition is not on the partition column (i.e., it is part_key divided by 2)
-- still, recursive planning will kick in to plan some part of the query
-- but, we can plan the query thanks to recursive planning
SET client_min_messages TO DEBUG1;
INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg )
@ -153,7 +153,8 @@ FROM (
) t2 ON (t1.user_id = (t2.user_id)/2)
GROUP BY t1.user_id, hasdone_event
) t GROUP BY user_id, hasdone_event;
) t GROUP BY user_id, hasdone_event
RETURNING user_id, value_1_agg, value_2_agg;
RESET client_min_messages;
------------------------------------
@ -232,7 +233,7 @@ ORDER BY
-- not pushable since the JOIN condition is not equi JOIN
-- (subquery_1 JOIN subquery_2)
-- still, recursive planning will kick in
-- but, we can plan the query thanks to recursive planning
SET client_min_messages TO DEBUG1;
INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg)
SELECT
@ -298,7 +299,8 @@ WHERE
GROUP BY
count_pay, user_id
ORDER BY
count_pay;
count_pay
RETURNING user_id, value_1_agg, value_2_agg;
RESET client_min_messages;
------------------------------------