diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 1b8cd862b..abdae7854 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -320,8 +320,38 @@ DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int rtinde /* - * UpdateWhereClauseForOuterJoin walks over the query tree and appends quals - * to the WHERE clause to filter w.r.to the distribution column of the corresponding shard. + * UpdateWhereClauseForOuterJoin + * + * Inject shard interval predicates into the query WHERE clause for certain + * outer joins to make the join semantically correct when distributed. + * + * Why this is needed: + * When an inner side of an OUTER JOIN is a distributed table that has been + * routed to a single shard, we cannot simply replace the RTE with the shard + * name and rely on implicit pruning: the preserved (outer) side could still + * produce rows whose join keys would hash to other shards. To keep results + * consistent with the global execution semantics we restrict the preserved + * (outer) side to only those partition key values that would route to the + * chosen shard (plus NULLs, which are assigned to exactly one shard). + * + * What the function does: + * 1. Iterate over the top-level jointree->fromlist. + * 2. For each JoinExpr call CheckPushDownFeasibilityAndComputeIndexes() which: + * - Verifies shape / join type is eligible. + * - Returns: + * outerRtIndex : RT index whose column we will constrain, + * outerRte / innerRte, + * attnum : attribute number (partition column) on outer side. + * 3. Find the RelationShard for the inner distributed table (innerRte->relid) + * in relationShardList; skip if absent (no fixed shard chosen). + * 4. Build the shard qualification with DefineQualsForShardInterval(): + * (minValue < hash(partcol) AND hash(partcol) <= maxValue) + * and, for the first shard only, OR (partcol IS NULL). + * The Var refers to (outerRtIndex, attnum) so the restriction applies to + * the preserved outer input. + * 5. AND the new quals into jointree->quals (creating it if NULL). + * + * The function does not return anything, it modifies the query in place. */ void UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList) @@ -387,7 +417,6 @@ UpdateWhereClauseForOuterJoin(Query *query, List *relationShardList) fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals); } } - return; } diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index 32f9102c4..db9ea7ce7 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -160,7 +160,7 @@ typedef struct ListCellAndListWrapper for (int var ## PositionDoNotUse = 0; \ (var ## PositionDoNotUse) < list_length(l) && \ (((var) = list_nth(l, var ## PositionDoNotUse)) || true); \ - var ## PositionDoNotUse++) + var ## PositionDoNotUse ++) /* utility functions declaration shared within this module */ extern List * SortList(List *pointerList, diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 9e77310f5..dfadc0263 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -181,9 +181,9 @@ IsNodeWideObjectClass(ObjectClass objectClass) #if PG_VERSION_NUM >= PG_VERSION_16 case OCLASS_ROLE_MEMBERSHIP: #endif - { - return true; - } + { + return true; + } default: return false; diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 0bf03f7a1..63b5b1fd4 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -237,11 +237,11 @@ DEPS = { ), "multi_limit_clause_approximate": TestDeps( "minimal_schedule", - ["multi_create_table", "multi_create_users", "multi_multiuser_load_data"], + ["multi_create_table", "multi_create_users", "multi_load_data"], ), "multi_single_relation_subquery": TestDeps( "minimal_schedule", - ["multi_create_table", "multi_create_users", "multi_multiuser_load_data"], + ["multi_create_table", "multi_create_users", "multi_load_data"], ), "multi_subquery_complex_reference_clause": TestDeps( "minimal_schedule", ["multi_behavioral_analytics_create_table"] diff --git a/src/test/regress/expected/recurring_outer_join.out b/src/test/regress/expected/recurring_outer_join.out index f2137ffb4..0c479f7e1 100644 --- a/src/test/regress/expected/recurring_outer_join.out +++ b/src/test/regress/expected/recurring_outer_join.out @@ -560,8 +560,11 @@ LATERAL --------------------------------------------------------------------- (0 rows) --- Qual is the same but top-level join is an anti-join. Right join --- is pushed down. +-- Qual is the same but top-level join is an anti-join. +-- The right join between t2 and t3 is pushed down. +-- Citus determines that the whole query can be pushed down +-- due to the equality constraint between two distributed +-- tables t1 and t2. SELECT COUNT(*) FROM dist_1 t1 WHERE NOT EXISTS ( SELECT * FROM dist_1 t2 @@ -573,22 +576,53 @@ WHERE NOT EXISTS ( 8 (1 row) +SET client_min_messages TO DEBUG3; -- This time the semi-join qual is (not <) -- where t3 is the outer rel of the right join. Hence Postgres can't --- replace right join with an inner join and so we recursively plan --- inner side of the right join since the outer side is a recurring --- rel. +-- replace right join with an inner join. +-- Citus pushes down the right join between t2 and t3 with constraints on +-- the recurring outer part (t3). However, it cannnot push down the whole +-- query as it can not establish an equivalence between the distribution +-- tables t1 and t2. Hence, Citus tries to recursively plan the subquery. +-- This attempt fails since the subquery has a reference to outer query. +-- See #8113 SELECT COUNT(*) FROM dist_1 t1 WHERE EXISTS ( SELECT * FROM dist_1 t2 RIGHT JOIN ref_1 t3 USING (a) WHERE t3.a = t1.a ); -DEBUG: recursively planning left side of the right join since the outer side is a recurring rel -DEBUG: recursively planning distributed relation "dist_1" "t2" since it is part of a distributed join node that is outer joined with a recurring rel -DEBUG: Wrapping relation "dist_1" "t2" to a subquery -DEBUG: generating subplan XXX_1 for subquery SELECT a FROM recurring_outer_join.dist_1 t2 WHERE true -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM recurring_outer_join.dist_1 t1 WHERE (EXISTS (SELECT t3.a, t2.b, t3.b FROM ((SELECT t2_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) t2_1) t2 RIGHT JOIN recurring_outer_join.ref_1 t3 USING (a)) WHERE (t3.a OPERATOR(pg_catalog.=) t1.a))) +DEBUG: no shard pruning constraints on dist_1 found +DEBUG: shard count after pruning for dist_1: 32 +DEBUG: no shard pruning constraints on dist_1 found +DEBUG: shard count after pruning for dist_1: 32 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe right join with recurring left side +DEBUG: skipping recursive planning for the subquery since it contains references to outer queries +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +SELECT COUNT(*) FROM dist_1 t1 +WHERE NOT EXISTS ( + SELECT * FROM dist_1 t2 + RIGHT JOIN ref_1 t3 USING (a) + WHERE t3.a = t1.a +); +DEBUG: no shard pruning constraints on dist_1 found +DEBUG: shard count after pruning for dist_1: 32 +DEBUG: no shard pruning constraints on dist_1 found +DEBUG: shard count after pruning for dist_1: 32 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe right join with recurring left side +DEBUG: skipping recursive planning for the subquery since it contains references to outer queries +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +SET client_min_messages TO DEBUG1; +-- Force recursive planning of the right join with offset +SELECT COUNT(*) FROM dist_1 t1 +WHERE EXISTS ( + SELECT * FROM dist_1 t2 + RIGHT JOIN ref_1 t3 USING (a) + WHERE t3.a = t1.a + OFFSET 0 +); count --------------------------------------------------------------------- 7 @@ -599,12 +633,8 @@ WHERE NOT EXISTS ( SELECT * FROM dist_1 t2 RIGHT JOIN ref_1 t3 USING (a) WHERE t3.a = t1.a + OFFSET 0 ); -DEBUG: recursively planning left side of the right join since the outer side is a recurring rel -DEBUG: recursively planning distributed relation "dist_1" "t2" since it is part of a distributed join node that is outer joined with a recurring rel -DEBUG: Wrapping relation "dist_1" "t2" to a subquery -DEBUG: generating subplan XXX_1 for subquery SELECT a FROM recurring_outer_join.dist_1 t2 WHERE true -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM recurring_outer_join.dist_1 t1 WHERE (NOT (EXISTS (SELECT t3.a, t2.b, t3.b FROM ((SELECT t2_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) t2_1) t2 RIGHT JOIN recurring_outer_join.ref_1 t3 USING (a)) WHERE (t3.a OPERATOR(pg_catalog.=) t1.a)))) count --------------------------------------------------------------------- 8 @@ -1548,7 +1578,7 @@ SELECT * FROM ref_1 t36 WHERE (b,100,a) IN ( DISTINCT t31.b, -- 1) we first search for such joins in the target list and recursively plan t33 -- because t32 is recurring - (SELECT max(b) FROM ref_1 t32 LEFT JOIN dist_1 t33 USING(a,b) WHERE t31.a = t32.a), + (SELECT max(b) FROM ref_1 t32 LEFT JOIN dist_1 t33 USING(b) WHERE t31.a = t32.a), (SELECT t34.a) FROM ref_1 t35 LEFT JOIN dist_1 t31 USING (a,b) @@ -1599,16 +1629,13 @@ DEBUG: CTE cte_1_inner_cte is going to be inlined via distributed planning DEBUG: recursively planning right side of the left join since the outer side is a recurring rel DEBUG: recursively planning distributed relation "dist_1" "t33" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "dist_1" "t33" to a subquery -DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t33 WHERE true -DEBUG: recursively planning right side of the left join since the outer side is a recurring rel -DEBUG: recursively planning distributed relation "dist_1" "t12" since it is part of a distributed join node that is outer joined with a recurring rel -DEBUG: Wrapping relation "dist_1" "t12" to a subquery -DEBUG: generating subplan XXX_2 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t12 WHERE true +DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_outer_join.dist_1 t33 WHERE true +DEBUG: generating subplan XXX_2 for subquery SELECT t11.a, t11.b FROM (recurring_outer_join.ref_1 t11 LEFT JOIN recurring_outer_join.dist_1 t12 USING (a, b)) DEBUG: recursively planning right side of the left join since the outer side is a recurring rel DEBUG: recursively planning distributed relation "dist_1" "t14" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "dist_1" "t14" to a subquery DEBUG: generating subplan XXX_3 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t14 WHERE true -DEBUG: generating subplan XXX_4 for subquery SELECT t13.a, t13.b FROM ((recurring_outer_join.ref_1 t13 LEFT JOIN (SELECT t14_1.a, t14_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t14_1) t14 USING (a, b)) JOIN (SELECT t11.a, t11.b FROM (recurring_outer_join.ref_1 t11 LEFT JOIN (SELECT t12_1.a, t12_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t12_1) t12 USING (a, b))) t15 USING (a, b)) OFFSET 0 +DEBUG: generating subplan XXX_4 for subquery SELECT t13.a, t13.b FROM ((recurring_outer_join.ref_1 t13 LEFT JOIN (SELECT t14_1.a, t14_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t14_1) t14 USING (a, b)) JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t15 USING (a, b)) OFFSET 0 DEBUG: recursively planning right side of the left join since the outer side is a recurring rel DEBUG: recursively planning distributed relation "dist_1" "d" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "dist_1" "d" to a subquery @@ -1658,23 +1685,15 @@ DEBUG: recursively planning right side of the left join since the outer side is DEBUG: recursively planning distributed relation "dist_1" "t34" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "dist_1" "t34" to a subquery DEBUG: generating subplan XXX_17 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t34 WHERE true -DEBUG: generating subplan XXX_18 for subquery SELECT DISTINCT t31.b, (SELECT max(t32.b) AS max FROM (recurring_outer_join.ref_1 t32 LEFT JOIN (SELECT t33_1.a, t33_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t33_1) t33 USING (a, b)) WHERE (t31.a OPERATOR(pg_catalog.=) t32.a)) AS max, (SELECT t34.a) AS a FROM ((((recurring_outer_join.ref_1 t35 LEFT JOIN (SELECT t31_1.a, t31_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_16'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t31_1) t31 USING (a, b)) LEFT JOIN (SELECT t34_1.a, t34_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_17'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t34_1) t34 USING (a, b)) LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 USING (a, b)) LEFT JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_15'::text, 'binary'::citus_copy_format) intermediate_result(a bigint)) t30 ON ((t30.a OPERATOR(pg_catalog.=) cte_1.a))) ORDER BY t31.b, (SELECT max(t32.b) AS max FROM (recurring_outer_join.ref_1 t32 LEFT JOIN (SELECT t33_1.a, t33_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t33_1) t33 USING (a, b)) WHERE (t31.a OPERATOR(pg_catalog.=) t32.a)), (SELECT t34.a) -DEBUG: recursively planning right side of the left join since the outer side is a recurring rel -DEBUG: recursively planning distributed relation "dist_1" "t3" since it is part of a distributed join node that is outer joined with a recurring rel -DEBUG: Wrapping relation "dist_1" "t3" to a subquery -DEBUG: generating subplan XXX_19 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t3 WHERE true +DEBUG: generating subplan XXX_18 for subquery SELECT DISTINCT t31.b, (SELECT max(t32.b) AS max FROM (recurring_outer_join.ref_1 t32 LEFT JOIN (SELECT NULL::integer AS a, t33_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) t33_1) t33 USING (b)) WHERE (t31.a OPERATOR(pg_catalog.=) t32.a)) AS max, (SELECT t34.a) AS a FROM ((((recurring_outer_join.ref_1 t35 LEFT JOIN (SELECT t31_1.a, t31_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_16'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t31_1) t31 USING (a, b)) LEFT JOIN (SELECT t34_1.a, t34_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_17'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t34_1) t34 USING (a, b)) LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 USING (a, b)) LEFT JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_15'::text, 'binary'::citus_copy_format) intermediate_result(a bigint)) t30 ON ((t30.a OPERATOR(pg_catalog.=) cte_1.a))) ORDER BY t31.b, (SELECT max(t32.b) AS max FROM (recurring_outer_join.ref_1 t32 LEFT JOIN (SELECT NULL::integer AS a, t33_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) t33_1) t33 USING (b)) WHERE (t31.a OPERATOR(pg_catalog.=) t32.a)), (SELECT t34.a) DEBUG: push down of limit count: 10 -DEBUG: generating subplan XXX_20 for subquery SELECT b, (SELECT t2.a FROM (recurring_outer_join.ref_1 t2 LEFT JOIN (SELECT t3_1.a, t3_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_19'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t3_1) t3 USING (a, b)) WHERE (t2.a OPERATOR(pg_catalog.=) t1.a) ORDER BY t2.a LIMIT 1) AS a FROM recurring_outer_join.dist_1 t1 ORDER BY b, (SELECT t2.a FROM (recurring_outer_join.ref_1 t2 LEFT JOIN (SELECT t3_1.a, t3_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_19'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t3_1) t3 USING (a, b)) WHERE (t2.a OPERATOR(pg_catalog.=) t1.a) ORDER BY t2.a LIMIT 1) LIMIT 10 -DEBUG: recursively planning right side of the left join since the outer side is a recurring rel -DEBUG: recursively planning distributed relation "dist_1" "t5" since it is part of a distributed join node that is outer joined with a recurring rel -DEBUG: Wrapping relation "dist_1" "t5" to a subquery -DEBUG: generating subplan XXX_21 for subquery SELECT a, b FROM recurring_outer_join.dist_1 t5 WHERE true -DEBUG: generating subplan XXX_22 for subquery SELECT a, b FROM (SELECT t8.a, t8.b FROM (SELECT t7.a, t7.b FROM (SELECT t6.a, t6.b FROM (SELECT t4.a, t4.b FROM (recurring_outer_join.ref_1 t4 LEFT JOIN (SELECT t5_1.a, t5_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_21'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t5_1) t5 USING (a, b))) t6) t7) t8) t9 OFFSET 0 +DEBUG: generating subplan XXX_19 for subquery SELECT b, (SELECT t2.a FROM (recurring_outer_join.ref_1 t2 LEFT JOIN recurring_outer_join.dist_1 t3 USING (a, b)) WHERE (t2.a OPERATOR(pg_catalog.=) t1.a) ORDER BY t2.a LIMIT 1) AS a FROM recurring_outer_join.dist_1 t1 ORDER BY b, (SELECT t2.a FROM (recurring_outer_join.ref_1 t2 LEFT JOIN recurring_outer_join.dist_1 t3 USING (a, b)) WHERE (t2.a OPERATOR(pg_catalog.=) t1.a) ORDER BY t2.a LIMIT 1) LIMIT 10 +DEBUG: generating subplan XXX_20 for subquery SELECT a, b FROM (SELECT t8.a, t8.b FROM (SELECT t7.a, t7.b FROM (SELECT t6.a, t6.b FROM (SELECT t4.a, t4.b FROM (recurring_outer_join.ref_1 t4 LEFT JOIN recurring_outer_join.dist_1 t5 USING (a, b))) t6) t7) t8) t9 OFFSET 0 DEBUG: recursively planning right side of the left join since the outer side is a recurring rel DEBUG: recursively planning distributed relation "dist_1" "t11" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "dist_1" "t11" to a subquery -DEBUG: generating subplan XXX_23 for subquery SELECT b FROM recurring_outer_join.dist_1 t11 WHERE true -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM recurring_outer_join.ref_1 t36 WHERE (((b, 100, a) OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.b, intermediate_result.max, intermediate_result.a FROM read_intermediate_result('XXX_18'::text, 'binary'::citus_copy_format) intermediate_result(b integer, max integer, a integer))) AND (NOT (a OPERATOR(pg_catalog.=) ANY (SELECT outer_cte_1.b FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_20'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) outer_cte_1 LEFT JOIN (SELECT t10.b, t10.a, t11.a FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_22'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t10 LEFT JOIN (SELECT NULL::integer AS a, t11_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_23'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) t11_1) t11 USING (b))) outer_cte_2(b, a, a_1) USING (b)))))) +DEBUG: generating subplan XXX_21 for subquery SELECT b FROM recurring_outer_join.dist_1 t11 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM recurring_outer_join.ref_1 t36 WHERE (((b, 100, a) OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.b, intermediate_result.max, intermediate_result.a FROM read_intermediate_result('XXX_18'::text, 'binary'::citus_copy_format) intermediate_result(b integer, max integer, a integer))) AND (NOT (a OPERATOR(pg_catalog.=) ANY (SELECT outer_cte_1.b FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_19'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) outer_cte_1 LEFT JOIN (SELECT t10.b, t10.a, t11.a FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_20'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t10 LEFT JOIN (SELECT NULL::integer AS a, t11_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_21'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) t11_1) t11 USING (b))) outer_cte_2(b, a, a_1) USING (b)))))) a | b --------------------------------------------------------------------- 1 | 11 diff --git a/src/test/regress/sql/recurring_outer_join.sql b/src/test/regress/sql/recurring_outer_join.sql index 6851d5cf0..2e064661d 100644 --- a/src/test/regress/sql/recurring_outer_join.sql +++ b/src/test/regress/sql/recurring_outer_join.sql @@ -292,8 +292,11 @@ LATERAL WHERE r1.a > dist_1.b ) as foo; --- Qual is the same but top-level join is an anti-join. Right join --- is pushed down. +-- Qual is the same but top-level join is an anti-join. +-- The right join between t2 and t3 is pushed down. +-- Citus determines that the whole query can be pushed down +-- due to the equality constraint between two distributed +-- tables t1 and t2. SELECT COUNT(*) FROM dist_1 t1 WHERE NOT EXISTS ( SELECT * FROM dist_1 t2 @@ -301,11 +304,17 @@ WHERE NOT EXISTS ( WHERE t2.a = t1.a ); +SET client_min_messages TO DEBUG3; + -- This time the semi-join qual is (not <) -- where t3 is the outer rel of the right join. Hence Postgres can't --- replace right join with an inner join and so we recursively plan --- inner side of the right join since the outer side is a recurring --- rel. +-- replace right join with an inner join. +-- Citus pushes down the right join between t2 and t3 with constraints on +-- the recurring outer part (t3). However, it cannnot push down the whole +-- query as it can not establish an equivalence between the distribution +-- tables t1 and t2. Hence, Citus tries to recursively plan the subquery. +-- This attempt fails since the subquery has a reference to outer query. +-- See #8113 SELECT COUNT(*) FROM dist_1 t1 WHERE EXISTS ( SELECT * FROM dist_1 t2 @@ -320,6 +329,25 @@ WHERE NOT EXISTS ( WHERE t3.a = t1.a ); +SET client_min_messages TO DEBUG1; + +-- Force recursive planning of the right join with offset +SELECT COUNT(*) FROM dist_1 t1 +WHERE EXISTS ( + SELECT * FROM dist_1 t2 + RIGHT JOIN ref_1 t3 USING (a) + WHERE t3.a = t1.a + OFFSET 0 +); + +SELECT COUNT(*) FROM dist_1 t1 +WHERE NOT EXISTS ( + SELECT * FROM dist_1 t2 + RIGHT JOIN ref_1 t3 USING (a) + WHERE t3.a = t1.a + OFFSET 0 +); + -- -- more complex cases -- @@ -840,7 +868,7 @@ SELECT * FROM ref_1 t36 WHERE (b,100,a) IN ( DISTINCT t31.b, -- 1) we first search for such joins in the target list and recursively plan t33 -- because t32 is recurring - (SELECT max(b) FROM ref_1 t32 LEFT JOIN dist_1 t33 USING(a,b) WHERE t31.a = t32.a), + (SELECT max(b) FROM ref_1 t32 LEFT JOIN dist_1 t33 USING(b) WHERE t31.a = t32.a), (SELECT t34.a) FROM ref_1 t35 LEFT JOIN dist_1 t31 USING (a,b)