fix order of recursive sublink planning (#6657)

We should do the sublink conversations at the end of the recursive
planning because earlier steps might have transformed the query into a
shape that needs recursively planning the sublinks.

DESCRIPTION: Fixes early sublink check at recursive planner.

Related to PR https://github.com/citusdata/citus/pull/6650
pull/6640/head
aykut-bozkurt 2023-01-27 14:35:16 +03:00 committed by GitHub
parent 97dba0ac00
commit 8870f0f90b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 23 deletions

View File

@ -316,23 +316,6 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
RecursivelyPlanSetOperations(query, (Node *) query->setOperations, context);
}
/*
* If the FROM clause is recurring (does not contain a distributed table),
* then we cannot have any distributed tables appearing in subqueries in
* the SELECT and WHERE clauses.
*/
if (ShouldRecursivelyPlanSublinks(query))
{
/* replace all subqueries in the WHERE clause */
if (query->jointree && query->jointree->quals)
{
RecursivelyPlanAllSubqueries((Node *) query->jointree->quals, context);
}
/* replace all subqueries in the SELECT clause */
RecursivelyPlanAllSubqueries((Node *) query->targetList, context);
}
if (query->havingQual != NULL)
{
if (NodeContainsSubqueryReferencingOuterQuery(query->havingQual))
@ -379,6 +362,27 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
query, context);
}
/*
* If the FROM clause is recurring (does not contain a distributed table),
* then we cannot have any distributed tables appearing in subqueries in
* the SELECT and WHERE clauses.
*
* We do the sublink conversations at the end of the recursive planning
* because earlier steps might have transformed the query into a
* shape that needs recursively planning the sublinks.
*/
if (ShouldRecursivelyPlanSublinks(query))
{
/* replace all subqueries in the WHERE clause */
if (query->jointree && query->jointree->quals)
{
RecursivelyPlanAllSubqueries((Node *) query->jointree->quals, context);
}
/* replace all subqueries in the SELECT clause */
RecursivelyPlanAllSubqueries((Node *) query->targetList, context);
}
return NULL;
}

View File

@ -1463,9 +1463,8 @@ SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table using(key)
WHERE distributed_table_pkey.key IN (SELECT key FROM distributed_table_pkey WHERE key = 5);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_pkey_1.key FROM local_table_join.distributed_table_pkey distributed_table_pkey_1 WHERE (distributed_table_pkey_1.key OPERATOR(pg_catalog.=) 5)))
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))
DEBUG: generating subplan XXX_2 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))
count
---------------------------------------------------------------------
100
@ -1475,9 +1474,8 @@ SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table using(key)
WHERE distributed_table_pkey.key IN (SELECT key FROM distributed_table_pkey WHERE key = 5) AND distributed_table_pkey.key = 5;
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE ((distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_pkey_1.key FROM local_table_join.distributed_table_pkey distributed_table_pkey_1 WHERE (distributed_table_pkey_1.key OPERATOR(pg_catalog.=) 5))) AND (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5))
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE ((distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) AND (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5))
DEBUG: generating subplan XXX_2 for subquery SELECT key FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_pkey_1) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE ((distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) AND (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5))
count
---------------------------------------------------------------------
100

View File

@ -503,3 +503,82 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM
---------------------------------------------------------------------
(0 rows)
CREATE TABLE dist(id int, value int);
SELECT create_distributed_table('dist','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist SELECT i, i FROM generate_series(0,100) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CREATE TABLE ref(id int);
SELECT create_reference_table('ref');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO ref SELECT i FROM generate_series(50,150) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CREATE TABLE local(id int);
INSERT INTO local SELECT i FROM generate_series(50,150) i;
-- planner recursively plans local table in local-dist join and then the whole query is routed
SELECT COUNT(*) FROM dist JOIN local USING(id)
WHERE
dist.id IN (SELECT id FROM dist WHERE id = 55) AND
dist.id = 55 AND
dist.value IN (SELECT value FROM dist WHERE id = 55);
DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM public.local WHERE (id OPERATOR(pg_catalog.=) 55)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (public.dist JOIN (SELECT local_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) local_1) local USING (id)) WHERE ((dist.id OPERATOR(pg_catalog.=) ANY (SELECT dist_1.id FROM public.dist dist_1 WHERE (dist_1.id OPERATOR(pg_catalog.=) 55))) AND (dist.id OPERATOR(pg_catalog.=) 55) AND (dist.value OPERATOR(pg_catalog.=) ANY (SELECT dist_1.value FROM public.dist dist_1 WHERE (dist_1.id OPERATOR(pg_catalog.=) 55))))
count
---------------------------------------------------------------------
1
(1 row)
-- subquery in WHERE clause should be recursively planned after planner recursively plans recurring full join
SELECT COUNT(*) FROM ref FULL JOIN dist USING (id)
WHERE
dist.id IN (SELECT id FROM dist GROUP BY id);
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM public.dist WHERE true
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM public.dist GROUP BY id
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (public.ref FULL JOIN (SELECT dist_1.id, NULL::integer AS value FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) dist_1) dist USING (id)) WHERE (dist.id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)))
count
---------------------------------------------------------------------
101
(1 row)
-- subqueries in WHERE clause should be recursively planned after planner recursively plans full outer join
SELECT COUNT(*) FROM dist FULL JOIN ref USING(id)
WHERE
dist.id IN (SELECT id FROM dist WHERE id > 5) AND
dist.value IN (SELECT value FROM dist WHERE id > 15);
DEBUG: generating subplan XXX_1 for subquery SELECT value FROM public.dist WHERE (id OPERATOR(pg_catalog.>) 15)
DEBUG: recursively planning left side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id, value FROM public.dist WHERE true
DEBUG: generating subplan XXX_3 for subquery SELECT id FROM public.dist WHERE (id OPERATOR(pg_catalog.>) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT dist_1.id, dist_1.value FROM (SELECT intermediate_result.id, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer, value integer)) dist_1) dist FULL JOIN public.ref USING (id)) WHERE ((dist.id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id integer))) AND (dist.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value integer))))
count
---------------------------------------------------------------------
85
(1 row)
-- sublinks in the targetlist are not supported
SELECT (SELECT id FROM dist WHERE dist.id > d1.id GROUP BY id) FROM ref FULL JOIN dist d1 USING (id);
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist" "d1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist" "d1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM public.dist d1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (SELECT dist.id FROM public.dist WHERE (dist.id OPERATOR(pg_catalog.>) d1.id) GROUP BY dist.id) AS id FROM (public.ref FULL JOIN (SELECT d1_1.id, NULL::integer AS value FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) d1_1) d1 USING (id))
ERROR: correlated subqueries are not supported when the FROM clause contains a reference table
DROP TABLE dist;
DROP TABLE ref;
DROP TABLE local;

View File

@ -360,3 +360,37 @@ FROM
ORDER BY 1
LIMIT 5
) as foo WHERE user_id IN (SELECT count(*) FROM users_table GROUP BY user_id);
CREATE TABLE dist(id int, value int);
SELECT create_distributed_table('dist','id');
INSERT INTO dist SELECT i, i FROM generate_series(0,100) i;
CREATE TABLE ref(id int);
SELECT create_reference_table('ref');
INSERT INTO ref SELECT i FROM generate_series(50,150) i;
CREATE TABLE local(id int);
INSERT INTO local SELECT i FROM generate_series(50,150) i;
-- planner recursively plans local table in local-dist join and then the whole query is routed
SELECT COUNT(*) FROM dist JOIN local USING(id)
WHERE
dist.id IN (SELECT id FROM dist WHERE id = 55) AND
dist.id = 55 AND
dist.value IN (SELECT value FROM dist WHERE id = 55);
-- subquery in WHERE clause should be recursively planned after planner recursively plans recurring full join
SELECT COUNT(*) FROM ref FULL JOIN dist USING (id)
WHERE
dist.id IN (SELECT id FROM dist GROUP BY id);
-- subqueries in WHERE clause should be recursively planned after planner recursively plans full outer join
SELECT COUNT(*) FROM dist FULL JOIN ref USING(id)
WHERE
dist.id IN (SELECT id FROM dist WHERE id > 5) AND
dist.value IN (SELECT value FROM dist WHERE id > 15);
-- sublinks in the targetlist are not supported
SELECT (SELECT id FROM dist WHERE dist.id > d1.id GROUP BY id) FROM ref FULL JOIN dist d1 USING (id);
DROP TABLE dist;
DROP TABLE ref;
DROP TABLE local;