From 81bda6fb8ef940dae060eed133da6aaab1a5f815 Mon Sep 17 00:00:00 2001
From: Colm
Date: Wed, 20 Nov 2024 11:51:16 +0000
Subject: [PATCH] PG17 compatibility: add/fix tests with correlated subqueries
 that can be pulled up to a join (#7745)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes test failures in subquery_in_where, set_operations and dml_recursive on PG17 (#7741).

The test failures are caused by [this commit in PG17](https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639),
which enables correlated ANY subqueries to be pulled up to a join. Prior to
this, the correlated subquery was implemented as a SubPlan. In Citus it is
not possible to push down a correlated SubPlan, but with the different plan
in PG17 the query can be executed, per the test diff from `subquery_in_where`:

```
37,39c37,41
< DEBUG: generating subplan XXX_1 for CTE event_id: SELECT user_id AS events_user_id, "time" AS events_time, event_type FROM public.events_table
< DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ...
< ERROR: correlated subqueries are not supported when the FROM clause contains a CTE or subquery
---
> count
> ---------------------------------------------------------------------
>     0
> (1 row)
>
```

This is because with PG17 the `= ANY (subquery)` in these queries can be
implemented as a join instead of as a SubPlan filter on a table scan. For
example, `SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2`
(from set_operations) has this plan in PG17; note that the subquery is the
inner side of a nested loop join:

```
┌────────────────────────────────────────────────────┐
│                     QUERY PLAN                     │
├────────────────────────────────────────────────────┤
│ Sort                                               │
│   Sort Key: a.x, a.y                               │
│   ->  Nested Loop                                  │
│         ->  Seq Scan on test a                     │
│         ->  Subquery Scan on "ANY_subquery"        │
│               Filter: (a.x = "ANY_subquery".x)     │
│               ->  HashAggregate                    │
│                     Group Key: b.x                 │
│                     ->  Append                     │
│                           ->  Seq Scan on test b   │
│                           ->  Seq Scan on test c   │
│                                 Filter: (a.x = x)  │
└────────────────────────────────────────────────────┘
```

and this plan in PG16 (and earlier PG versions); the subquery is a
correlated SubPlan filter on a table scan:

```
┌────────────────────────────────────────────────┐
│                   QUERY PLAN                   │
├────────────────────────────────────────────────┤
│ Sort                                           │
│   Sort Key: a.x, a.y                           │
│   ->  Seq Scan on test a                       │
│         Filter: (SubPlan 1)                    │
│         SubPlan 1                              │
│           ->  HashAggregate                    │
│                 Group Key: b.x                 │
│                 ->  Append                     │
│                       ->  Seq Scan on test b   │
│                       ->  Seq Scan on test c   │
│                             Filter: (a.x = x)  │
└────────────────────────────────────────────────┘
```

The fix: modify the queries causing the test failures so that the ANY
subquery is not folded to a join, preserving the expected output of the
tests. A similar approach was taken for existing regress tests in the
[Postgres commit](https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639);
see the `join` regress test, for example. We also add PG17-specific tests
that exercise this Postgres improvement with Citus distributed planning.
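For reference, here is a minimal standalone sketch (not part of the patch) of
the two rewrite directions involved, using a hypothetical plain-PostgreSQL
table `t`; the volatile-expression trick in the last statement is the
mechanism the test fixes below rely on to keep the pre-PG17 plan shape:

```
-- Toy setup (hypothetical; the regress tests use distributed tables).
CREATE TABLE t (x int, y int);
INSERT INTO t VALUES (1, 1), (2, 2);

-- Correlated ANY/IN sublink: PG17 may pull this up into a join;
-- PG16 and earlier keep it as a SubPlan filter on the scan of "a".
SELECT *
FROM t a
WHERE a.x IN (SELECT b.y FROM t b WHERE b.x = a.x);

-- Hand-written equivalent of the PG17 pull-up, which also runs on older
-- PG versions (the pattern used for the rewritten test queries). This
-- assumes the subquery yields distinct rows, as the UNION in the tests
-- guarantees; otherwise the join could produce duplicates.
SELECT a.*
FROM t a
JOIN LATERAL (SELECT b.y FROM t b WHERE b.x = a.x) dt ON a.x = dt.y;

-- Adding a volatile expression to the left-hand side keeps the sublink
-- from being pulled up to a join (the planner does not flatten a volatile
-- test expression), pinning the SubPlan form even on PG17.
SELECT *
FROM t a
WHERE (a.x + random()) IN (SELECT b.y FROM t b WHERE b.x = a.x);
```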
---
 src/test/regress/expected/dml_recursive.out  |   3 +-
 src/test/regress/expected/pg17.out           | 354 ++++++++++++++++++
 src/test/regress/expected/pg17_0.out         | 295 +++++++++++++++
 src/test/regress/expected/set_operations.out |   4 +-
 .../regress/expected/subquery_in_where.out   |   4 +-
 src/test/regress/multi_schedule              |   1 +
 src/test/regress/sql/dml_recursive.sql       |   3 +-
 src/test/regress/sql/pg17.sql                | 182 +++++++++
 src/test/regress/sql/set_operations.sql      |   2 +-
 src/test/regress/sql/subquery_in_where.sql   |   2 +-
 10 files changed, 842 insertions(+), 8 deletions(-)
 create mode 100644 src/test/regress/expected/pg17.out
 create mode 100644 src/test/regress/expected/pg17_0.out
 create mode 100644 src/test/regress/sql/pg17.sql

diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out
index cc4058def..be131f661 100644
--- a/src/test/regress/expected/dml_recursive.out
+++ b/src/test/regress/expected/dml_recursive.out
@@ -266,6 +266,7 @@ ERROR:  complex joins are only supported when all distributed tables are co-loca
 -- again a correlated subquery
 -- this time distribution key eq. exists
 -- however recursive planning is prevented due to correlated subqueries
+-- that cannot be folded to joins.
 UPDATE
 	second_distributed_table
 SET
@@ -285,7 +286,7 @@ FROM
 	AND
 	second_distributed_table.tenant_id IN
 	(
-		SELECT s2.tenant_id
+		SELECT s2.tenant_id || random()::text
 		FROM second_distributed_table as s2
 		GROUP BY d1.tenant_id, s2.tenant_id
 	)
diff --git a/src/test/regress/expected/pg17.out b/src/test/regress/expected/pg17.out
new file mode 100644
index 000000000..8943e78e7
--- /dev/null
+++ b/src/test/regress/expected/pg17.out
@@ -0,0 +1,354 @@
+--
+-- PG17
+--
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int >= 17 AS server_version_ge_17
+\gset
+-- PG17 has the capability to pull up a correlated ANY subquery to a join if
+-- the subquery only refers to its immediate parent query. Previously, the
+-- subquery needed to be implemented as a SubPlan node, typically as a
+-- filter on a scan or join node. This PG17 capability enables Citus to
+-- run queries with correlated subqueries in certain cases, as shown here.
+-- Relevant PG commit:
+-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639
+-- This feature is tested for all PG versions, not just PG17; each test query with
+-- a correlated subquery should fail with PG version < 17.0, but the test query
+-- rewritten to reflect how PG17 optimizes it should succeed with PG < 17.0.
+CREATE SCHEMA pg17_corr_subq_folding;
+SET search_path TO pg17_corr_subq_folding;
+SET citus.next_shard_id TO 20240017;
+SET citus.shard_count TO 2;
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE test (x int, y int);
+SELECT create_distributed_table('test', 'x');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO test VALUES (1,1), (2,2);
+-- Query 1: WHERE clause has a correlated subquery with a UNION. PG17 can plan
+-- this as a nested loop join with the subquery as the inner. The correlation
+-- is on the distribution column so the join can be pushed down by Citus.
+explain (costs off)
+SELECT *
+FROM test a
+WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x)
+ORDER BY 1,2;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ Sort
+   Sort Key: remote_scan.x, remote_scan.y
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Nested Loop
+                     ->  Seq Scan on test_20240017 a
+                     ->  Subquery Scan on "ANY_subquery"
+                           Filter: (a.x = "ANY_subquery".x)
+                           ->  HashAggregate
+                                 Group Key: b.x
+                                 ->  Append
+                                       ->  Seq Scan on test_20240017 b
+                                       ->  Seq Scan on test_20240017 c
+                                             Filter: (a.x = x)
+(17 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT *
+FROM test a
+WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x)
+ORDER BY 1,2;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ x | y
+---------------------------------------------------------------------
+ 1 | 1
+ 2 | 2
+(2 rows)
+
+RESET client_min_messages;
+-- Query 1 rewritten with subquery pulled up to a join, as done by the PG17 planner;
+-- this query can be run without issues by Citus with older (pre-PG17) PGs.
+explain (costs off)
+SELECT a.*
+FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x
+ORDER BY 1,2;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ Sort
+   Sort Key: remote_scan.x, remote_scan.y
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Nested Loop
+                     ->  Seq Scan on test_20240017 a
+                     ->  Subquery Scan on dt1
+                           Filter: (a.x = dt1.x)
+                           ->  HashAggregate
+                                 Group Key: b.x
+                                 ->  Append
+                                       ->  Seq Scan on test_20240017 b
+                                       ->  Seq Scan on test_20240017 c
+                                             Filter: (a.x = x)
+(17 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT a.*
+FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x
+ORDER BY 1,2;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ x | y
+---------------------------------------------------------------------
+ 1 | 1
+ 2 | 2
+(2 rows)
+
+RESET client_min_messages;
+CREATE TABLE users (user_id int, time int, dept int, info bigint);
+CREATE TABLE events (user_id int, time int, event_type int, payload text);
+select create_distributed_table('users', 'user_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('events', 'user_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into users
+select i, 2021 + (i % 3), i % 5, 99999 * i from generate_series(1, 10) i;
+insert into events
+select i % 10 + 1, 2021 + (i % 3), i % 11, md5((i*i)::text) from generate_series(1, 100) i;
+-- Query 2. In Citus correlated subqueries cannot be used in the WHERE
+-- clause, but if the subquery can be pulled up to a join it becomes possible
+-- for Citus to run the query, per this example. Pre-PG17 the subquery
+-- was implemented as a SubPlan filter on the events table scan.
+EXPLAIN (costs off)
+WITH event_id
+     AS(SELECT user_id AS events_user_id,
+               time AS events_time,
+               event_type
+        FROM events)
+SELECT Count(*)
+FROM event_id
+WHERE (events_user_id) IN (SELECT user_id
+                           FROM users
+                           WHERE users.time = events_time);
+                        QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: ((events."time" = users."time") AND (events.user_id = users.user_id))
+                           ->  Seq Scan on events_20240021 events
+                           ->  Hash
+                                 ->  HashAggregate
+                                       Group Key: users."time", users.user_id
+                                       ->  Seq Scan on users_20240019 users
+(14 rows)
+
+SET client_min_messages TO DEBUG2;
+WITH event_id
+     AS(SELECT user_id AS events_user_id,
+               time AS events_time,
+               event_type
+        FROM events)
+SELECT Count(*)
+FROM event_id
+WHERE (events_user_id) IN (SELECT user_id
+                           FROM users
+                           WHERE users.time = events_time);
+DEBUG:  CTE event_id is going to be inlined via distributed planning
+DEBUG:  Router planner cannot handle multi-shard select queries
+ count
+---------------------------------------------------------------------
+    31
+(1 row)
+
+RESET client_min_messages;
+-- Query 2 rewritten with subquery pulled up to a join, as done by the PG17 planner.
+-- Citus is able to run this query with previous PG versions. Note that the CTE can be
+-- disregarded because it is inlined, being only referenced once.
+EXPLAIN (COSTS OFF)
+SELECT Count(*)
+FROM (SELECT user_id AS events_user_id,
+             time AS events_time,
+             event_type FROM events) dt1
+INNER JOIN (SELECT distinct user_id, time FROM users) dt
+        ON events_user_id = dt.user_id and events_time = dt.time;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: ((events.user_id = users.user_id) AND (events."time" = users."time"))
+                           ->  Seq Scan on events_20240021 events
+                           ->  Hash
+                                 ->  HashAggregate
+                                       Group Key: users.user_id, users."time"
+                                       ->  Seq Scan on users_20240019 users
+(14 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT Count(*)
+FROM (SELECT user_id AS events_user_id,
+             time AS events_time,
+             event_type FROM events) dt1
+INNER JOIN (SELECT distinct user_id, time FROM users) dt
+        ON events_user_id = dt.user_id and events_time = dt.time;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ count
+---------------------------------------------------------------------
+    31
+(1 row)
+
+RESET client_min_messages;
+-- Query 3: another example where recursive planning was prevented due to
+-- correlated subqueries, but with PG17 folding the subquery to a join it is
+-- possible for Citus to plan and run the query.
+EXPLAIN (costs off)
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id IN
+    (SELECT s2.user_id FROM users as s2
+     GROUP BY d1.user_id, s2.user_id)) dt
+GROUP BY dept;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ HashAggregate
+   Group Key: remote_scan.dept
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  GroupAggregate
+                     Group Key: users.dept
+                     ->  Sort
+                           Sort Key: users.dept
+                           ->  Nested Loop Semi Join
+                                 ->  Hash Join
+                                       Hash Cond: (d1.user_id = users.user_id)
+                                       ->  Seq Scan on events_20240021 d1
+                                       ->  Hash
+                                             ->  Seq Scan on users_20240019 users
+                                                   Filter: (dept = ANY ('{3,4}'::integer[]))
+                                 ->  Subquery Scan on "ANY_subquery"
+                                       Filter: (d1.user_id = "ANY_subquery".user_id)
+                                       ->  HashAggregate
+                                             Group Key: s2.user_id
+                                             ->  Seq Scan on users_20240019 s2
+(23 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id IN
+    (SELECT s2.user_id FROM users as s2
+     GROUP BY d1.user_id, s2.user_id)) dt
+GROUP BY dept;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ dept | sum
+---------------------------------------------------------------------
+    3 | 110
+    4 | 130
+(2 rows)
+
+RESET client_min_messages;
+-- Query 3 rewritten in a similar way to how PG17 pulls up the subquery;
+-- the join is on the distribution key so Citus can push down.
+EXPLAIN (costs off)
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+    JOIN LATERAL (SELECT s2.user_id FROM users as s2
+                  GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id = d2.user_id) dt
+GROUP BY dept;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ HashAggregate
+   Group Key: remote_scan.dept
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  GroupAggregate
+                     Group Key: users.dept
+                     ->  Sort
+                           Sort Key: users.dept
+                           ->  Nested Loop
+                                 ->  Hash Join
+                                       Hash Cond: (d1.user_id = users.user_id)
+                                       ->  Seq Scan on events_20240021 d1
+                                       ->  Hash
+                                             ->  Seq Scan on users_20240019 users
+                                                   Filter: (dept = ANY ('{3,4}'::integer[]))
+                                 ->  Subquery Scan on d2
+                                       Filter: (d1.user_id = d2.user_id)
+                                       ->  HashAggregate
+                                             Group Key: s2.user_id
+                                             ->  Result
+                                                   One-Time Filter: (d1.user_id IS NOT NULL)
+                                                   ->  Seq Scan on users_20240019 s2
+(25 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+    JOIN LATERAL (SELECT s2.user_id FROM users as s2
+                  GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id = d2.user_id) dt
+GROUP BY dept;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ dept | sum
+---------------------------------------------------------------------
+    3 | 110
+    4 | 130
+(2 rows)
+
+RESET client_min_messages;
+RESET search_path;
+RESET citus.next_shard_id;
+RESET citus.shard_count;
+RESET citus.shard_replication_factor;
+DROP SCHEMA pg17_corr_subq_folding CASCADE;
+NOTICE:  drop cascades to 3 other objects
+DETAIL:  drop cascades to table pg17_corr_subq_folding.test
+drop cascades to table pg17_corr_subq_folding.users
+drop cascades to table pg17_corr_subq_folding.events
+\if :server_version_ge_17
+\else
+\q
+\endif
+-- PG17-specific tests go here.
+--
diff --git a/src/test/regress/expected/pg17_0.out b/src/test/regress/expected/pg17_0.out
new file mode 100644
index 000000000..66dba2c29
--- /dev/null
+++ b/src/test/regress/expected/pg17_0.out
@@ -0,0 +1,295 @@
+--
+-- PG17
+--
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int >= 17 AS server_version_ge_17
+\gset
+-- PG17 has the capability to pull up a correlated ANY subquery to a join if
+-- the subquery only refers to its immediate parent query. Previously, the
+-- subquery needed to be implemented as a SubPlan node, typically as a
+-- filter on a scan or join node. This PG17 capability enables Citus to
+-- run queries with correlated subqueries in certain cases, as shown here.
+-- Relevant PG commit:
+-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639
+-- This feature is tested for all PG versions, not just PG17; each test query with
+-- a correlated subquery should fail with PG version < 17.0, but the test query
+-- rewritten to reflect how PG17 optimizes it should succeed with PG < 17.0.
+CREATE SCHEMA pg17_corr_subq_folding;
+SET search_path TO pg17_corr_subq_folding;
+SET citus.next_shard_id TO 20240017;
+SET citus.shard_count TO 2;
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE test (x int, y int);
+SELECT create_distributed_table('test', 'x');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO test VALUES (1,1), (2,2);
+-- Query 1: WHERE clause has a correlated subquery with a UNION. PG17 can plan
+-- this as a nested loop join with the subquery as the inner. The correlation
+-- is on the distribution column so the join can be pushed down by Citus.
+explain (costs off)
+SELECT *
+FROM test a
+WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x)
+ORDER BY 1,2;
+ERROR:  cannot push down this subquery
+DETAIL:  Complex subqueries and CTEs are not supported within a UNION
+SET client_min_messages TO DEBUG2;
+SELECT *
+FROM test a
+WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x)
+ORDER BY 1,2;
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  generating subplan XXX_1 for subquery SELECT x FROM pg17_corr_subq_folding.test b
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM pg17_corr_subq_folding.test a WHERE (x OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.x FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT c.y FROM pg17_corr_subq_folding.test c WHERE (a.x OPERATOR(pg_catalog.=) c.x))) ORDER BY x, y
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+ERROR:  cannot push down this subquery
+DETAIL:  Complex subqueries and CTEs are not supported within a UNION
+RESET client_min_messages;
+-- Query 1 rewritten with subquery pulled up to a join, as done by the PG17 planner;
+-- this query can be run without issues by Citus with older (pre-PG17) PGs.
+explain (costs off)
+SELECT a.*
+FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x
+ORDER BY 1,2;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ Sort
+   Sort Key: remote_scan.x, remote_scan.y
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Nested Loop
+                     ->  Seq Scan on test_20240017 a
+                     ->  Subquery Scan on dt1
+                           Filter: (a.x = dt1.x)
+                           ->  HashAggregate
+                                 Group Key: b.x
+                                 ->  Append
+                                       ->  Seq Scan on test_20240017 b
+                                       ->  Seq Scan on test_20240017 c
+                                             Filter: (a.x = x)
+(17 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT a.*
+FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x
+ORDER BY 1,2;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ x | y
+---------------------------------------------------------------------
+ 1 | 1
+ 2 | 2
+(2 rows)
+
+RESET client_min_messages;
+CREATE TABLE users (user_id int, time int, dept int, info bigint);
+CREATE TABLE events (user_id int, time int, event_type int, payload text);
+select create_distributed_table('users', 'user_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('events', 'user_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+insert into users
+select i, 2021 + (i % 3), i % 5, 99999 * i from generate_series(1, 10) i;
+insert into events
+select i % 10 + 1, 2021 + (i % 3), i % 11, md5((i*i)::text) from generate_series(1, 100) i;
+-- Query 2. In Citus correlated subqueries cannot be used in the WHERE
+-- clause, but if the subquery can be pulled up to a join it becomes possible
+-- for Citus to run the query, per this example. Pre-PG17 the subquery
+-- was implemented as a SubPlan filter on the events table scan.
+EXPLAIN (costs off)
+WITH event_id
+     AS(SELECT user_id AS events_user_id,
+               time AS events_time,
+               event_type
+        FROM events)
+SELECT Count(*)
+FROM event_id
+WHERE (events_user_id) IN (SELECT user_id
+                           FROM users
+                           WHERE users.time = events_time);
+ERROR:  correlated subqueries are not supported when the FROM clause contains a CTE or subquery
+SET client_min_messages TO DEBUG2;
+WITH event_id
+     AS(SELECT user_id AS events_user_id,
+               time AS events_time,
+               event_type
+        FROM events)
+SELECT Count(*)
+FROM event_id
+WHERE (events_user_id) IN (SELECT user_id
+                           FROM users
+                           WHERE users.time = events_time);
+DEBUG:  CTE event_id is going to be inlined via distributed planning
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  generating subplan XXX_1 for CTE event_id: SELECT user_id AS events_user_id, "time" AS events_time, event_type FROM pg17_corr_subq_folding.events
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.events_user_id, intermediate_result.events_time, intermediate_result.event_type FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(events_user_id integer, events_time integer, event_type integer)) event_id WHERE (events_user_id OPERATOR(pg_catalog.=) ANY (SELECT users.user_id FROM pg17_corr_subq_folding.users WHERE (users."time" OPERATOR(pg_catalog.=) event_id.events_time)))
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+ERROR:  correlated subqueries are not supported when the FROM clause contains a CTE or subquery
+RESET client_min_messages;
+-- Query 2 rewritten with subquery pulled up to a join, as done by the PG17 planner.
+-- Citus is able to run this query with previous PG versions. Note that the CTE can be
+-- disregarded because it is inlined, being only referenced once.
+EXPLAIN (COSTS OFF)
+SELECT Count(*)
+FROM (SELECT user_id AS events_user_id,
+             time AS events_time,
+             event_type FROM events) dt1
+INNER JOIN (SELECT distinct user_id, time FROM users) dt
+        ON events_user_id = dt.user_id and events_time = dt.time;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  Aggregate
+                     ->  Hash Join
+                           Hash Cond: ((events.user_id = users.user_id) AND (events."time" = users."time"))
+                           ->  Seq Scan on events_20240021 events
+                           ->  Hash
+                                 ->  HashAggregate
+                                       Group Key: users.user_id, users."time"
+                                       ->  Seq Scan on users_20240019 users
+(14 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT Count(*)
+FROM (SELECT user_id AS events_user_id,
+             time AS events_time,
+             event_type FROM events) dt1
+INNER JOIN (SELECT distinct user_id, time FROM users) dt
+        ON events_user_id = dt.user_id and events_time = dt.time;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ count
+---------------------------------------------------------------------
+    31
+(1 row)
+
+RESET client_min_messages;
+-- Query 3: another example where recursive planning was prevented due to
+-- correlated subqueries, but with PG17 folding the subquery to a join it is
+-- possible for Citus to plan and run the query.
+EXPLAIN (costs off)
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id IN
+    (SELECT s2.user_id FROM users as s2
+     GROUP BY d1.user_id, s2.user_id)) dt
+GROUP BY dept;
+ERROR:  complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
+SET client_min_messages TO DEBUG2;
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id IN
+    (SELECT s2.user_id FROM users as s2
+     GROUP BY d1.user_id, s2.user_id)) dt
+GROUP BY dept;
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
+ERROR:  complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
+RESET client_min_messages;
+-- Query 3 rewritten in a similar way to how PG17 pulls up the subquery;
+-- the join is on the distribution key so Citus can push down.
+EXPLAIN (costs off)
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+    JOIN LATERAL (SELECT s2.user_id FROM users as s2
+                  GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id = d2.user_id) dt
+GROUP BY dept;
+                        QUERY PLAN
+---------------------------------------------------------------------
+ HashAggregate
+   Group Key: remote_scan.dept
+   ->  Custom Scan (Citus Adaptive)
+         Task Count: 2
+         Tasks Shown: One of 2
+         ->  Task
+               Node: host=localhost port=xxxxx dbname=regression
+               ->  GroupAggregate
+                     Group Key: users.dept
+                     ->  Sort
+                           Sort Key: users.dept
+                           ->  Nested Loop
+                                 ->  Hash Join
+                                       Hash Cond: (d1.user_id = users.user_id)
+                                       ->  Seq Scan on events_20240021 d1
+                                       ->  Hash
+                                             ->  Seq Scan on users_20240019 users
+                                                   Filter: (dept = ANY ('{3,4}'::integer[]))
+                                 ->  Subquery Scan on d2
+                                       Filter: (d1.user_id = d2.user_id)
+                                       ->  HashAggregate
+                                             Group Key: s2.user_id
+                                             ->  Result
+                                                   One-Time Filter: (d1.user_id IS NOT NULL)
+                                                   ->  Seq Scan on users_20240019 s2
+(25 rows)
+
+SET client_min_messages TO DEBUG2;
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+    JOIN LATERAL (SELECT s2.user_id FROM users as s2
+                  GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id = d2.user_id) dt
+GROUP BY dept;
+DEBUG:  Router planner cannot handle multi-shard select queries
+ dept | sum
+---------------------------------------------------------------------
+    3 | 110
+    4 | 130
+(2 rows)
+
+RESET client_min_messages;
+RESET search_path;
+RESET citus.next_shard_id;
+RESET citus.shard_count;
+RESET citus.shard_replication_factor;
+DROP SCHEMA pg17_corr_subq_folding CASCADE;
+NOTICE:  drop cascades to 3 other objects
+DETAIL:  drop cascades to table pg17_corr_subq_folding.test
+drop cascades to table pg17_corr_subq_folding.users
+drop cascades to table pg17_corr_subq_folding.events
+\if :server_version_ge_17
+\else
+\q
diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out
index f2e0616e7..15a0345b5 100644
--- a/src/test/regress/expected/set_operations.out
+++ b/src/test/regress/expected/set_operations.out
@@ -771,13 +771,13 @@ DEBUG:  Router planner cannot handle multi-shard select queries
 (2 rows)
 
 -- correlated subquery with union in WHERE clause
-SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2;
+SELECT * FROM test a WHERE (x + random()) IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2;
 DEBUG:  Router planner cannot handle multi-shard select queries
 DEBUG:  Router planner cannot handle multi-shard select queries
 DEBUG:  generating subplan XXX_1 for subquery SELECT x FROM recursive_union.test b
 DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
 DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
-DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM recursive_union.test a WHERE (x OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.x FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT c.y FROM recursive_union.test c WHERE (a.x OPERATOR(pg_catalog.=) c.x))) ORDER BY x, y
+DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM recursive_union.test a WHERE (((x)::double precision OPERATOR(pg_catalog.+) random()) OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.x FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) UNION SELECT c.y FROM recursive_union.test c WHERE (a.x OPERATOR(pg_catalog.=) c.x))) ORDER BY x, y
 DEBUG:  Router planner cannot handle multi-shard select queries
 DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
 DEBUG:  skipping recursive planning for the subquery since it contains references to outer queries
diff --git a/src/test/regress/expected/subquery_in_where.out b/src/test/regress/expected/subquery_in_where.out
index eb56acd87..990c29084 100644
--- a/src/test/regress/expected/subquery_in_where.out
+++ b/src/test/regress/expected/subquery_in_where.out
@@ -30,12 +30,12 @@ WITH event_id
          FROM events_table)
 SELECT Count(*)
 FROM event_id
-WHERE events_user_id IN (SELECT user_id
+WHERE (events_user_id, random()) IN (SELECT user_id, 1
                          FROM users_table
                          WHERE users_table.time = events_time);
 DEBUG:  CTE event_id is going to be inlined via distributed planning
 DEBUG:  generating subplan XXX_1 for CTE event_id: SELECT user_id AS events_user_id, "time" AS events_time, event_type FROM public.events_table
-DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.events_user_id, intermediate_result.events_time, intermediate_result.event_type FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(events_user_id integer, events_time timestamp without time zone, event_type integer)) event_id WHERE (events_user_id OPERATOR(pg_catalog.=) ANY (SELECT users_table.user_id FROM public.users_table WHERE (users_table."time" OPERATOR(pg_catalog.=) event_id.events_time)))
+DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.events_user_id, intermediate_result.events_time, intermediate_result.event_type FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(events_user_id integer, events_time timestamp without time zone, event_type integer)) event_id WHERE ((events_user_id, random()) OPERATOR(pg_catalog.=) ANY (SELECT users_table.user_id, 1 FROM public.users_table WHERE (users_table."time" OPERATOR(pg_catalog.=) event_id.events_time)))
 ERROR:  correlated subqueries are not supported when the FROM clause contains a CTE or subquery
 -- Recurring tuples as empty join tree
 SELECT *
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 535ef1d62..0a809d236 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -66,6 +66,7 @@ test: pg13 pg12
 test: pg14
 test: pg15
 test: pg15_jsonpath detect_conn_close
+test: pg17
 test: drop_column_partitioned_table
 test: tableam
 
diff --git a/src/test/regress/sql/dml_recursive.sql b/src/test/regress/sql/dml_recursive.sql
index 89e654b6c..7337c9672 100644
--- a/src/test/regress/sql/dml_recursive.sql
+++ b/src/test/regress/sql/dml_recursive.sql
@@ -212,6 +212,7 @@ RETURNING *;
 -- again a correlated subquery
 -- this time distribution key eq. exists
 -- however recursive planning is prevented due to correlated subqueries
+-- that cannot be folded to joins.
 UPDATE
 	second_distributed_table
 SET
@@ -231,7 +232,7 @@ FROM
 	AND
 	second_distributed_table.tenant_id IN
 	(
-		SELECT s2.tenant_id
+		SELECT s2.tenant_id || random()::text
 		FROM second_distributed_table as s2
 		GROUP BY d1.tenant_id, s2.tenant_id
 	)
diff --git a/src/test/regress/sql/pg17.sql b/src/test/regress/sql/pg17.sql
new file mode 100644
index 000000000..4fdde71ca
--- /dev/null
+++ b/src/test/regress/sql/pg17.sql
@@ -0,0 +1,182 @@
+--
+-- PG17
+--
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int >= 17 AS server_version_ge_17
+\gset
+
+-- PG17 has the capability to pull up a correlated ANY subquery to a join if
+-- the subquery only refers to its immediate parent query. Previously, the
+-- subquery needed to be implemented as a SubPlan node, typically as a
+-- filter on a scan or join node. This PG17 capability enables Citus to
+-- run queries with correlated subqueries in certain cases, as shown here.
+-- Relevant PG commit:
+-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f1337639
+
+-- This feature is tested for all PG versions, not just PG17; each test query with
+-- a correlated subquery should fail with PG version < 17.0, but the test query
+-- rewritten to reflect how PG17 optimizes it should succeed with PG < 17.0.
+
+CREATE SCHEMA pg17_corr_subq_folding;
+SET search_path TO pg17_corr_subq_folding;
+SET citus.next_shard_id TO 20240017;
+SET citus.shard_count TO 2;
+SET citus.shard_replication_factor TO 1;
+
+CREATE TABLE test (x int, y int);
+SELECT create_distributed_table('test', 'x');
+INSERT INTO test VALUES (1,1), (2,2);
+
+-- Query 1: WHERE clause has a correlated subquery with a UNION. PG17 can plan
+-- this as a nested loop join with the subquery as the inner. The correlation
+-- is on the distribution column so the join can be pushed down by Citus.
+explain (costs off)
+SELECT *
+FROM test a
+WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x)
+ORDER BY 1,2;
+
+SET client_min_messages TO DEBUG2;
+SELECT *
+FROM test a
+WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x)
+ORDER BY 1,2;
+RESET client_min_messages;
+
+-- Query 1 rewritten with subquery pulled up to a join, as done by the PG17 planner;
+-- this query can be run without issues by Citus with older (pre-PG17) PGs.
+explain (costs off)
+SELECT a.*
+FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x
+ORDER BY 1,2;
+
+SET client_min_messages TO DEBUG2;
+SELECT a.*
+FROM test a JOIN LATERAL (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) dt1 ON a.x = dt1.x
+ORDER BY 1,2;
+RESET client_min_messages;
+
+CREATE TABLE users (user_id int, time int, dept int, info bigint);
+CREATE TABLE events (user_id int, time int, event_type int, payload text);
+select create_distributed_table('users', 'user_id');
+select create_distributed_table('events', 'user_id');
+
+insert into users
+select i, 2021 + (i % 3), i % 5, 99999 * i from generate_series(1, 10) i;
+
+insert into events
+select i % 10 + 1, 2021 + (i % 3), i % 11, md5((i*i)::text) from generate_series(1, 100) i;
+
+-- Query 2. In Citus correlated subqueries cannot be used in the WHERE
+-- clause, but if the subquery can be pulled up to a join it becomes possible
+-- for Citus to run the query, per this example. Pre-PG17 the subquery
+-- was implemented as a SubPlan filter on the events table scan.
+EXPLAIN (costs off)
+WITH event_id
+     AS(SELECT user_id AS events_user_id,
+               time AS events_time,
+               event_type
+        FROM events)
+SELECT Count(*)
+FROM event_id
+WHERE (events_user_id) IN (SELECT user_id
+                           FROM users
+                           WHERE users.time = events_time);
+
+SET client_min_messages TO DEBUG2;
+WITH event_id
+     AS(SELECT user_id AS events_user_id,
+               time AS events_time,
+               event_type
+        FROM events)
+SELECT Count(*)
+FROM event_id
+WHERE (events_user_id) IN (SELECT user_id
+                           FROM users
+                           WHERE users.time = events_time);
+RESET client_min_messages;
+
+-- Query 2 rewritten with subquery pulled up to a join, as done by the PG17 planner.
+-- Citus is able to run this query with previous PG versions. Note that the CTE can be
+-- disregarded because it is inlined, being only referenced once.
+EXPLAIN (COSTS OFF)
+SELECT Count(*)
+FROM (SELECT user_id AS events_user_id,
+             time AS events_time,
+             event_type FROM events) dt1
+INNER JOIN (SELECT distinct user_id, time FROM users) dt
+        ON events_user_id = dt.user_id and events_time = dt.time;
+
+SET client_min_messages TO DEBUG2;
+SELECT Count(*)
+FROM (SELECT user_id AS events_user_id,
+             time AS events_time,
+             event_type FROM events) dt1
+INNER JOIN (SELECT distinct user_id, time FROM users) dt
+        ON events_user_id = dt.user_id and events_time = dt.time;
+RESET client_min_messages;
+
+-- Query 3: another example where recursive planning was prevented due to
+-- correlated subqueries, but with PG17 folding the subquery to a join it is
+-- possible for Citus to plan and run the query.
+EXPLAIN (costs off)
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id IN
+    (SELECT s2.user_id FROM users as s2
+     GROUP BY d1.user_id, s2.user_id)) dt
+GROUP BY dept;
+
+SET client_min_messages TO DEBUG2;
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id IN
+    (SELECT s2.user_id FROM users as s2
+     GROUP BY d1.user_id, s2.user_id)) dt
+GROUP BY dept;
+RESET client_min_messages;
+
+-- Query 3 rewritten in a similar way to how PG17 pulls up the subquery;
+-- the join is on the distribution key so Citus can push down.
+EXPLAIN (costs off)
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+    JOIN LATERAL (SELECT s2.user_id FROM users as s2
+                  GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id = d2.user_id) dt
+GROUP BY dept;
+
+SET client_min_messages TO DEBUG2;
+SELECT dept, sum(user_id) FROM
+(SELECT users.dept, users.user_id
+FROM users, events as d1
+    JOIN LATERAL (SELECT s2.user_id FROM users as s2
+                  GROUP BY s2.user_id HAVING d1.user_id IS NOT NULL) as d2 ON 1=1
+WHERE d1.user_id = users.user_id
+    AND users.dept IN (3,4)
+    AND users.user_id = d2.user_id) dt
+GROUP BY dept;
+RESET client_min_messages;
+
+RESET search_path;
+RESET citus.next_shard_id;
+RESET citus.shard_count;
+RESET citus.shard_replication_factor;
+DROP SCHEMA pg17_corr_subq_folding CASCADE;
+
+\if :server_version_ge_17
+\else
+\q
+\endif
+
+-- PG17-specific tests go here.
+--
diff --git a/src/test/regress/sql/set_operations.sql b/src/test/regress/sql/set_operations.sql
index 633b5c0b5..58907a281 100644
--- a/src/test/regress/sql/set_operations.sql
+++ b/src/test/regress/sql/set_operations.sql
@@ -134,7 +134,7 @@ SELECT * FROM test a WHERE x NOT IN (SELECT x FROM test b WHERE y = 1 UNION SELE
 SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c) ORDER BY 1,2;
 
 -- correlated subquery with union in WHERE clause
-SELECT * FROM test a WHERE x IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2;
+SELECT * FROM test a WHERE (x + random()) IN (SELECT x FROM test b UNION SELECT y FROM test c WHERE a.x = c.x) ORDER BY 1,2;
 
 -- force unions to be planned while subqueries are being planned
 SELECT * FROM ((SELECT * FROM test) UNION (SELECT * FROM test) ORDER BY 1,2 LIMIT 5) as foo ORDER BY 1 DESC LIMIT 3;
diff --git a/src/test/regress/sql/subquery_in_where.sql b/src/test/regress/sql/subquery_in_where.sql
index 90386122f..8316508b7 100644
--- a/src/test/regress/sql/subquery_in_where.sql
+++ b/src/test/regress/sql/subquery_in_where.sql
@@ -25,7 +25,7 @@ WITH event_id
          FROM events_table)
 SELECT Count(*)
 FROM event_id
-WHERE events_user_id IN (SELECT user_id
+WHERE (events_user_id, random()) IN (SELECT user_id, 1
                         FROM users_table
                         WHERE users_table.time = events_time);