-- -- MULTI_SUBQUERY -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000; -- print major version to make version-specific tests clear SHOW server_version \gset SELECT substring(:'server_version', '\d+\.\d+') AS major_version; major_version --------------- 9.5 (1 row) -- Create tables for subquery tests CREATE TABLE lineitem_subquery ( l_orderkey bigint not null, l_partkey integer not null, l_suppkey integer not null, l_linenumber integer not null, l_quantity decimal(15, 2) not null, l_extendedprice decimal(15, 2) not null, l_discount decimal(15, 2) not null, l_tax decimal(15, 2) not null, l_returnflag char(1) not null, l_linestatus char(1) not null, l_shipdate date not null, l_commitdate date not null, l_receiptdate date not null, l_shipinstruct char(25) not null, l_shipmode char(10) not null, l_comment varchar(44) not null, PRIMARY KEY(l_orderkey, l_linenumber) ); SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range'); master_create_distributed_table --------------------------------- (1 row) CREATE TABLE orders_subquery ( o_orderkey bigint not null, o_custkey integer not null, o_orderstatus char(1) not null, o_totalprice decimal(15,2) not null, o_orderdate date not null, o_orderpriority char(15) not null, o_clerk char(15) not null, o_shippriority integer not null, o_comment varchar(79) not null, PRIMARY KEY(o_orderkey) ); SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range'); master_create_distributed_table --------------------------------- (1 row) SET citus.enable_router_execution TO 'false'; -- Check that we don't allow subquery pushdown in default settings. SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; ERROR: cannot perform distributed planning on this query DETAIL: Join in subqueries is not supported yet SET citus.subquery_pushdown to TRUE; -- Check that we don't crash if there are not any shards. SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; avg ----- (1 row) -- Load data into tables. SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947 WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('orders_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('orders_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14946 WHERE shardid = :new_shard_id; SET citus.shard_max_size TO "1MB"; \copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|' \copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' \copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' \copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' -- Check that we error out if shard min/max values are not exactly same. SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; ERROR: cannot push down this subquery DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning -- Update metadata in order to make all shards equal. UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003; -- If group by is not on partition column then we error out. SELECT avg(order_count) FROM (SELECT l_suppkey, count(*) AS order_count FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; ERROR: cannot push down this subquery DETAIL: Group by list without partition column is currently unsupported -- Check that we error out if join is not on partition columns. SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice / l_quantity) AS unit_price FROM lineitem_subquery, orders_subquery GROUP BY l_orderkey) AS unit_prices; ERROR: cannot push down this subquery DETAIL: Relations need to be joining on partition columns SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice / l_quantity) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_custkey GROUP BY l_orderkey) AS unit_prices; ERROR: cannot push down this subquery DETAIL: Relations need to be joining on partition columns -- Check that we error out if there is union all. SELECT count(*) FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION ALL (SELECT 1::bigint) ) b; ERROR: could not run distributed query with complex table expressions -- Check that we error out if queries in union do not include partition columns. SELECT count(*) FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION (SELECT l_partkey FROM lineitem_subquery) ) b; ERROR: cannot push down this subquery DETAIL: Union clauses need to select partition columns -- Check that we run union queries if partition column is selected. SELECT count(*) FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION (SELECT l_orderkey FROM lineitem_subquery) ) b; count ------- 2985 (1 row) -- Check that we error out if the outermost query has subquery join. SELECT avg(o_totalprice/l_quantity) FROM (SELECT l_orderkey, l_quantity FROM lineitem_subquery ORDER BY l_quantity LIMIT 10 ) lineitem_quantities JOIN LATERAL (SELECT o_totalprice FROM orders_subquery WHERE lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true; ERROR: cannot perform distributed planning on this query DETAIL: Join in subqueries is not supported yet -- Check that we error out if the outermost query is a distinct clause. SELECT count(DISTINCT a) FROM ( SELECT count(*) a FROM lineitem_subquery ) z; ERROR: cannot push down this subquery DETAIL: distinct in the outermost query is unsupported -- Check supported subquery types. SELECT o_custkey, sum(order_count) as total_order_count FROM (SELECT o_orderkey, o_custkey, count(*) AS order_count FROM orders_subquery WHERE o_orderkey > 0 AND o_orderkey < 12000 GROUP BY o_orderkey, o_custkey) AS order_counts GROUP BY o_custkey ORDER BY total_order_count DESC, o_custkey ASC LIMIT 10; o_custkey | total_order_count -----------+------------------- 1462 | 9 619 | 8 643 | 8 1030 | 8 1486 | 8 79 | 7 304 | 7 319 | 7 343 | 7 448 | 7 (10 rows) SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice / l_quantity) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices WHERE unit_price > 1000 AND unit_price < 10000; avg ----------------------- 4968.2889885208475549 (1 row) -- Check that if subquery is pulled, we don't error and run query properly. SELECT count(*) FROM ( SELECT l_orderkey FROM ( (SELECT l_orderkey FROM lineitem_subquery) UNION (SELECT l_orderkey FROM lineitem_subquery) ) a WHERE l_orderkey = 1 ) b; count ------- 1 (1 row) SELECT count(*) FROM ( SELECT * FROM ( (SELECT * FROM lineitem_subquery) UNION (SELECT * FROM lineitem_subquery) ) a WHERE l_orderkey = 1 ) b; count ------- 6 (1 row) SELECT max(l_orderkey) FROM ( SELECT l_orderkey FROM ( SELECT l_orderkey FROM lineitem_subquery WHERE l_orderkey < 20000 GROUP BY l_orderkey ) z ) y; max ------- 14947 (1 row) -- Add one more shard to one relation, then test if we error out because of different -- shard counts for joining relations. SELECT master_create_empty_shard('orders_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 15000, shardmaxvalue = 20000 WHERE shardid = :new_shard_id; SELECT avg(unit_price) FROM (SELECT l_orderkey, avg(o_totalprice / l_quantity) AS unit_price FROM lineitem_subquery, orders_subquery WHERE l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; ERROR: cannot push down this subquery DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning -- Check that we can prune shards in subqueries with VARCHAR partition columns CREATE TABLE subquery_pruning_varchar_test_table ( a varchar, b int ); SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1); master_create_worker_shards ----------------------------- (1 row) SET citus.subquery_pushdown TO TRUE; SET client_min_messages TO DEBUG2; SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) AS foo; count ------- (0 rows) SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a) AS foo; count ------- (0 rows) SET client_min_messages TO NOTICE; -- test subquery join on VARCHAR partition column SELECT * FROM (SELECT a_inner AS a FROM (SELECT subquery_pruning_varchar_test_table.a AS a_inner FROM subquery_pruning_varchar_test_table GROUP BY subquery_pruning_varchar_test_table.a HAVING count(subquery_pruning_varchar_test_table.a) < 3) AS f1, (SELECT subquery_pruning_varchar_test_table.a FROM subquery_pruning_varchar_test_table GROUP BY subquery_pruning_varchar_test_table.a HAVING sum(coalesce(subquery_pruning_varchar_test_table.b,0)) > 20.0) AS f2 WHERE f1.a_inner = f2.a GROUP BY a_inner) AS foo; a --- (0 rows) DROP TABLE subquery_pruning_varchar_test_table; -- Create composite type to use in subquery pushdown CREATE TYPE user_composite_type AS ( tenant_id BIGINT, user_id BIGINT ); \c - - - :worker_1_port CREATE TYPE user_composite_type AS ( tenant_id BIGINT, user_id BIGINT ); \c - - - :worker_2_port CREATE TYPE user_composite_type AS ( tenant_id BIGINT, user_id BIGINT ); \c - - - :master_port CREATE TABLE events ( composite_id user_composite_type, event_id bigint, event_type character varying(255), event_time bigint ); SELECT master_create_distributed_table('events', 'composite_id', 'range'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; \COPY events FROM STDIN WITH CSV CREATE TABLE users ( composite_id user_composite_type, lastseen bigint ); SELECT master_create_distributed_table('users', 'composite_id', 'range'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; \COPY users FROM STDIN WITH CSV SET citus.subquery_pushdown TO TRUE; -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average FROM (SELECT tenant_id, user_id, array_agg(event_type ORDER BY event_time) AS events FROM (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, event_type, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type IN ('click', 'submit', 'pay')) AS subquery GROUP BY tenant_id, user_id) AS subquery; event_average -------------------- 3.6666666666666667 (1 row) -- Union and left join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average, hasdone FROM (SELECT subquery_1.tenant_id, subquery_1.user_id, array_agg(event ORDER BY event_time) AS events, COALESCE(hasdone, 'Has not done paying') AS hasdone FROM ( (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>1'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'click') UNION (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>2'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'submit') ) AS subquery_1 LEFT JOIN (SELECT DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, (composite_id).tenant_id, (composite_id).user_id, 'Has done paying'::TEXT AS hasdone FROM events WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'pay') AS subquery_2 ON subquery_1.tenant_id = subquery_2.tenant_id AND subquery_1.user_id = subquery_2.user_id GROUP BY subquery_1.tenant_id, subquery_1.user_id, hasdone) AS subquery_top GROUP BY hasdone; event_average | hasdone --------------------+--------------------- 4.0000000000000000 | Has not done paying 2.5000000000000000 | Has done paying (2 rows) -- Union, left join and having subquery pushdown SELECT avg(array_length(events, 1)) AS event_average, count_pay FROM ( SELECT subquery_1.tenant_id, subquery_1.user_id, array_agg(event ORDER BY event_time) AS events, COALESCE(count_pay, 0) AS count_pay FROM ( (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>1'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'click') UNION (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>2'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'submit') ) AS subquery_1 LEFT JOIN (SELECT (composite_id).tenant_id, (composite_id).user_id, COUNT(*) AS count_pay FROM events WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'pay' GROUP BY tenant_id, user_id HAVING COUNT(*) > 2) AS subquery_2 ON subquery_1.tenant_id = subquery_2.tenant_id AND subquery_1.user_id = subquery_2.user_id GROUP BY subquery_1.tenant_id, subquery_1.user_id, count_pay) AS subquery_top WHERE array_ndims(events) > 0 GROUP BY count_pay ORDER BY count_pay; event_average | count_pay --------------------+----------- 3.0000000000000000 | 0 (1 row) -- Lateral join subquery pushdown SELECT tenant_id, user_id, user_lastseen, event_array FROM (SELECT tenant_id, user_id, max(lastseen) as user_lastseen, array_agg(event_type ORDER BY event_time) AS event_array FROM (SELECT (composite_id).tenant_id, (composite_id).user_id, lastseen FROM users WHERE composite_id >= '(1, -9223372036854775808)'::user_composite_type AND composite_id <= '(1, 9223372036854775807)'::user_composite_type ORDER BY lastseen DESC LIMIT 10 ) AS subquery_top LEFT JOIN LATERAL (SELECT event_type, event_time FROM events WHERE (composite_id).tenant_id = subquery_top.tenant_id AND (composite_id).user_id = subquery_top.user_id ORDER BY event_time DESC LIMIT 99) AS subquery_lateral ON true GROUP BY tenant_id, user_id ) AS shard_union ORDER BY user_lastseen DESC LIMIT 10; tenant_id | user_id | user_lastseen | event_array -----------+---------+---------------+---------------------------- 1 | 1003 | 1472807315 | {click,click,click,submit} 1 | 1002 | 1472807215 | {click,click,submit,pay} 1 | 1001 | 1472807115 | {click,submit,pay} (3 rows) -- Same queries above with explain -- Simple join subquery pushdown EXPLAIN (COSTS OFF) SELECT avg(array_length(events, 1)) AS event_average FROM (SELECT tenant_id, user_id, array_agg(event_type ORDER BY event_time) AS events FROM (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, event_type, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type IN ('click', 'submit', 'pay')) AS subquery GROUP BY tenant_id, user_id) AS subquery; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Aggregate -> Custom Scan (Citus Real-Time) Task Count: 2 Tasks Shown: One of 2 -> Task Node: host=localhost port=57637 dbname=regression -> Aggregate -> GroupAggregate Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) -> Merge Join Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id))) -> Sort Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) -> Seq Scan on users_270013 users Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) -> Sort Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) -> Seq Scan on events_270009 events Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) (19 rows) -- Union and left join subquery pushdown EXPLAIN (COSTS OFF) SELECT avg(array_length(events, 1)) AS event_average, hasdone FROM (SELECT subquery_1.tenant_id, subquery_1.user_id, array_agg(event ORDER BY event_time) AS events, COALESCE(hasdone, 'Has not done paying') AS hasdone FROM ( (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>1'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'click') UNION (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>2'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'submit') ) AS subquery_1 LEFT JOIN (SELECT DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, (composite_id).tenant_id, (composite_id).user_id, 'Has done paying'::TEXT AS hasdone FROM events WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'pay') AS subquery_2 ON subquery_1.tenant_id = subquery_2.tenant_id AND subquery_1.user_id = subquery_2.user_id GROUP BY subquery_1.tenant_id, subquery_1.user_id, hasdone) AS subquery_top GROUP BY hasdone; QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- HashAggregate Group Key: remote_scan.hasdone -> Custom Scan (Citus Real-Time) Task Count: 2 Tasks Shown: One of 2 -> Task Node: host=localhost port=57637 dbname=regression -> HashAggregate Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text) -> GroupAggregate Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) -> Sort Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) -> Merge Left Join Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id))) -> Unique -> Sort Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time -> Append -> Nested Loop Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id)) -> Seq Scan on events_270009 events Filter: ((event_type)::text = 'click'::text) -> Seq Scan on users_270013 users Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) -> Nested Loop Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id)) -> Seq Scan on events_270009 events_1 Filter: ((event_type)::text = 'submit'::text) -> Seq Scan on users_270013 users_1 Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) -> Materialize -> Unique -> Sort Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) -> Seq Scan on events_270009 events_2 Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) (37 rows) -- Union, left join and having subquery pushdown EXPLAIN (COSTS OFF) SELECT avg(array_length(events, 1)) AS event_average, count_pay FROM ( SELECT subquery_1.tenant_id, subquery_1.user_id, array_agg(event ORDER BY event_time) AS events, COALESCE(count_pay, 0) AS count_pay FROM ( (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>1'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'click') UNION (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id, 'action=>2'AS event, events.event_time FROM users, events WHERE (users.composite_id).tenant_id = (events.composite_id).tenant_id AND (users.composite_id).user_id = (events.composite_id).user_id AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'submit') ) AS subquery_1 LEFT JOIN (SELECT (composite_id).tenant_id, (composite_id).user_id, COUNT(*) AS count_pay FROM events WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND event_type = 'pay' GROUP BY tenant_id, user_id HAVING COUNT(*) > 2) AS subquery_2 ON subquery_1.tenant_id = subquery_2.tenant_id AND subquery_1.user_id = subquery_2.user_id GROUP BY subquery_1.tenant_id, subquery_1.user_id, count_pay) AS subquery_top WHERE array_ndims(events) > 0 GROUP BY count_pay ORDER BY count_pay; ERROR: bogus varattno for OUTER_VAR var: 3 -- Lateral join subquery pushdown EXPLAIN (COSTS OFF) SELECT tenant_id, user_id, user_lastseen, event_array FROM (SELECT tenant_id, user_id, max(lastseen) as user_lastseen, array_agg(event_type ORDER BY event_time) AS event_array FROM (SELECT (composite_id).tenant_id, (composite_id).user_id, lastseen FROM users WHERE composite_id >= '(1, -9223372036854775808)'::user_composite_type AND composite_id <= '(1, 9223372036854775807)'::user_composite_type ORDER BY lastseen DESC LIMIT 10 ) AS subquery_top LEFT JOIN LATERAL (SELECT event_type, event_time FROM events WHERE (composite_id).tenant_id = subquery_top.tenant_id AND (composite_id).user_id = subquery_top.user_id ORDER BY event_time DESC LIMIT 99) AS subquery_lateral ON true GROUP BY tenant_id, user_id ) AS shard_union ORDER BY user_lastseen DESC LIMIT 10; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: remote_scan.user_lastseen DESC -> Custom Scan (Citus Real-Time) Task Count: 2 Tasks Shown: One of 2 -> Task Node: host=localhost port=57637 dbname=regression -> Limit -> Sort Sort Key: (max(users.lastseen)) DESC -> GroupAggregate Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) -> Sort Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) -> Nested Loop Left Join -> Limit -> Sort Sort Key: users.lastseen DESC -> Seq Scan on users_270013 users Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) -> Limit -> Sort Sort Key: events.event_time DESC -> Seq Scan on events_270009 events Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) (26 rows) SET citus.enable_router_execution TO 'true';