fix multi level recursive plan (#6650)

Recursive planner should handle all the tree from bottom to top at
single pass. i.e. It should have already recursively planned all
required parts in its first pass. Otherwise, this means we have bug at
recursive planner, which needs to be handled. We add a check here and
return error.

DESCRIPTION: Fixes wrong results by throwing error in case recursive
planner multipass the query.

We found 3 different cases which causes recursive planner passes the
query multiple times.
1. Sublink in WHERE clause is planned at second pass after we
recursively planned a distributed table at the first pass. Fixed by PR
#6657.
2. Local-distributed joins are recursively planned at both the first and
the second pass. Issue #6659.
3. Some parts of the query is considered to be noncolocated at the
second pass as we do not generate attribute equivalances between
nondistributed and distributed tables. Issue #6653
pull/6673/head
aykut-bozkurt 2023-01-27 21:25:04 +03:00 committed by GitHub
parent 0903091343
commit ab71cd01ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 579 additions and 41 deletions

View File

@ -749,8 +749,11 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
hasUnresolvedParams = true;
}
bool allowRecursivePlanning = true;
DistributedPlan *distributedPlan =
CreateDistributedPlan(planId, planContext->originalQuery, planContext->query,
CreateDistributedPlan(planId, allowRecursivePlanning,
planContext->originalQuery,
planContext->query,
planContext->boundParams,
hasUnresolvedParams,
planContext->plannerRestrictionContext);
@ -921,8 +924,8 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
* 3. Logical planner
*/
DistributedPlan *
CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo
boundParams, bool hasUnresolvedParams,
CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *originalQuery,
Query *query, ParamListInfo boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = NULL;
@ -1060,6 +1063,21 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
*/
if (list_length(subPlanList) > 0 || hasCtes)
{
/*
* recursive planner should handle all the tree from bottom to
* top at single pass. i.e. It should have already recursively planned all
* required parts in its first pass. Hence, we expect allowRecursivePlanning
* to be true. Otherwise, this means we have bug at recursive planner,
* which needs to be handled. We add a check here and return error.
*/
if (!allowRecursivePlanning)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("recursive complex joins are only supported "
"when all distributed tables are co-located and "
"joined on their distribution columns")));
}
Query *newQuery = copyObject(originalQuery);
bool setPartitionedTablesInherited = false;
PlannerRestrictionContext *currentPlannerRestrictionContext =
@ -1088,8 +1106,14 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
/* overwrite the old transformed query with the new transformed query */
*query = *newQuery;
/* recurse into CreateDistributedPlan with subqueries/CTEs replaced */
distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false,
/*
* recurse into CreateDistributedPlan with subqueries/CTEs replaced.
* We only allow recursive planning once, which should have already done all
* the necessary transformations. So, we do not allow recursive planning once again.
*/
allowRecursivePlanning = false;
distributedPlan = CreateDistributedPlan(planId, allowRecursivePlanning,
originalQuery, query, NULL, false,
plannerRestrictionContext);
/* distributedPlan cannot be null since hasUnresolvedParams argument was false */

View File

@ -384,7 +384,9 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Query *selectQuery = selectRte->subquery;
DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
bool allowRecursivePlanning = true;
DistributedPlan *distPlan = CreateDistributedPlan(planId, allowRecursivePlanning,
selectQuery,
copyObject(selectQuery),
boundParams, hasUnresolvedParams,
plannerRestrictionContext);

View File

@ -1550,7 +1550,21 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass *
Assert(rangeTableEntry->rtekind == RTE_RELATION);
/* we don't need reference tables in the equality on columns */
/*
* we only calculate the equivalence of distributed tables.
* This leads to certain shortcomings in the query planning when reference
* tables and/or intermediate results are involved in the query. For example,
* the following query patterns could actually be pushed-down in a single iteration
* "(intermediate_res INNER JOIN dist dist1) INNER JOIN dist dist2 " or
* "(ref INNER JOIN dist dist1) JOIN dist dist2"
*
* However, if there are no explicit join conditions between distributed tables,
* the planner cannot deduce the equivalence between the distributed tables.
*
* Instead, we should be able to track all the equivalences between range table
* entries, and expand distributed table equivalences that happens via
* reference table/intermediate results
*/
if (relationPartitionKey == NULL)
{
return;

View File

@ -246,10 +246,12 @@ extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query);
extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
Query *query, ParamListInfo
boundParams, bool
hasUnresolvedParams,
extern struct DistributedPlan * CreateDistributedPlan(uint64 planId,
bool allowRecursivePlanning,
Query *originalQuery,
Query *query,
ParamListInfo boundParams,
bool hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);

View File

@ -497,6 +497,30 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
100
(1 row)
-- prefer-distributed option causes recursive planner passes the query 2 times and errors out
-- planner recursively plans one of the distributed_table in its first pass. Then, at its second
-- pass, it also recursively plans other distributed_table as modification at first step caused it.
SET citus.local_table_join_policy TO 'prefer-distributed';
SELECT
COUNT(*)
FROM
postgres_table
JOIN
distributed_table
USING
(key)
JOIN
(SELECT key, NULL, NULL FROM distributed_table) foo
USING
(key);
DEBUG: Wrapping relation "distributed_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM citus_local_dist_joins.distributed_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((citus_local_dist_joins.postgres_table JOIN (SELECT distributed_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_1) distributed_table USING (key)) JOIN (SELECT distributed_table_1.key, NULL::text, NULL::text FROM citus_local_dist_joins.distributed_table distributed_table_1) foo(key, "?column?", "?column?_1") USING (key))
DEBUG: Wrapping relation "postgres_table" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT key FROM citus_local_dist_joins.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT postgres_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) postgres_table_1) postgres_table JOIN (SELECT distributed_table_1.key, NULL::text AS value, NULL::jsonb AS value_2 FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_table_1) distributed_table USING (key)) JOIN (SELECT distributed_table_1.key, NULL::text, NULL::text FROM citus_local_dist_joins.distributed_table distributed_table_1) foo(key, "?column?", "?column?_1") USING (key))
ERROR: recursive complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
RESET citus.local_table_join_policy;
SET client_min_messages to ERROR;
DROP TABLE citus_local;
SELECT master_remove_node('localhost', :master_port);

View File

@ -799,23 +799,6 @@ SELECT distributed.name, distributed.name, local.title, local.title FROM local
0 | 0 | 0 | 0
(1 row)
SELECT
COUNT(*)
FROM
local
JOIN
distributed
USING
(id)
JOIN
(SELECT id, NULL, NULL FROM distributed) foo
USING
(id);
count
---------------------------------------------------------------------
101
(1 row)
BEGIN;
SELECT COUNT(DISTINCT title) FROM local;
count

View File

@ -0,0 +1,307 @@
-- multi recursive queries with joins, subqueries, and ctes
CREATE SCHEMA multi_recursive;
SET search_path TO multi_recursive;
DROP TABLE IF EXISTS tbl_dist1;
NOTICE: table "tbl_dist1" does not exist, skipping
CREATE TABLE tbl_dist1(id int);
SELECT create_distributed_table('tbl_dist1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE IF EXISTS tbl_ref1;
NOTICE: table "tbl_ref1" does not exist, skipping
CREATE TABLE tbl_ref1(id int);
SELECT create_reference_table('tbl_ref1');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO tbl_dist1 SELECT i FROM generate_series(0,10) i;
INSERT INTO tbl_ref1 SELECT i FROM generate_series(0,10) i;
-- https://github.com/citusdata/citus/issues/6653
-- The reason why inlined queries failed are all the same. After we modified the query at first pass, second pass finds out
-- noncolocated queries as we donot create equivalances between nondistributed-distributed tables.
-- QUERY1
-- recursive planner multipass the query and fails.
-- Why inlined query failed?
-- limit clause is recursively planned in inlined cte. First pass finishes here. At second pass, noncolocated queries and
-- recurring full join are recursively planned. We detect that and throw error.
SELECT t1.id
FROM (
SELECT t2.id
FROM (
SELECT t0.id
FROM tbl_dist1 t0
LIMIT 5
) AS t2
INNER JOIN tbl_dist1 AS t3 USING (id)
) AS t1
FULL JOIN tbl_dist1 t4 USING (id);
ERROR: recursive complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- QUERY2
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- recurring left join is recursively planned in inlined cte. Then, limit clause causes another recursive planning. First pass
-- finishes here. At second pass, noncolocated queries and recurring right join are recursively planned. We detect that and
-- throw error.
SET client_min_messages TO DEBUG1;
WITH cte_0 AS (
SELECT id FROM tbl_dist1 WHERE id IN (
SELECT id FROM tbl_ref1
LEFT JOIN tbl_dist1 USING (id)
)
)
SELECT count(id) FROM tbl_dist1
RIGHT JOIN (
SELECT table_5.id FROM (
SELECT id FROM cte_0 LIMIT 0
) AS table_5
RIGHT JOIN tbl_dist1 USING (id)
) AS table_4 USING (id);
DEBUG: CTE cte_0 is going to be inlined via distributed planning
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "tbl_dist1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "tbl_dist1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM multi_recursive.tbl_dist1 WHERE true
DEBUG: push down of limit count: 0
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM (SELECT tbl_dist1.id FROM multi_recursive.tbl_dist1 WHERE (tbl_dist1.id OPERATOR(pg_catalog.=) ANY (SELECT tbl_ref1.id FROM (multi_recursive.tbl_ref1 LEFT JOIN (SELECT tbl_dist1_2.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) tbl_dist1_2) tbl_dist1_1 USING (id))))) cte_0 LIMIT 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(table_4.id) AS count FROM (multi_recursive.tbl_dist1 RIGHT JOIN (SELECT table_5.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_5 RIGHT JOIN multi_recursive.tbl_dist1 tbl_dist1_1 USING (id))) table_4 USING (id))
DEBUG: generating subplan XXX_1 for subquery SELECT table_5.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_5 RIGHT JOIN multi_recursive.tbl_dist1 USING (id))
DEBUG: recursively planning left side of the right join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "tbl_dist1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "tbl_dist1" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM multi_recursive.tbl_dist1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(table_4.id) AS count FROM ((SELECT tbl_dist1_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) tbl_dist1_1) tbl_dist1 RIGHT JOIN (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_4 USING (id))
DEBUG: generating subplan XXX_1 for CTE cte_0: SELECT id FROM multi_recursive.tbl_dist1 WHERE (id OPERATOR(pg_catalog.=) ANY (SELECT tbl_ref1.id FROM (multi_recursive.tbl_ref1 LEFT JOIN multi_recursive.tbl_dist1 tbl_dist1_1 USING (id))))
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "tbl_dist1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "tbl_dist1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM multi_recursive.tbl_dist1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id FROM multi_recursive.tbl_dist1 WHERE (id OPERATOR(pg_catalog.=) ANY (SELECT tbl_ref1.id FROM (multi_recursive.tbl_ref1 LEFT JOIN (SELECT tbl_dist1_2.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) tbl_dist1_2) tbl_dist1_1 USING (id))))
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) cte_0 LIMIT 0
DEBUG: generating subplan XXX_3 for subquery SELECT table_5.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_5 RIGHT JOIN multi_recursive.tbl_dist1 USING (id))
DEBUG: recursively planning left side of the right join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "tbl_dist1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "tbl_dist1" to a subquery
DEBUG: generating subplan XXX_4 for subquery SELECT id FROM multi_recursive.tbl_dist1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(table_4.id) AS count FROM ((SELECT tbl_dist1_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) tbl_dist1_1) tbl_dist1 RIGHT JOIN (SELECT intermediate_result.id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_4 USING (id))
count
---------------------------------------------------------------------
0
(1 row)
RESET client_min_messages;
DROP TABLE IF EXISTS dist0;
NOTICE: table "dist0" does not exist, skipping
CREATE TABLE dist0(id int);
SELECT create_distributed_table('dist0','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE IF EXISTS dist1;
NOTICE: table "dist1" does not exist, skipping
CREATE TABLE dist1(id int);
SELECT create_distributed_table('dist1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist0 SELECT i FROM generate_series(1005,1025) i;
INSERT INTO dist1 SELECT i FROM generate_series(1015,1035) i;
-- QUERY3
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- noncolocated queries are recursively planned. First pass finishes here. Second pass also recursively plans noncolocated
-- queries and recurring full join. We detect the error and throw it.
SET client_min_messages TO DEBUG1;
WITH cte_0 AS (
SELECT id FROM dist0
RIGHT JOIN dist0 AS table_1 USING (id)
ORDER BY id
)
SELECT avg(avgsub.id) FROM (
SELECT table_2.id FROM (
SELECT table_3.id FROM (
SELECT table_5.id FROM cte_0 AS table_5, dist1
) AS table_3 INNER JOIN dist1 USING (id)
) AS table_2 FULL JOIN dist0 USING (id)
) AS avgsub;
DEBUG: CTE cte_0 is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for subquery SELECT table_1.id FROM (multi_recursive.dist0 RIGHT JOIN multi_recursive.dist0 table_1 USING (id)) ORDER BY table_1.id
DEBUG: generating subplan XXX_2 for subquery SELECT table_5.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_5, multi_recursive.dist1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_2.id FROM ((SELECT table_3.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_3 JOIN multi_recursive.dist1 USING (id))) table_2 FULL JOIN multi_recursive.dist0 USING (id))) avgsub
DEBUG: generating subplan XXX_1 for subquery SELECT table_3.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_3 JOIN multi_recursive.dist1 USING (id))
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist0" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist0" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM multi_recursive.dist0 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_2.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_2 FULL JOIN (SELECT dist0_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) dist0_1) dist0 USING (id))) avgsub
DEBUG: generating subplan XXX_1 for CTE cte_0: SELECT table_1.id FROM (multi_recursive.dist0 RIGHT JOIN multi_recursive.dist0 table_1 USING (id)) ORDER BY table_1.id
DEBUG: generating subplan XXX_1 for subquery SELECT table_5.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_5, multi_recursive.dist1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table_3.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_3 JOIN multi_recursive.dist1 USING (id))
DEBUG: generating subplan XXX_2 for subquery SELECT table_3.id FROM ((SELECT table_5.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_5, multi_recursive.dist1 dist1_1) table_3 JOIN multi_recursive.dist1 USING (id))
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist0" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist0" to a subquery
DEBUG: generating subplan XXX_3 for subquery SELECT id FROM multi_recursive.dist0 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_2.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_2 FULL JOIN (SELECT dist0_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) dist0_1) dist0 USING (id))) avgsub
avg
---------------------------------------------------------------------
1020.0000000000000000
(1 row)
RESET client_min_messages;
DROP TABLE IF EXISTS dist0;
CREATE TABLE dist0(id int);
SELECT create_distributed_table('dist0','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE IF EXISTS dist1;
CREATE TABLE dist1(id int);
SELECT create_distributed_table('dist1','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist0 SELECT i FROM generate_series(0,10) i;
INSERT INTO dist0 SELECT * FROM dist0 ORDER BY id LIMIT 1;
INSERT INTO dist1 SELECT i FROM generate_series(0,10) i;
INSERT INTO dist1 SELECT * FROM dist1 ORDER BY id LIMIT 1;
-- QUERY4
-- recursive planner multipass the query fails.
-- Why inlined query failed?
-- limit clause is recursively planned at the first pass. At second pass noncolocated queries are recursively planned.
-- We detect that and throw error.
SET client_min_messages TO DEBUG1;
SELECT avg(avgsub.id) FROM (
SELECT table_0.id FROM (
SELECT table_1.id FROM (
SELECT table_2.id FROM (
SELECT table_3.id FROM (
SELECT table_4.id FROM dist0 AS table_4
LEFT JOIN dist1 AS table_5 USING (id)
) AS table_3 INNER JOIN dist0 AS table_6 USING (id)
) AS table_2 WHERE table_2.id < 10 ORDER BY id LIMIT 47
) AS table_1 RIGHT JOIN dist0 AS table_7 USING (id)
) AS table_0 RIGHT JOIN dist1 AS table_8 USING (id)
) AS avgsub;
DEBUG: push down of limit count: 47
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM (SELECT table_3.id FROM ((SELECT table_4.id FROM (multi_recursive.dist0 table_4 LEFT JOIN multi_recursive.dist1 table_5 USING (id))) table_3 JOIN multi_recursive.dist0 table_6 USING (id))) table_2 WHERE (id OPERATOR(pg_catalog.<) 10) ORDER BY id LIMIT 47
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_0.id FROM ((SELECT table_1.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_1 RIGHT JOIN multi_recursive.dist0 table_7 USING (id))) table_0 RIGHT JOIN multi_recursive.dist1 table_8 USING (id))) avgsub
DEBUG: generating subplan XXX_1 for subquery SELECT table_1.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_1 RIGHT JOIN multi_recursive.dist0 table_7 USING (id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_0.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_0 RIGHT JOIN multi_recursive.dist1 table_8 USING (id))) avgsub
ERROR: recursive complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
-- QUERY5
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- limit clause is recursively planned. First pass finishes here. At second pass, noncolocated tables and recurring full join
-- are recursively planned. We detect that and throw error.
WITH cte_0 AS (
SELECT table_0.id FROM dist1 AS table_0 LEFT JOIN dist1 AS table_1 USING (id) ORDER BY id LIMIT 41
)
SELECT avg(avgsub.id) FROM (
SELECT table_4.id FROM (
SELECT table_5.id FROM (
SELECT table_6.id FROM cte_0 AS table_6
) AS table_5
INNER JOIN dist0 USING (id) INNER JOIN dist1 AS table_9 USING (id)
) AS table_4 FULL JOIN dist0 USING (id)
) AS avgsub;
DEBUG: CTE cte_0 is going to be inlined via distributed planning
DEBUG: push down of limit count: 41
DEBUG: generating subplan XXX_1 for subquery SELECT table_0.id FROM (multi_recursive.dist1 table_0 LEFT JOIN multi_recursive.dist1 table_1 USING (id)) ORDER BY table_0.id LIMIT 41
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_4.id FROM ((SELECT table_5.id FROM (((SELECT table_6.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6) table_5 JOIN multi_recursive.dist0 dist0_1 USING (id)) JOIN multi_recursive.dist1 table_9 USING (id))) table_4 FULL JOIN multi_recursive.dist0 USING (id))) avgsub
DEBUG: generating subplan XXX_1 for subquery SELECT table_5.id FROM (((SELECT table_6.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6) table_5 JOIN multi_recursive.dist0 USING (id)) JOIN multi_recursive.dist1 table_9 USING (id))
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist0" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist0" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM multi_recursive.dist0 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_4.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_4 FULL JOIN (SELECT dist0_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) dist0_1) dist0 USING (id))) avgsub
DEBUG: generating subplan XXX_1 for CTE cte_0: SELECT table_0.id FROM (multi_recursive.dist1 table_0 LEFT JOIN multi_recursive.dist1 table_1 USING (id)) ORDER BY table_0.id LIMIT 41
DEBUG: push down of limit count: 41
DEBUG: generating subplan XXX_2 for subquery SELECT table_5.id FROM (((SELECT table_6.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6) table_5 JOIN multi_recursive.dist0 USING (id)) JOIN multi_recursive.dist1 table_9 USING (id))
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist0" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist0" to a subquery
DEBUG: generating subplan XXX_3 for subquery SELECT id FROM multi_recursive.dist0 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_4.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_4 FULL JOIN (SELECT dist0_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) dist0_1) dist0 USING (id))) avgsub
avg
---------------------------------------------------------------------
1.3095238095238095
(1 row)
-- QUERY6
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- Same query and flow as above with explicit (NOT MATERIALIZED) option, which makes it directly inlinable. Even if
-- planner fails with inlined query, it succeeds without inlining.
WITH cte_0 AS (
SELECT table_0.id FROM dist1 AS table_0 LEFT JOIN dist1 AS table_1 USING (id) ORDER BY id LIMIT 41
)
SELECT avg(avgsub.id) FROM (
SELECT table_4.id FROM (
SELECT table_5.id FROM (
SELECT table_6.id FROM cte_0 AS table_6
) AS table_5
INNER JOIN dist0 USING (id) INNER JOIN dist1 AS table_9 USING (id)
) AS table_4 FULL JOIN dist0 USING (id)
) AS avgsub;
DEBUG: CTE cte_0 is going to be inlined via distributed planning
DEBUG: push down of limit count: 41
DEBUG: generating subplan XXX_1 for subquery SELECT table_0.id FROM (multi_recursive.dist1 table_0 LEFT JOIN multi_recursive.dist1 table_1 USING (id)) ORDER BY table_0.id LIMIT 41
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_4.id FROM ((SELECT table_5.id FROM (((SELECT table_6.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6) table_5 JOIN multi_recursive.dist0 dist0_1 USING (id)) JOIN multi_recursive.dist1 table_9 USING (id))) table_4 FULL JOIN multi_recursive.dist0 USING (id))) avgsub
DEBUG: generating subplan XXX_1 for subquery SELECT table_5.id FROM (((SELECT table_6.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6) table_5 JOIN multi_recursive.dist0 USING (id)) JOIN multi_recursive.dist1 table_9 USING (id))
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist0" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist0" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM multi_recursive.dist0 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_4.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_4 FULL JOIN (SELECT dist0_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) dist0_1) dist0 USING (id))) avgsub
DEBUG: generating subplan XXX_1 for CTE cte_0: SELECT table_0.id FROM (multi_recursive.dist1 table_0 LEFT JOIN multi_recursive.dist1 table_1 USING (id)) ORDER BY table_0.id LIMIT 41
DEBUG: push down of limit count: 41
DEBUG: generating subplan XXX_2 for subquery SELECT table_5.id FROM (((SELECT table_6.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6) table_5 JOIN multi_recursive.dist0 USING (id)) JOIN multi_recursive.dist1 table_9 USING (id))
DEBUG: recursively planning right side of the full join since the other side is a recurring rel
DEBUG: recursively planning distributed relation "dist0" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "dist0" to a subquery
DEBUG: generating subplan XXX_3 for subquery SELECT id FROM multi_recursive.dist0 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(id) AS avg FROM (SELECT table_4.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_4 FULL JOIN (SELECT dist0_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) dist0_1) dist0 USING (id))) avgsub
avg
---------------------------------------------------------------------
1.3095238095238095
(1 row)
-- QUERY7
-- recursive planner multipass the query and fails. Note that cte is not used in the query.
-- Why inlined query failed?
-- limit clause is recursively planned. First pass finishes here. At second pass noncolocated queries are recursively planned.
-- We detect multipass and throw error.
WITH cte_0 AS (
SELECT table_0.id FROM dist1 AS table_0 FULL JOIN dist1 AS table_1 USING (id)
)
SELECT avg(table_5.id) FROM (
SELECT table_6.id FROM (
SELECT table_7.id FROM dist0 AS table_7 ORDER BY id LIMIT 87
) AS table_6 INNER JOIN dist0 AS table_8 USING (id) WHERE table_8.id < 0 ORDER BY id
) AS table_5 INNER JOIN dist0 AS table_9 USING (id);
DEBUG: push down of limit count: 87
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM multi_recursive.dist0 table_7 ORDER BY id LIMIT 87
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(table_5.id) AS avg FROM ((SELECT table_6.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6 JOIN multi_recursive.dist0 table_8 USING (id)) WHERE (table_8.id OPERATOR(pg_catalog.<) 0) ORDER BY table_6.id) table_5 JOIN multi_recursive.dist0 table_9 USING (id))
DEBUG: generating subplan XXX_1 for subquery SELECT table_6.id FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_6 JOIN multi_recursive.dist0 table_8 USING (id)) WHERE (table_8.id OPERATOR(pg_catalog.<) 0) ORDER BY table_6.id
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(table_5.id) AS avg FROM ((SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) table_5 JOIN multi_recursive.dist0 table_9 USING (id))
ERROR: recursive complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
RESET client_min_messages;
DROP SCHEMA multi_recursive CASCADE;
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table tbl_dist1
drop cascades to table tbl_ref1
drop cascades to table dist0
drop cascades to table dist1

View File

@ -54,7 +54,7 @@ test: subqueries_deep subquery_view subquery_partitioning subqueries_not_support
test: subquery_in_targetlist subquery_in_where subquery_complex_target_list subquery_append
test: subquery_prepared_statements
test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins
test: cte_inline recursive_view_local_table values sequences_with_different_types
test: cte_inline recursive_view_local_table values sequences_with_different_types multi_level_recursive_queries
test: pg13 pg12
# run pg14 sequentially as it syncs metadata
test: pg14

View File

@ -250,6 +250,26 @@ JOIN
citus_local c2
USING (key);
-- prefer-distributed option causes recursive planner passes the query 2 times and errors out
-- planner recursively plans one of the distributed_table in its first pass. Then, at its second
-- pass, it also recursively plans other distributed_table as modification at first step caused it.
SET citus.local_table_join_policy TO 'prefer-distributed';
SELECT
COUNT(*)
FROM
postgres_table
JOIN
distributed_table
USING
(key)
JOIN
(SELECT key, NULL, NULL FROM distributed_table) foo
USING
(key);
RESET citus.local_table_join_policy;
SET client_min_messages to ERROR;
DROP TABLE citus_local;

View File

@ -289,18 +289,6 @@ ORDER BY 1;
SELECT local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2 LIMIt 1;
SELECT NULL FROM local JOIN distributed USING(id) ORDER BY 1 LIMIt 1;
SELECT distributed.name, distributed.name, local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2,3,4 LIMIT 1;
SELECT
COUNT(*)
FROM
local
JOIN
distributed
USING
(id)
JOIN
(SELECT id, NULL, NULL FROM distributed) foo
USING
(id);
BEGIN;
SELECT COUNT(DISTINCT title) FROM local;

View File

@ -0,0 +1,174 @@
-- multi recursive queries with joins, subqueries, and ctes
CREATE SCHEMA multi_recursive;
SET search_path TO multi_recursive;
DROP TABLE IF EXISTS tbl_dist1;
CREATE TABLE tbl_dist1(id int);
SELECT create_distributed_table('tbl_dist1','id');
DROP TABLE IF EXISTS tbl_ref1;
CREATE TABLE tbl_ref1(id int);
SELECT create_reference_table('tbl_ref1');
INSERT INTO tbl_dist1 SELECT i FROM generate_series(0,10) i;
INSERT INTO tbl_ref1 SELECT i FROM generate_series(0,10) i;
-- https://github.com/citusdata/citus/issues/6653
-- The reason why inlined queries failed are all the same. After we modified the query at first pass, second pass finds out
-- noncolocated queries as we donot create equivalances between nondistributed-distributed tables.
-- QUERY1
-- recursive planner multipass the query and fails.
-- Why inlined query failed?
-- limit clause is recursively planned in inlined cte. First pass finishes here. At second pass, noncolocated queries and
-- recurring full join are recursively planned. We detect that and throw error.
SELECT t1.id
FROM (
SELECT t2.id
FROM (
SELECT t0.id
FROM tbl_dist1 t0
LIMIT 5
) AS t2
INNER JOIN tbl_dist1 AS t3 USING (id)
) AS t1
FULL JOIN tbl_dist1 t4 USING (id);
-- QUERY2
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- recurring left join is recursively planned in inlined cte. Then, limit clause causes another recursive planning. First pass
-- finishes here. At second pass, noncolocated queries and recurring right join are recursively planned. We detect that and
-- throw error.
SET client_min_messages TO DEBUG1;
WITH cte_0 AS (
SELECT id FROM tbl_dist1 WHERE id IN (
SELECT id FROM tbl_ref1
LEFT JOIN tbl_dist1 USING (id)
)
)
SELECT count(id) FROM tbl_dist1
RIGHT JOIN (
SELECT table_5.id FROM (
SELECT id FROM cte_0 LIMIT 0
) AS table_5
RIGHT JOIN tbl_dist1 USING (id)
) AS table_4 USING (id);
RESET client_min_messages;
DROP TABLE IF EXISTS dist0;
CREATE TABLE dist0(id int);
SELECT create_distributed_table('dist0','id');
DROP TABLE IF EXISTS dist1;
CREATE TABLE dist1(id int);
SELECT create_distributed_table('dist1','id');
INSERT INTO dist0 SELECT i FROM generate_series(1005,1025) i;
INSERT INTO dist1 SELECT i FROM generate_series(1015,1035) i;
-- QUERY3
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- noncolocated queries are recursively planned. First pass finishes here. Second pass also recursively plans noncolocated
-- queries and recurring full join. We detect the error and throw it.
SET client_min_messages TO DEBUG1;
WITH cte_0 AS (
SELECT id FROM dist0
RIGHT JOIN dist0 AS table_1 USING (id)
ORDER BY id
)
SELECT avg(avgsub.id) FROM (
SELECT table_2.id FROM (
SELECT table_3.id FROM (
SELECT table_5.id FROM cte_0 AS table_5, dist1
) AS table_3 INNER JOIN dist1 USING (id)
) AS table_2 FULL JOIN dist0 USING (id)
) AS avgsub;
RESET client_min_messages;
DROP TABLE IF EXISTS dist0;
CREATE TABLE dist0(id int);
SELECT create_distributed_table('dist0','id');
DROP TABLE IF EXISTS dist1;
CREATE TABLE dist1(id int);
SELECT create_distributed_table('dist1','id');
INSERT INTO dist0 SELECT i FROM generate_series(0,10) i;
INSERT INTO dist0 SELECT * FROM dist0 ORDER BY id LIMIT 1;
INSERT INTO dist1 SELECT i FROM generate_series(0,10) i;
INSERT INTO dist1 SELECT * FROM dist1 ORDER BY id LIMIT 1;
-- QUERY4
-- recursive planner multipass the query fails.
-- Why inlined query failed?
-- limit clause is recursively planned at the first pass. At second pass noncolocated queries are recursively planned.
-- We detect that and throw error.
SET client_min_messages TO DEBUG1;
SELECT avg(avgsub.id) FROM (
SELECT table_0.id FROM (
SELECT table_1.id FROM (
SELECT table_2.id FROM (
SELECT table_3.id FROM (
SELECT table_4.id FROM dist0 AS table_4
LEFT JOIN dist1 AS table_5 USING (id)
) AS table_3 INNER JOIN dist0 AS table_6 USING (id)
) AS table_2 WHERE table_2.id < 10 ORDER BY id LIMIT 47
) AS table_1 RIGHT JOIN dist0 AS table_7 USING (id)
) AS table_0 RIGHT JOIN dist1 AS table_8 USING (id)
) AS avgsub;
-- QUERY5
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- limit clause is recursively planned. First pass finishes here. At second pass, noncolocated tables and recurring full join
-- are recursively planned. We detect that and throw error.
WITH cte_0 AS (
SELECT table_0.id FROM dist1 AS table_0 LEFT JOIN dist1 AS table_1 USING (id) ORDER BY id LIMIT 41
)
SELECT avg(avgsub.id) FROM (
SELECT table_4.id FROM (
SELECT table_5.id FROM (
SELECT table_6.id FROM cte_0 AS table_6
) AS table_5
INNER JOIN dist0 USING (id) INNER JOIN dist1 AS table_9 USING (id)
) AS table_4 FULL JOIN dist0 USING (id)
) AS avgsub;
-- QUERY6
-- recursive planner multipass the query with inlined cte and fails. Then, cte is planned without inlining and it succeeds.
-- Why inlined query failed?
-- Same query and flow as above with explicit (NOT MATERIALIZED) option, which makes it directly inlinable. Even if
-- planner fails with inlined query, it succeeds without inlining.
WITH cte_0 AS (
SELECT table_0.id FROM dist1 AS table_0 LEFT JOIN dist1 AS table_1 USING (id) ORDER BY id LIMIT 41
)
SELECT avg(avgsub.id) FROM (
SELECT table_4.id FROM (
SELECT table_5.id FROM (
SELECT table_6.id FROM cte_0 AS table_6
) AS table_5
INNER JOIN dist0 USING (id) INNER JOIN dist1 AS table_9 USING (id)
) AS table_4 FULL JOIN dist0 USING (id)
) AS avgsub;
-- QUERY7
-- recursive planner multipass the query and fails. Note that cte is not used in the query.
-- Why inlined query failed?
-- limit clause is recursively planned. First pass finishes here. At second pass noncolocated queries are recursively planned.
-- We detect multipass and throw error.
WITH cte_0 AS (
SELECT table_0.id FROM dist1 AS table_0 FULL JOIN dist1 AS table_1 USING (id)
)
SELECT avg(table_5.id) FROM (
SELECT table_6.id FROM (
SELECT table_7.id FROM dist0 AS table_7 ORDER BY id LIMIT 87
) AS table_6 INNER JOIN dist0 AS table_8 USING (id) WHERE table_8.id < 0 ORDER BY id
) AS table_5 INNER JOIN dist0 AS table_9 USING (id);
RESET client_min_messages;
DROP SCHEMA multi_recursive CASCADE;