-- -- MULTI_SUBQUERY -- -- no need to set shardid sequence given that we're not creating any shards SET citus.next_shard_id TO 570032; SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Shard min/max values are not exactly the same yet; check that the query is still planned and returns a result. SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; avg --------------------------------------------------------------------- 142158.8766934673366834 (1 row) -- Update metadata in order to make all shards equal -- note that the table is created on multi_insert_select_create_table.sql UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'orders_subquery'::regclass ORDER BY shardid DESC LIMIT 1); SET client_min_messages TO DEBUG1; -- If group by is not on partition column then we recursively plan SELECT avg(order_count) FROM (SELECT l_suppkey, count(*) AS order_count FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; DEBUG: generating subplan XXX_1 for subquery SELECT l_suppkey, count(*) AS order_count FROM public.lineitem_subquery GROUP BY l_suppkey DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT avg(order_count) AS avg FROM (SELECT intermediate_result.l_suppkey, intermediate_result.order_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(l_suppkey integer, order_count bigint)) order_counts avg --------------------------------------------------------------------- 1.7199369356456930 (1 row) -- Check that a join with no join condition at all (a Cartesian product) is rejected. 
SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice / l_quantity) AS unit_price FROM lineitem_subquery, orders_subquery GROUP BY l_orderkey) AS unit_prices; ERROR: cannot perform distributed planning on this query DETAIL: Cartesian products are currently unsupported RESET client_min_messages; -- Subqueries without a relation but with a volatile (non-constant) function are planned recursively SELECT count(*) FROM ( SELECT l_orderkey FROM lineitem_subquery JOIN (SELECT random()::int r) sub ON (l_orderkey = r) WHERE r > 10 ) b; count --------------------------------------------------------------------- 0 (1 row) SET client_min_messages TO DEBUG; -- If there are non-relation subqueries then we recursively plan SELECT count(*) FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION ALL (SELECT 1::bigint) ) b; DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery DEBUG: Creating router plan DEBUG: generating subplan XXX_2 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION ALL SELECT (1)::bigint AS int8 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b DEBUG: Creating router plan count --------------------------------------------------------------------- 12001 (1 row) -- If queries in union do not include partition columns then we recursively plan SELECT count(*) FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION (SELECT l_partkey FROM lineitem_subquery) ) b; DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: 
generating subplan XXX_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_2 for subquery SELECT l_partkey FROM public.lineitem_subquery DEBUG: Creating router plan DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION SELECT intermediate_result.l_partkey FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(l_partkey integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b DEBUG: Creating router plan count --------------------------------------------------------------------- 14496 (1 row) -- Check that we push down union queries if partition column is selected (no subplan DEBUG messages) SELECT count(*) FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION (SELECT l_orderkey FROM lineitem_subquery) ) b; DEBUG: Router planner cannot handle multi-shard select queries count --------------------------------------------------------------------- 2985 (1 row) RESET client_min_messages; -- we'd error out if inner query has Limit but subquery_pushdown is not set -- but we recursively plan the query SELECT avg(o_totalprice/l_quantity) FROM (SELECT l_orderkey, l_quantity FROM lineitem_subquery ORDER BY l_orderkey, l_quantity LIMIT 10 ) lineitem_quantities JOIN LATERAL (SELECT o_totalprice FROM orders_subquery WHERE lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true; avg --------------------------------------------------------------------- 17470.0940725222668915 (1 row) -- Limit is only supported when subquery_pushdown is set -- Check that we error out if inner query has limit but outer 
query has not. SET citus.subquery_pushdown to ON; NOTICE: Setting citus.subquery_pushdown flag is discouraged becuase it forces the planner to pushdown certain queries, skipping relevant correctness checks. DETAIL: When enabled, the planner skips many correctness checks for subqueries and pushes down the queries to shards as-is. It means that the queries are likely to return wrong results unless the user is absolutely sure that pushing down the subquery is safe. This GUC is maintained only for backward compatibility, no new users are supposed to use it. The planner is capable of pushing down as much computation as possible to the shards depending on the query. SELECT avg(o_totalprice/l_quantity) FROM (SELECT l_orderkey, l_quantity FROM lineitem_subquery ORDER BY l_orderkey, l_quantity LIMIT 10 ) lineitem_quantities JOIN LATERAL (SELECT o_totalprice FROM orders_subquery WHERE lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true; ERROR: cannot push down this subquery DETAIL: Limit in subquery without limit in the outermost query is unsupported -- reset the flag for next query SET citus.subquery_pushdown to OFF; -- some queries without a subquery uses subquery planner SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey < l_quantity) ORDER BY l_orderkey DESC LIMIT 10; l_orderkey --------------------------------------------------------------------- 39 39 39 39 38 37 37 37 36 33 (10 rows) -- query is still supported if contains additional join -- clauses that includes arithmetic expressions SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey < l_quantity + 3) ORDER BY l_orderkey DESC LIMIT 10; l_orderkey --------------------------------------------------------------------- 39 39 39 39 38 37 37 37 36 35 (10 rows) -- implicit typecasts in joins is supported SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON 
(l_orderkey::int8 = o_orderkey::int8) WHERE (o_orderkey < l_quantity + 3) ORDER BY l_orderkey DESC LIMIT 10; l_orderkey --------------------------------------------------------------------- 39 39 39 39 38 37 37 37 36 35 (10 rows) -- non-implicit typecasts in joins is not supported SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey::int8 = o_orderkey::int4) WHERE (o_orderkey < l_quantity + 3) ORDER BY l_orderkey DESC LIMIT 10; ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- implicit typecast supported in equi-join SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey::int8 = o_orderkey::int8) ORDER BY l_orderkey DESC LIMIT 10; l_orderkey --------------------------------------------------------------------- 14947 14947 14946 14946 14945 14945 14945 14945 14945 14945 (10 rows) -- non-implicit typecast is not supported in equi-join SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey::int4 = o_orderkey::int8) ORDER BY l_orderkey DESC LIMIT 10; ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- type casts in filters are supported as long as -- a valid equi-join exists SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey::int8 < l_quantity::int8 + 3) ORDER BY l_orderkey DESC LIMIT 10; l_orderkey --------------------------------------------------------------------- 39 39 39 39 38 37 37 37 36 35 (10 rows) -- even if type cast is non-implicit SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey::int4 < l_quantity::int8 + 3) ORDER BY l_orderkey DESC LIMIT 10; l_orderkey --------------------------------------------------------------------- 39 39 39 39 38 37 37 37 36 35 (10 rows) -- query is not supported if 
it contains a partition column -- equi join that includes arithmetic expressions SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey + 1) WHERE (o_orderkey < l_quantity) ORDER BY l_orderkey DESC LIMIT 10; ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- query is not supported if there is a single -- join clause with arithmetic expression. It fails -- with the same error message SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey + 1) ORDER BY l_orderkey DESC LIMIT 10; ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- query is not supported if it does not have an equi-join clause SELECT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey < o_orderkey) WHERE (o_orderkey < l_quantity) ORDER BY l_orderkey DESC LIMIT 10; ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- outer joins on reference tables with functions work SELECT DISTINCT ON (t1.user_id) t1.user_id, t2.value_1, t2.value_2, t2.value_3 FROM events_table t1 LEFT JOIN users_reference_table t2 ON t1.user_id = trunc(t2.user_id) ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 5; user_id | value_1 | value_2 | value_3 --------------------------------------------------------------------- 6 | 5 | 2 | 0 5 | 5 | 5 | 1 4 | 5 | 4 | 1 3 | 5 | 5 | 3 2 | 4 | 4 | 5 (5 rows) -- outer joins on reference tables with simple expressions should work SELECT DISTINCT ON (t1.user_id) t1.user_id, t2.value_1, t2.value_2, t2.value_3 FROM events_table t1 LEFT JOIN users_reference_table t2 ON t1.user_id > t2.user_id ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 5; user_id | value_1 | value_2 | value_3 --------------------------------------------------------------------- 6 | 5 | 5 | 3 5 | 5 | 5 | 3 
4 | 5 | 5 | 3 3 | 5 | 4 | 3 2 | 5 | 4 | 3 (5 rows) -- outer joins on distributed tables with simple expressions should not work SELECT DISTINCT ON (t1.user_id) t1.user_id, t2.value_1, t2.value_2, t2.value_3 FROM events_table t1 LEFT JOIN users_table t2 ON t1.user_id > t2.user_id ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 5; ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- outer joins on reference tables with expressions should work SELECT DISTINCT ON (t1.user_id) t1.user_id, t2.value_1, t2.value_2, t2.value_3 FROM events_table t1 LEFT JOIN users_reference_table t2 ON t1.user_id = (CASE WHEN t2.user_id > 3 THEN 3 ELSE t2.user_id END) ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 5; user_id | value_1 | value_2 | value_3 --------------------------------------------------------------------- 6 | | | 5 | | | 4 | | | 3 | 5 | 5 | 3 2 | 4 | 4 | 5 (5 rows) -- outer joins on distributed tables and reference tables with expressions should work SELECT DISTINCT ON (t1.user_id) t1.user_id, t2.value_1, t2.value_2, t2.value_3 FROM users_table t0 LEFT JOIN events_table t1 ON t0.user_id = t1.user_id LEFT JOIN users_reference_table t2 ON t1.user_id = trunc(t2.user_id) ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 5; user_id | value_1 | value_2 | value_3 --------------------------------------------------------------------- 6 | 5 | 2 | 0 5 | 5 | 5 | 1 4 | 5 | 4 | 1 3 | 5 | 5 | 3 2 | 4 | 4 | 5 (5 rows) -- outer joins on distributed tables with expressions should not work SELECT DISTINCT ON (t1.user_id) t1.user_id, t2.value_1, t2.value_2, t2.value_3 FROM users_table t0 LEFT JOIN events_table t1 ON t0.user_id = trunc(t1.user_id) LEFT JOIN users_reference_table t2 ON t1.user_id = trunc(t2.user_id) ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 5; ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- outer joins as subqueries should work -- 
https://github.com/citusdata/citus/issues/2739 SELECT user_id, value_1, event_type FROM ( SELECT a.user_id, a.value_1, b.event_type FROM users_table a LEFT JOIN events_table b ON a.user_id = b.user_id ) lo ORDER BY 1, 2, 3 LIMIT 5; user_id | value_1 | event_type --------------------------------------------------------------------- 1 | 1 | 0 1 | 1 | 0 1 | 1 | 1 1 | 1 | 1 1 | 1 | 2 (5 rows) -- inner joins on reference tables with functions works SELECT DISTINCT ON (t1.user_id) t1.user_id, t2.value_1, t2.value_2, t2.value_3 FROM events_table t1 JOIN users_reference_table t2 ON t1.user_id = trunc(t2.user_id) ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 5; user_id | value_1 | value_2 | value_3 --------------------------------------------------------------------- 6 | 5 | 2 | 0 5 | 5 | 5 | 1 4 | 5 | 4 | 1 3 | 5 | 5 | 3 2 | 4 | 4 | 5 (5 rows) -- distinct queries work SELECT DISTINCT l_orderkey FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey < l_quantity) ORDER BY l_orderkey DESC LIMIT 10; l_orderkey --------------------------------------------------------------------- 39 38 37 36 33 32 7 6 5 4 (10 rows) -- count(distinct) queries work SELECT COUNT(DISTINCT l_orderkey) FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey < l_quantity); count --------------------------------------------------------------------- 13 (1 row) -- the same queries returning a non-partition column SELECT l_quantity FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey < l_quantity) ORDER BY l_quantity DESC LIMIT 10; l_quantity --------------------------------------------------------------------- 50.00 49.00 46.00 46.00 45.00 44.00 44.00 44.00 43.00 43.00 (10 rows) -- distinct queries work SELECT DISTINCT l_quantity FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey < l_quantity) ORDER BY l_quantity DESC LIMIT 10; l_quantity 
--------------------------------------------------------------------- 50.00 49.00 46.00 45.00 44.00 43.00 42.00 41.00 40.00 39.00 (10 rows) -- count(distinct) queries work SELECT COUNT(DISTINCT l_quantity) FROM lineitem_subquery l JOIN orders_subquery o ON (l_orderkey = o_orderkey) WHERE (o_orderkey < l_quantity); count --------------------------------------------------------------------- 25 (1 row) -- Check that we support count distinct with a subquery SELECT count(DISTINCT a) FROM ( SELECT count(*) a FROM lineitem_subquery GROUP BY l_orderkey ) z; count --------------------------------------------------------------------- 7 (1 row) -- We do not support distinct aggregates other than count distinct with a subquery SELECT sum(DISTINCT a) FROM ( SELECT count(*) a FROM lineitem_subquery GROUP BY l_orderkey ) z; ERROR: cannot compute aggregate (distinct) DETAIL: Only count(distinct) aggregate is supported in subqueries SELECT avg(DISTINCT a) FROM ( SELECT count(*) a FROM lineitem_subquery GROUP BY l_orderkey ) z; ERROR: cannot compute aggregate (distinct) DETAIL: Only count(distinct) aggregate is supported in subqueries -- Check supported subquery types. 
SELECT o_custkey, sum(order_count) as total_order_count FROM (SELECT o_orderkey, o_custkey, count(*) AS order_count FROM orders_subquery WHERE o_orderkey > 0 AND o_orderkey < 12000 GROUP BY o_orderkey, o_custkey) AS order_counts GROUP BY o_custkey ORDER BY total_order_count DESC, o_custkey ASC LIMIT 10; o_custkey | total_order_count --------------------------------------------------------------------- 1462 | 9 619 | 8 643 | 8 1030 | 8 1486 | 8 79 | 7 304 | 7 319 | 7 343 | 7 448 | 7 (10 rows) SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice / l_quantity) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices WHERE unit_price > 1000 AND unit_price < 10000; avg --------------------------------------------------------------------- 4968.4946466804019323 (1 row) -- Check subqueries in target list SELECT (SELECT 1) FROM orders_subquery ORDER BY 1 LIMIT 1; ?column? --------------------------------------------------------------------- 1 (1 row) SELECT sum((SELECT 1)) FROM orders_subquery; sum --------------------------------------------------------------------- 2985 (1 row) -- Check that if subquery is pulled, we don't error and run query properly. 
SELECT count(*) FROM ( SELECT l_orderkey FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION (SELECT l_orderkey FROM lineitem_subquery) ) a WHERE l_orderkey = 1 ) b; count --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM ( SELECT * FROM ( (SELECT * FROM lineitem_subquery) UNION (SELECT * FROM lineitem_subquery) ) a WHERE l_orderkey = 1 ) b; count --------------------------------------------------------------------- 6 (1 row) SELECT max(l_orderkey) FROM ( SELECT l_orderkey FROM ( SELECT l_orderkey FROM lineitem_subquery WHERE l_orderkey < 20000 GROUP BY l_orderkey ) z ) y; max --------------------------------------------------------------------- 14947 (1 row) -- Subqueries filter by 2 different users SELECT * FROM (SELECT * FROM (SELECT user_id, sum(value_2) AS counter FROM events_table WHERE user_id = 2 GROUP BY user_id) AS foo, (SELECT user_id, sum(value_2) AS counter FROM events_table WHERE user_id = 3 GROUP BY user_id) AS bar WHERE foo.user_id = bar.user_id ) AS baz; user_id | counter | user_id | counter --------------------------------------------------------------------- (0 rows) -- Subqueries filter by different users, one of which overlaps SELECT * FROM (SELECT * FROM (SELECT user_id, sum(value_2) AS counter FROM events_table WHERE user_id = 2 OR user_id = 3 GROUP BY user_id) AS foo, (SELECT user_id, sum(value_2) AS counter FROM events_table WHERE user_id = 2 GROUP BY user_id) AS bar WHERE foo.user_id = bar.user_id ) AS baz ORDER BY 1,2 LIMIT 5; user_id | counter | user_id | counter --------------------------------------------------------------------- 2 | 57 | 2 | 57 (1 row) -- Add one more shard to one relation, then test if we error out because of different -- shard counts for joining relations. 
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 15000, shardmaxvalue = 20000 WHERE shardid = :new_shard_id; SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice / l_quantity) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; ERROR: shard counts of co-located tables do not match -- Check that we can prune shards in subqueries with VARCHAR partition columns CREATE TABLE subquery_pruning_varchar_test_table ( a varchar, b int ); SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash'); create_distributed_table --------------------------------------------------------------------- (1 row) -- temporarily disable router executor to test pruning behaviour of subquery pushdown SET citus.enable_router_execution TO off; SET client_min_messages TO DEBUG2; SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) AS foo; DEBUG: Router planner not enabled. count --------------------------------------------------------------------- (0 rows) SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a) AS foo; DEBUG: Router planner not enabled. 
count --------------------------------------------------------------------- (0 rows) SET client_min_messages TO NOTICE; -- test subquery join on VARCHAR partition column SELECT * FROM (SELECT a_inner AS a FROM (SELECT subquery_pruning_varchar_test_table.a AS a_inner FROM subquery_pruning_varchar_test_table GROUP BY subquery_pruning_varchar_test_table.a HAVING count(subquery_pruning_varchar_test_table.a) < 3) AS f1, (SELECT subquery_pruning_varchar_test_table.a FROM subquery_pruning_varchar_test_table GROUP BY subquery_pruning_varchar_test_table.a HAVING sum(coalesce(subquery_pruning_varchar_test_table.b,0)) > 20.0) AS f2 WHERE f1.a_inner = f2.a GROUP BY a_inner) AS foo; a --------------------------------------------------------------------- (0 rows) RESET citus.enable_router_execution; -- Test https://github.com/citusdata/citus/issues/3424 insert into subquery_pruning_varchar_test_table values ('1', '1'), (2, '1'), (3, '2'), (3, '1'), (4, '4'), (5, '6'); WITH cte_1 AS (SELECT b max FROM subquery_pruning_varchar_test_table) SELECT a FROM subquery_pruning_varchar_test_table JOIN cte_1 ON a = max::text GROUP BY a HAVING a = (SELECT a) ORDER BY 1; ERROR: Subqueries in HAVING cannot refer to outer query -- Test https://github.com/citusdata/citus/issues/3432 SELECT t1.event_type FROM events_table t1 GROUP BY t1.event_type HAVING t1.event_type > avg((SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1)) ORDER BY 1; event_type --------------------------------------------------------------------- 6 (1 row) SELECT t1.event_type FROM events_table t1 GROUP BY t1.event_type HAVING t1.event_type > avg(2 + (SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1)) ORDER BY 1; event_type --------------------------------------------------------------------- (0 rows) SELECT t1.event_type FROM events_table t1 GROUP BY t1.event_type HAVING t1.event_type > avg((SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1) - t1.value_2) ORDER BY 1; event_type 
--------------------------------------------------------------------- 4 5 6 (3 rows) RESET citus.coordinator_aggregation_strategy; SELECT t1.event_type FROM events_table t1 GROUP BY t1.event_type HAVING t1.event_type > corr(t1.value_3, t1.value_2 + (SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1)) ORDER BY 1; event_type --------------------------------------------------------------------- 0 1 2 3 4 5 (6 rows) SELECT t1.event_type FROM events_table t1 GROUP BY t1.event_type HAVING t1.event_type * 5 > sum(distinct t1.value_3) ORDER BY 1; event_type --------------------------------------------------------------------- 3 4 5 6 (4 rows) SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Test https://github.com/citusdata/citus/issues/3433 CREATE TABLE keyval1 (key int, value int); SELECT create_distributed_table('keyval1', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE keyval2 (key int, value int); SELECT create_distributed_table('keyval2', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE keyvalref (key int, value int); SELECT create_reference_table('keyvalref'); create_reference_table --------------------------------------------------------------------- (1 row) EXPLAIN (COSTS OFF) SELECT count(*) FROM keyval1 GROUP BY key HAVING sum(value) > (SELECT sum(value) FROM keyvalref GROUP BY key); QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> HashAggregate Group Key: key -> Seq Scan on keyvalref_xxxxxxx keyvalref Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> HashAggregate Group Key: keyval1.key Filter: (sum(keyval1.value) > $0) InitPlan 1 
(returns $0) -> Function Scan on read_intermediate_result intermediate_result -> Seq Scan on keyval1_xxxxxxx keyval1 (20 rows) -- For some reason 'ORDER BY 1 DESC LIMIT 1' triggers recursive planning EXPLAIN (COSTS OFF) SELECT count(*) FROM keyval1 GROUP BY key HAVING sum(value) > (SELECT sum(value) FROM keyvalref GROUP BY key ORDER BY 1 DESC LIMIT 1); QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Limit -> Sort Sort Key: (sum(value)) DESC -> HashAggregate Group Key: key -> Seq Scan on keyvalref_xxxxxxx keyvalref Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> HashAggregate Group Key: keyval1.key Filter: (sum(keyval1.value) > $0) InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> Seq Scan on keyval1_xxxxxxx keyval1 (23 rows) EXPLAIN (COSTS OFF) SELECT count(*) FROM keyval1 GROUP BY key HAVING sum(value) > (SELECT sum(value) FROM keyval2 GROUP BY key ORDER BY 1 DESC LIMIT 1); QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) -> Distributed Subplan XXX_1 -> Limit -> Sort Sort Key: remote_scan.sum DESC -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> Limit -> Sort Sort Key: (sum(value)) DESC -> HashAggregate Group Key: key -> Seq Scan on keyval2_xxxxxxx keyval2 Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=localhost port=xxxxx dbname=regression -> HashAggregate Group Key: keyval1.key Filter: (sum(keyval1.value) > $0) InitPlan 1 (returns $0) -> Function Scan on read_intermediate_result intermediate_result -> Seq Scan on keyval1_xxxxxxx keyval1 (26 rows) EXPLAIN (COSTS OFF) SELECT count(*) FROM keyval1 k1 WHERE k1.key = 2 
HAVING sum(value) > (SELECT sum(value) FROM keyval2 k2 WHERE k2.key = 2 ORDER BY 1 DESC LIMIT 1); QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=xxxxx dbname=regression -> Aggregate Filter: (sum(k1.value) > $0) InitPlan 1 (returns $0) -> Limit -> Sort Sort Key: (sum(k2.value)) DESC -> Aggregate -> Seq Scan on keyval2_xxxxxxx k2 Filter: (key = 2) -> Seq Scan on keyval1_xxxxxxx k1 Filter: (key = 2) (16 rows) -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average FROM (SELECT tenant_id, user_id, array_agg(event_type ORDER BY event_time) AS events FROM (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, event_type, events.event_time FROM users, events WHERE (users.composite_id) = (events.composite_id) AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type IN ('click', 'submit', 'pay')) AS subquery GROUP BY tenant_id, user_id) AS subquery; event_average --------------------------------------------------------------------- 3.6666666666666667 (1 row) -- Union and left join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average, hasdone FROM (SELECT subquery_1.tenant_id, subquery_1.user_id, array_agg(event ORDER BY event_time) AS events, COALESCE(hasdone, 'Has not done paying') AS hasdone FROM ( (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, (users.composite_id) as composite_id, 'action=>1'AS event, events.event_time FROM users, events WHERE (users.composite_id) = (events.composite_id) AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'click') UNION (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, (users.composite_id) 
as composite_id, 'action=>2'AS event, events.event_time FROM users, events WHERE (users.composite_id) = (events.composite_id) AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'submit') ) AS subquery_1 LEFT JOIN (SELECT DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, (composite_id).tenant_id, (composite_id).user_id, 'Has done paying'::TEXT AS hasdone FROM events WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'pay') AS subquery_2 ON subquery_1.composite_id = subquery_2.composite_id GROUP BY subquery_1.tenant_id, subquery_1.user_id, hasdone) AS subquery_top GROUP BY hasdone ORDER BY event_average DESC; event_average | hasdone --------------------------------------------------------------------- 4.0000000000000000 | Has not done paying 2.5000000000000000 | Has done paying (2 rows) -- Union, left join and having subquery pushdown SELECT avg(array_length(events, 1)) AS event_average, count_pay FROM ( SELECT subquery_1.tenant_id, subquery_1.user_id, array_agg(event ORDER BY event_time) AS events, COALESCE(count_pay, 0) AS count_pay FROM ( (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, (users.composite_id), 'action=>1'AS event, events.event_time FROM users, events WHERE (users.composite_id) = (events.composite_id) AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'click') UNION (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, (users.composite_id), 'action=>2'AS event, events.event_time FROM users, events WHERE (users.composite_id) = (events.composite_id) AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id 
<= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'submit') ) AS subquery_1 LEFT JOIN (SELECT (composite_id).tenant_id, (composite_id).user_id, composite_id, COUNT(*) AS count_pay FROM events WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'pay' GROUP BY composite_id HAVING COUNT(*) > 2) AS subquery_2 ON subquery_1.composite_id = subquery_2.composite_id GROUP BY subquery_1.tenant_id, subquery_1.user_id, count_pay) AS subquery_top WHERE array_ndims(events) > 0 GROUP BY count_pay ORDER BY count_pay; event_average | count_pay --------------------------------------------------------------------- 3.0000000000000000 | 0 (1 row) -- Lateral join subquery pushdown -- set subquery_pushdown since there is limit in the query SET citus.subquery_pushdown to ON; NOTICE: Setting citus.subquery_pushdown flag is discouraged becuase it forces the planner to pushdown certain queries, skipping relevant correctness checks. DETAIL: When enabled, the planner skips many correctness checks for subqueries and pushes down the queries to shards as-is. It means that the queries are likely to return wrong results unless the user is absolutely sure that pushing down the subquery is safe. This GUC is maintained only for backward compatibility, no new users are supposed to use it. The planner is capable of pushing down as much computation as possible to the shards depending on the query. 
SELECT tenant_id, user_id, user_lastseen, event_array FROM (SELECT tenant_id, user_id, max(lastseen) as user_lastseen, array_agg(event_type ORDER BY event_time) AS event_array FROM (SELECT (composite_id).tenant_id, (composite_id).user_id, composite_id, lastseen FROM users WHERE composite_id >= '(1, -9223372036854775808)'::user_composite_type AND composite_id <= '(1, 9223372036854775807)'::user_composite_type ORDER BY lastseen DESC LIMIT 10 ) AS subquery_top LEFT JOIN LATERAL (SELECT event_type, event_time FROM events WHERE (composite_id) = subquery_top.composite_id ORDER BY event_time DESC LIMIT 99) AS subquery_lateral ON true GROUP BY tenant_id, user_id ) AS shard_union ORDER BY user_lastseen DESC LIMIT 10; tenant_id | user_id | user_lastseen | event_array --------------------------------------------------------------------- 1 | 1003 | 1472807315 | {click,click,click,submit} 1 | 1002 | 1472807215 | {click,click,submit,pay} 1 | 1001 | 1472807115 | {click,submit,pay} (3 rows) -- clean up the tables and the type & functions -- also set the min messages to WARNING to skip -- CASCADE NOTICE messages SET client_min_messages TO WARNING; DROP TABLE users, events, subquery_pruning_varchar_test_table, keyval1, keyval2, keyvalref; DROP TYPE user_composite_type CASCADE; SET client_min_messages TO DEFAULT; SET citus.subquery_pushdown to OFF;