diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 156988360..d99002b40 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -873,12 +873,7 @@ RecursivelyPlanNonRecurringJoinNode(Node *distributedNode, Query *query, } else if (distributedRte->rtekind == RTE_SUBQUERY) { - /* - * XXX: Similar to JoinExpr, we don't know how to recursively plan distributed - * subqueries within join expressions yet. - */ - ereport(DEBUG4, (errmsg("recursive planner cannot plan distributed " - "subqueries within join expressions yet"))); + RecursivelyPlanSubquery(distributedRte->subquery, recursivePlanningContext); return; } else diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index 222157a87..5af9744dc 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -107,7 +107,7 @@ FROM ( ERROR: the query contains a join that requires repartitioning HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- the LEFT JOIN conditon is not on the partition column (i.e., is it part_key divided by 2) --- still, recursive planning will kick in to plan some part of the query +-- but, we can plan the query thanks to recursive planning SET client_min_messages TO DEBUG1; INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg ) SELECT user_id, sum(array_length(events_table, 1)), length(hasdone_event) @@ -145,14 +145,20 @@ FROM ( AND e.event_type IN (106, 107, 108) ) t2 ON (t1.user_id = (t2.user_id)/2) GROUP BY t1.user_id, hasdone_event -) t GROUP BY user_id, hasdone_event; +) t GROUP BY user_id, hasdone_event +RETURNING user_id, value_1_agg, value_2_agg; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) UNION SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table +DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT user_id, 'Has done event'::text AS hasdone_event FROM public.events_table e WHERE ((user_id OPERATOR(pg_catalog.>=) 10) AND (user_id OPERATOR(pg_catalog.<=) 25) AND (event_type OPERATOR(pg_catalog.=) ANY (ARRAY[106, 107, 108]))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, int4(sum(array_length(events_table, 1))) AS value_1_agg, length(hasdone_event) AS value_2_agg FROM (SELECT t1.user_id, array_agg(t1.event ORDER BY t1."time") AS events_table, COALESCE(t2.hasdone_event, 'Has not done event'::text) AS hasdone_event FROM ((SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)) t1 LEFT JOIN (SELECT intermediate_result.user_id, intermediate_result.hasdone_event FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, hasdone_event text)) t2 ON ((t1.user_id OPERATOR(pg_catalog.=) (t2.user_id OPERATOR(pg_catalog./) 2)))) GROUP BY t1.user_id, t2.hasdone_event) t GROUP BY user_id, hasdone_event +DEBUG: Collecting INSERT ... SELECT results on coordinator + user_id | value_1_agg | value_2_agg +--------------------------------------------------------------------- +(0 rows) + RESET client_min_messages; --------------------------------------------------------------------- --------------------------------------------------------------------- @@ -229,7 +235,7 @@ ORDER BY ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- not pushable since the JOIN condition is not equi JOIN -- (subquery_1 JOIN subquery_2) --- still, recursive planning will kick in +-- but, we can plan the query thanks to recursive planning SET client_min_messages TO DEBUG1; INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg) SELECT @@ -295,14 +301,20 @@ WHERE GROUP BY count_pay, user_id ORDER BY - count_pay; + count_pay +RETURNING user_id, value_1_agg, value_2_agg; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) UNION SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table +DEBUG: generating subplan XXX_2 for subquery SELECT user_id, count(*) AS count_pay FROM public.users_table WHERE ((user_id OPERATOR(pg_catalog.>=) 10) AND (user_id OPERATOR(pg_catalog.<=) 70) AND (value_1 OPERATOR(pg_catalog.>) 15) AND (value_1 OPERATOR(pg_catalog.<) 17)) GROUP BY user_id HAVING (count(*) OPERATOR(pg_catalog.>) 1) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, int4(avg(array_length(events_table, 1))) AS value_1_agg, int4(count_pay) AS value_2_agg FROM (SELECT subquery_1.user_id, array_agg(subquery_1.event ORDER BY subquery_1."time") AS events_table, COALESCE(subquery_2.count_pay, (0)::bigint) AS count_pay FROM ((SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone)) subquery_1 LEFT JOIN (SELECT intermediate_result.user_id, intermediate_result.count_pay FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, count_pay bigint)) subquery_2 ON ((subquery_1.user_id OPERATOR(pg_catalog.>) subquery_2.user_id))) GROUP BY subquery_1.user_id, subquery_2.count_pay) subquery_top WHERE (array_ndims(events_table) OPERATOR(pg_catalog.>) 0) GROUP BY count_pay, user_id ORDER BY count_pay +DEBUG: Collecting INSERT ... SELECT results on coordinator + user_id | value_1_agg | value_2_agg +--------------------------------------------------------------------- +(0 rows) + RESET client_min_messages; --------------------------------------------------------------------- --------------------------------------------------------------------- diff --git a/src/test/regress/expected/with_basics.out b/src/test/regress/expected/with_basics.out index 0d0070fd6..d4b83632f 100644 --- a/src/test/regress/expected/with_basics.out +++ b/src/test/regress/expected/with_basics.out @@ -840,8 +840,18 @@ LEFT JOIN WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed ON distinct_undistribured.user_id = exsist_in_distributed.user_id ORDER BY 2 DESC, 1 DESC; -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + user_id | user_id +--------------------------------------------------------------------- + 7 | + 6 | + 4 | + 2 | + 8 | 8 + 5 | 5 + 3 | 3 + 1 | 1 +(8 rows) + -- similar query as the above, but this time -- use NOT EXITS, which is pretty common struct WITH distinct_undistribured AS @@ -856,8 +866,18 @@ LEFT JOIN (SELECT NULL FROM distinct_undistribured WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed ON distinct_undistribured.user_id = exsist_in_distributed.user_id; -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + user_id | user_id +--------------------------------------------------------------------- + 2 | + 4 | + 7 | + 3 | + 6 | + 5 | + 1 | + 8 | +(8 rows) + -- same NOT EXISTS struct, but with CTE -- so should work WITH distinct_undistribured AS ( @@ -905,8 +925,11 @@ LEFT JOIN (SELECT NULL FROM distinct_undistribured d2 WHERE d1.user_id = d2.user_id )) AS bar USING (user_id); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 8 +(1 row) + -- should work fine with materialized ctes WITH distinct_undistribured AS MATERIALIZED ( SELECT DISTINCT user_id @@ -951,8 +974,18 @@ LEFT JOIN WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed ON distinct_undistribured.user_id = exsist_in_distributed.user_id ORDER BY 2 DESC, 1 DESC; -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + user_id | user_id +--------------------------------------------------------------------- + 7 | + 6 | + 4 | + 2 | + 8 | 8 + 5 | 5 + 3 | 3 + 1 | 1 +(8 rows) + WITH distinct_undistribured AS MATERIALIZED (SELECT DISTINCT user_id FROM test_cte) @@ -965,8 +998,18 @@ LEFT JOIN (SELECT NULL FROM distinct_undistribured WHERE distinct_undistribured.user_id = test_cte_distributed.user_id)) exsist_in_distributed ON distinct_undistribured.user_id = exsist_in_distributed.user_id; -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + user_id | user_id +--------------------------------------------------------------------- + 2 | + 4 | + 7 | + 3 | + 6 | + 5 | + 1 | + 8 | +(8 rows) + -- NOT EXISTS struct, with cte inlining disabled WITH distinct_undistribured AS MATERIALIZED( SELECT DISTINCT user_id @@ -1013,8 +1056,11 @@ LEFT JOIN (SELECT NULL FROM distinct_undistribured d2 WHERE d1.user_id = d2.user_id )) AS bar USING (user_id); -ERROR: cannot pushdown the subquery -DETAIL: Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table + count +--------------------------------------------------------------------- + 8 +(1 row) + -- some test with failures WITH a AS MATERIALIZED (SELECT * FROM users_table LIMIT 10) SELECT user_id/0 FROM users_table JOIN a USING (user_id); diff --git a/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql b/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql index 399883ed9..b4654144b 100644 --- a/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql +++ b/src/test/regress/sql/multi_insert_select_non_pushable_queries.sql @@ -112,7 +112,7 @@ FROM ( ) t GROUP BY user_id, hasdone_event; -- the LEFT JOIN conditon is not on the partition column (i.e., is it part_key divided by 2) --- still, recursive planning will kick in to plan some part of the query +-- but, we can plan the query thanks to recursive planning SET client_min_messages TO DEBUG1; INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg ) @@ -153,7 +153,8 @@ FROM ( ) t2 ON (t1.user_id = (t2.user_id)/2) GROUP BY t1.user_id, hasdone_event -) t GROUP BY user_id, hasdone_event; +) t GROUP BY user_id, hasdone_event +RETURNING user_id, value_1_agg, value_2_agg; RESET client_min_messages; ------------------------------------ @@ -232,7 +233,7 @@ ORDER BY -- not pushable since the JOIN condition is not equi JOIN -- (subquery_1 JOIN subquery_2) --- still, recursive planning will kick in +-- but, we can plan the query thanks to recursive planning SET client_min_messages TO DEBUG1; INSERT INTO agg_results_third (user_id, value_1_agg, value_2_agg) SELECT @@ -298,7 +299,8 @@ WHERE GROUP BY count_pay, user_id ORDER BY - count_pay; + count_pay +RETURNING user_id, value_1_agg, value_2_agg; RESET client_min_messages; ------------------------------------