-- -- PG17 -- SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int >= 17 AS server_version_ge_17 \gset -- PG17 has the capabilty to pull up a correlated ANY subquery to a join if -- the subquery only refers to its immediate parent query. Previously, the -- subquery needed to be implemented as a SubPlan node, typically as a -- filter on a scan or join node. This PG17 capability enables Citus to -- run queries with correlated subqueries in certain cases, as shown here. -- Relevant PG commit: -- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639 -- This feature is tested for all PG versions, not just PG17; each test query with -- a correlated subquery should fail with PG version < 17.0, but the test query -- rewritten to reflect how PG17 optimizes it should succeed with PG < 17.0 CREATE SCHEMA pg17_corr_subq_folding; SET search_path TO pg17_corr_subq_folding; SET citus.next_shard_id TO 20240017; SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 1; CREATE TABLE test (x int, y int); SELECT create_distributed_table('test', 'x'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO test VALUES (1,1), (2,2); -- Query 1: WHERE clause has a correlated subquery with a UNION. PG17 can plan -- this as a nested loop join with the subquery as the inner. The correlation -- is on the distribution column so the join can be pushed down by Citus. explain (costs off) SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2; ERROR: cannot push down this subquery DETAIL: Complex subqueries and CTEs are not supported within a UNION SET client_min_messages TO DEBUG2; SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2; DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT x FROM pg17_corr_subq_folding.test b DEBUG: skipping recursive planning for the subquery since it contains references to outer queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM pg17_corr_subq_folding.test a WHERE (x OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.x FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT c.y FROM pg17_corr_subq_folding.test c WHERE (a.x OPERATOR(pg_catalog.=) c.x))) ORDER BY x, y DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: cannot push down this subquery DETAIL: Complex subqueries and CTEs are not supported within a UNION RESET client_min_messages; -- Query 1 rewritten with subquery pulled up to a join, as done by PG17 planner; -- this query can be run without issues by Citus with older (pre PG17) PGs. explain (costs off) SELECT a.* FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x ORDER BY 1,2; QUERY PLAN --------------------------------------------------------------------- Sort Sort Key: remote_scan.x, remote_scan.y -> Custom Scan (Citus Adaptive) Task Count: 2 Tasks Shown: One of 2 -> Task Node: host=localhost port=xxxxx dbname=regression -> Nested Loop -> Seq Scan on test_20240017 a -> Subquery Scan on dt1 Filter: (a.x = dt1.x) -> HashAggregate Group Key: b.x -> Append -> Seq Scan on test_20240017 b -> Seq Scan on test_20240017 c Filter: (a.x = x) (17 rows) SET client_min_messages TO DEBUG2; SELECT a.* FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x ORDER BY 1,2; DEBUG: Router planner cannot handle multi-shard select queries x | y --------------------------------------------------------------------- 1 | 1 2 | 2 (2 rows) RESET client_min_messages; CREATE TABLE users (user_id int, time int, dept int, info bigint); CREATE TABLE events (user_id int, time int, event_type int, payload text); select create_distributed_table('users', 'user_id'); create_distributed_table --------------------------------------------------------------------- (1 row) select create_distributed_table('events', 'user_id'); create_distributed_table --------------------------------------------------------------------- (1 row) insert into users select i, 2021 + (i % 3), i % 5, 99999 * i from generate_series(1, 10) i; insert into events select i % 10 + 1, 2021 + (i % 3), i %11, md5((i*i)::text) from generate_series(1, 100) i; -- Query 2. In Citus correlated subqueries can not be used in the WHERE -- clause but if the subquery can be pulled up to a join it becomes possible -- for Citus to run the query, per this example. Pre PG17 the suqbuery -- was implemented as a SubPlan filter on the events table scan. EXPLAIN (costs off) WITH event_id AS(SELECT user_id AS events_user_id, time AS events_time, event_type FROM events) SELECT Count(*) FROM event_id WHERE (events_user_id) IN (SELECT user_id FROM users WHERE users.time = events_time); ERROR: correlated subqueries are not supported when the FROM clause contains a CTE or subquery SET client_min_messages TO DEBUG2; WITH event_id AS(SELECT user_id AS events_user_id, time AS events_time, event_type FROM events) SELECT Count(*) FROM event_id WHERE (events_user_id) IN (SELECT user_id FROM users WHERE users.time = events_time); DEBUG: CTE event_id is going to be inlined via distributed planning DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE event_id: SELECT user_id AS events_user_id, "time" AS events_time, event_type FROM pg17_corr_subq_folding.events DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.events_user_id, intermediate_result.events_time, intermediate_result.event_type FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(events_user_id integer, events_time integer, event_type integer)) event_id WHERE (events_user_id OPERATOR(pg_catalog.=) ANY (SELECT users.user_id FROM pg17_corr_subq_folding.users WHERE (users."time" OPERATOR(pg_catalog.=) event_id.events_time))) DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: correlated subqueries are not supported when the FROM clause contains a CTE or subquery RESET client_min_messages; -- Query 2 rewritten with subquery pulled up to a join, as done by pg17 planner. Citus -- Citus is able to run this query with previous pg versions. Note that the CTE can be -- disregarded because it is inlined, being only referenced once. EXPLAIN (COSTS OFF) SELECT Count(*) FROM (SELECT user_id AS events_user_id, time AS events_time, event_type FROM events) dt1 INNER JOIN (SELECT distinct user_id, time FROM users) dt ON events_user_id = dt.user_id and events_time = dt.time; QUERY PLAN --------------------------------------------------------------------- Aggregate -> Custom Scan (Citus Adaptive) Task Count: 2 Tasks Shown: One of 2 -> Task Node: host=localhost port=xxxxx dbname=regression -> Aggregate -> Hash Join Hash Cond: ((events.user_id = users.user_id) AND (events."time" = users."time")) -> Seq Scan on events_20240021 events -> Hash -> HashAggregate Group Key: users.user_id, users."time" -> Seq Scan on users_20240019 users (14 rows) SET client_min_messages TO DEBUG2; SELECT Count(*) FROM (SELECT user_id AS events_user_id, time AS events_time, event_type FROM events) dt1 INNER JOIN (SELECT distinct user_id, time FROM users) dt ON events_user_id = dt.user_id and events_time = dt.time; DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- 31 (1 row) RESET client_min_messages; -- Query 3: another example where recursive planning was prevented due to -- correlated subqueries, but with PG17 folding the subquery to a join it is -- possible for Citus to plan and run the query. EXPLAIN (costs off) SELECT dept, sum(user_id) FROM (SELECT users.dept, users.user_id FROM users, events as d1 WHERE d1.user_id = users.user_id AND users.dept IN (3,4) AND users.user_id IN (SELECT s2.user_id FROM users as s2 GROUP BY d1.user_id, s2.user_id)) dt GROUP BY dept; ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SET client_min_messages TO DEBUG2; SELECT dept, sum(user_id) FROM (SELECT users.dept, users.user_id FROM users, events as d1 WHERE d1.user_id = users.user_id AND users.dept IN (3,4) AND users.user_id IN (SELECT s2.user_id FROM users as s2 GROUP BY d1.user_id, s2.user_id)) dt GROUP BY dept; DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns RESET client_min_messages; -- Query 3 rewritten in a similar way to how the PG17 pulls up the subquery; -- the join is on the distribution key so Citus can push down. EXPLAIN (costs off) SELECT dept, sum(user_id) FROM (SELECT users.dept, users.user_id FROM users, events as d1 JOIN LATERAL (SELECT s2.user_id FROM users as s2 GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1 WHERE d1.user_id = users.user_id AND users.dept IN (3,4) AND users.user_id = d2.user_id) dt GROUP BY dept; QUERY PLAN --------------------------------------------------------------------- HashAggregate Group Key: remote_scan.dept -> Custom Scan (Citus Adaptive) Task Count: 2 Tasks Shown: One of 2 -> Task Node: host=localhost port=xxxxx dbname=regression -> GroupAggregate Group Key: users.dept -> Sort Sort Key: users.dept -> Nested Loop -> Hash Join Hash Cond: (d1.user_id = users.user_id) -> Seq Scan on events_20240021 d1 -> Hash -> Seq Scan on users_20240019 users Filter: (dept = ANY ('{3,4}'::integer[])) -> Subquery Scan on d2 Filter: (d1.user_id = d2.user_id) -> HashAggregate Group Key: s2.user_id -> Result One-Time Filter: (d1.user_id IS NOT NULL) -> Seq Scan on users_20240019 s2 (25 rows) SET client_min_messages TO DEBUG2; SELECT dept, sum(user_id) FROM (SELECT users.dept, users.user_id FROM users, events as d1 JOIN LATERAL (SELECT s2.user_id FROM users as s2 GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1 WHERE d1.user_id = users.user_id AND users.dept IN (3,4) AND users.user_id = d2.user_id) dt GROUP BY dept; DEBUG: Router planner cannot handle multi-shard select queries dept | sum --------------------------------------------------------------------- 3 | 110 4 | 130 (2 rows) SET client_min_messages TO DEBUG3; CREATE TABLE users_ref(user_id int, dept int); SELECT create_reference_table('users_ref'); create_reference_table --------------------------------------------------------------------- (1 row) INSERT INTO users_ref VALUES (1, 3), (2, 4), (3, 3), (4, 4); DEBUG: Creating router plan DEBUG: assigned task to node localhost:xxxxx -- In PG17, the planner can pull up a correlated ANY subquery to a join, resulting -- in a different query plan compared to PG16. Specifically, for the following query -- the rewritten query has a lateral recurring outer join, which requires recursive -- computation of the inner part. However, this join is not analyzed during the recursive -- planning step, as it is performed on the original query structure. As a result, -- the lateral join is not recursively planned, and a lateral join error is raised -- at a later stage. SELECT user_id FROM users RIGHT JOIN users_ref USING (user_id) WHERE users_ref.dept IN ( SELECT events.event_type FROM events WHERE events.user_id = users.user_id ) ORDER BY 1 LIMIT 1; DEBUG: no shard pruning constraints on events found DEBUG: shard count after pruning for events: 2 DEBUG: no shard pruning constraints on users found DEBUG: shard count after pruning for users: 2 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe right join with recurring left side DEBUG: push down of limit count: 1 DEBUG: no shard pruning constraints on events found DEBUG: shard count after pruning for events: 2 DEBUG: no shard pruning constraints on users found DEBUG: shard count after pruning for users: 2 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx user_id --------------------------------------------------------------------- 1 (1 row) RESET client_min_messages; RESET search_path; DROP SCHEMA pg17_corr_subq_folding CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table pg17_corr_subq_folding.test drop cascades to table pg17_corr_subq_folding.users drop cascades to table pg17_corr_subq_folding.events drop cascades to table pg17_corr_subq_folding.users_ref drop cascades to table pg17_corr_subq_folding.users_ref_20240023 -- Queries with outer joins with pseudoconstant quals work only in PG17 -- Relevant PG17 commit: -- https://github.com/postgres/postgres/commit/9e9931d2b CREATE SCHEMA pg17_outerjoin; SET search_path to pg17_outerjoin, public; SET citus.next_shard_id TO 20250321; -- issue https://github.com/citusdata/citus/issues/7697 create table t0 (vkey int4 , c3 timestamp); create table t3 ( vkey int4 ,c26 timestamp); create table t4 ( vkey int4 ); insert into t0 (vkey, c3) values (13,make_timestamp(2019, 10, 23, 15, 34, 50)); insert into t3 (vkey,c26) values (1, make_timestamp(2024, 3, 26, 17, 36, 53)); insert into t4 (vkey) values (1); select * from (t0 full outer join t3 on (t0.c3 = t3.c26 )) where (exists (select * from t4)) order by 1, 2, 3; vkey | c3 | vkey | c26 --------------------------------------------------------------------- 13 | Wed Oct 23 15:34:50 2019 | | | | 1 | Tue Mar 26 17:36:53 2024 (2 rows) SELECT create_distributed_table('t0', 'vkey'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17_outerjoin.t0$$) create_distributed_table --------------------------------------------------------------------- (1 row) select * from (t0 full outer join t3 on (t0.c3 = t3.c26 )) where (exists (select * from t4)) order by 1, 2, 3; ERROR: Distributed queries with outer joins and pseudoconstant quals are not supported in PG15 and PG16. DETAIL: PG15 and PG16 disallow replacing joins with scans when the query has pseudoconstant quals HINT: Consider upgrading your PG version to PG17+ -- issue https://github.com/citusdata/citus/issues/7696 create table t1 ( vkey int4 ); create table t2 ( vkey int4 ); insert into t2 (vkey) values (5); select * from (t2 full outer join t1 on(t2.vkey = t1.vkey )) where not((85) in (select 1 from t2)); vkey | vkey --------------------------------------------------------------------- 5 | (1 row) SELECT create_distributed_table('t1', 'vkey'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('t2'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17_outerjoin.t2$$) create_reference_table --------------------------------------------------------------------- (1 row) select * from (t2 full outer join t1 on(t2.vkey = t1.vkey )) where not((85) in (select 1 from t2)); ERROR: Distributed queries with outer joins and pseudoconstant quals are not supported in PG15 and PG16. DETAIL: PG15 and PG16 disallow replacing joins with scans when the query has pseudoconstant quals HINT: Consider upgrading your PG version to PG17+ -- issue https://github.com/citusdata/citus/issues/7698 create table t5 ( vkey int4, c10 int4 ); create table t6 ( vkey int4 ); insert into t5 (vkey,c10) values (4, -70); insert into t6 (vkey) values (1); select t6.vkey from (t5 right outer join t6 on (t5.c10 = t6.vkey)) where exists (select * from t6); vkey --------------------------------------------------------------------- 1 (1 row) SELECT create_distributed_table('t5', 'vkey'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17_outerjoin.t5$$) create_distributed_table --------------------------------------------------------------------- (1 row) select t6.vkey from (t5 right outer join t6 on (t5.c10 = t6.vkey)) where exists (select * from t6); ERROR: Distributed queries with outer joins and pseudoconstant quals are not supported in PG15 and PG16. DETAIL: PG15 and PG16 disallow replacing joins with scans when the query has pseudoconstant quals HINT: Consider upgrading your PG version to PG17+ -- issue https://github.com/citusdata/citus/issues/7119 -- this test was removed in -- https://github.com/citusdata/citus/commit/a5ce601c0 -- Citus doesn't support it in PG15 and PG16, but supports it in PG17 CREATE TABLE users_table_local AS SELECT * FROM users_table; CREATE TABLE events_table_local AS SELECT * FROM events_table; SET client_min_messages TO DEBUG1; -- subquery in FROM -> FROM -> WHERE -> WHERE should be replaced if -- it contains onle local tables -- Later the upper level query is also recursively planned due to LIMIT SELECT user_id, array_length(events_table, 1) FROM ( SELECT user_id, array_agg(event ORDER BY time) AS events_table FROM ( SELECT u.user_id, e.event_type::text AS event, e.time FROM users_table AS u, events_table AS e WHERE u.user_id = e.user_id AND u.user_id IN ( SELECT user_id FROM users_table WHERE value_2 >= 5 AND EXISTS (SELECT user_id FROM events_table_local WHERE event_type > 1 AND event_type <= 3 AND value_3 > 1) AND NOT EXISTS (SELECT user_id FROM events_table WHERE event_type > 3 AND event_type <= 4 AND value_3 > 1 AND user_id = users_table.user_id) LIMIT 5 ) ) t GROUP BY user_id ) q ORDER BY 2 DESC, 1; DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM pg17_outerjoin.events_table_local WHERE ((event_type OPERATOR(pg_catalog.>) 1) AND (event_type OPERATOR(pg_catalog.<=) 3) AND (value_3 OPERATOR(pg_catalog.>) (1)::double precision)) ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns RESET search_path; SET citus.next_shard_id TO 20240023; SET client_min_messages TO ERROR; DROP SCHEMA pg17_outerjoin CASCADE; RESET client_min_messages; \if :server_version_ge_17 \else \q