diff --git a/src/test/regress/input/multi_subquery.source b/src/test/regress/input/multi_subquery.source index e3f684b34..abbacc398 100644 --- a/src/test/regress/input/multi_subquery.source +++ b/src/test/regress/input/multi_subquery.source @@ -377,3 +377,541 @@ SELECT * FROM AS foo; DROP TABLE subquery_pruning_varchar_test_table; + +-- Create composite type to use in subquery pushdown +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); + +\c - - - :worker_1_port + +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); + +\c - - - :worker_2_port + +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); + +\c - - - :master_port + + +CREATE TABLE events ( + composite_id user_composite_type, + event_id bigint, + event_type character varying(255), + event_time bigint +); +SELECT master_create_distributed_table('events', 'composite_id', 'range'); + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; + +\COPY events FROM STDIN WITH CSV +"(1,1001)",20001,click,1472807012 +"(1,1001)",20002,submit,1472807015 +"(1,1001)",20003,pay,1472807020 +"(1,1002)",20010,click,1472807022 +"(1,1002)",20011,click,1472807023 +"(1,1002)",20012,submit,1472807025 +"(1,1002)",20013,pay,1472807030 +"(1,1003)",20014,click,1472807032 +"(1,1003)",20015,click,1472807033 +"(1,1003)",20016,click,1472807034 +"(1,1003)",20017,submit,1472807035 +\. + +CREATE TABLE users ( + composite_id user_composite_type, + lastseen bigint +); +SELECT master_create_distributed_table('users', 'composite_id', 'range'); + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; + +\COPY users FROM STDIN WITH CSV +"(1,1001)",1472807115 +"(1,1002)",1472807215 +"(1,1003)",1472807315 +\. + + +SET citus.subquery_pushdown TO TRUE; + +-- Simple join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; + +-- Union and left join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; + +-- Union, left join and having subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; + +-- Lateral join subquery pushdown +SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; + + +-- Same queries above with explain +SET client_min_messages TO DEBUG2; + +-- Simple join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; + +-- Union and left join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; + +-- Union, left join and having subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; + +-- Lateral join subquery pushdown +EXPLAIN SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; + +SET citusdb.task_executor_type TO 'real-time'; +SET client_min_messages TO NOTICE; diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source index e0340f3f6..74a9f72f2 100644 --- a/src/test/regress/output/multi_subquery.source +++ b/src/test/regress/output/multi_subquery.source @@ -421,3 +421,661 @@ AS foo; (0 rows) DROP TABLE subquery_pruning_varchar_test_table; +-- Create composite type to use in subquery pushdown +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); +\c - - - :worker_1_port +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); +\c - - - :worker_2_port +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); +\c - - - :master_port +CREATE TABLE events ( + composite_id user_composite_type, + event_id bigint, + event_type character varying(255), + event_time bigint +); +SELECT master_create_distributed_table('events', 'composite_id', 'range'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; +\COPY events FROM STDIN WITH CSV +CREATE TABLE users ( + composite_id user_composite_type, + lastseen bigint +); +SELECT master_create_distributed_table('users', 'composite_id', 'range'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; +\COPY users FROM STDIN WITH CSV +SET citus.subquery_pushdown TO TRUE; +-- Simple join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; + event_average +-------------------- + 3.6666666666666667 +(1 row) + +-- Union and left join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; + event_average | hasdone +--------------------+--------------------- + 4.0000000000000000 | Has not done paying + 2.5000000000000000 | Has done paying +(2 rows) + +-- Union, left join and having subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; + event_average | count_pay +--------------------+----------- + 3.0000000000000000 | 0 +(1 row) + +-- Lateral join subquery pushdown +SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; + tenant_id | user_id | user_lastseen | event_array +-----------+---------+---------------+---------------------------- + 1 | 1003 | 1472807315 | {click,click,click,submit} + 1 | 1002 | 1472807215 | {click,click,submit,pay} + 1 | 1001 | 1472807115 | {click,submit,pay} +(3 rows) + +-- Same queries above with explain +SET client_min_messages TO DEBUG2; +-- Simple join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Distributed Query into pg_merge_job_270014 + Executor: Real-Time + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate (cost=40.01..40.02 rows=1 width=32) + -> GroupAggregate (cost=39.89..39.99 rows=1 width=556) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Merge Join (cost=39.89..39.97 rows=1 width=556) + Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id))) + -> Sort (cost=28.08..28.09 rows=6 width=32) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Sort (cost=11.81..11.82 rows=3 width=556) + Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) + -> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556) + Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) + Master Query + -> Aggregate (cost=0.01..0.02 rows=1 width=0) + -> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0) +(22 rows) + +-- Union and left join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: building index "pg_toast_17247_index" on table "pg_toast_17247" + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Distributed Query into pg_merge_job_270015 + Executor: Real-Time + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate (cost=91.94..91.96 rows=2 width=64) + Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text) + -> GroupAggregate (cost=91.85..91.90 rows=2 width=88) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Sort (cost=91.85..91.85 rows=2 width=88) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Merge Left Join (cost=91.75..91.84 rows=2 width=88) + Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id))) + -> Unique (cost=79.46..79.48 rows=2 width=40) + -> Sort (cost=79.46..79.47 rows=2 width=40) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time + -> Append (cost=0.00..79.45 rows=2 width=40) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id)) + -> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'click'::text) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id)) + -> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'submit'::text) + -> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Materialize (cost=12.29..12.31 rows=1 width=48) + -> Unique (cost=12.29..12.30 rows=1 width=32) + -> Sort (cost=12.29..12.29 rows=1 width=32) + Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) + -> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) + Master Query + -> HashAggregate (cost=0.00..0.18 rows=10 width=0) + Group Key: intermediate_column_270015_2 + -> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0) +(40 rows) + +-- Union, left join and having subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +ERROR: bogus varattno for OUTER_VAR var: 3 +-- Lateral join subquery pushdown +EXPLAIN SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; +DEBUG: push down of limit count: 10 +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: building index "pg_toast_17256_index" on table "pg_toast_17256" + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Distributed Query into pg_merge_job_270017 + Executor: Real-Time + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit (cost=100.43..100.44 rows=6 width=56) + -> Sort (cost=100.43..100.44 rows=6 width=56) + Sort Key: (max(users.lastseen)) DESC + -> GroupAggregate (cost=100.14..100.29 rows=6 width=548) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Sort (cost=100.14..100.16 rows=6 width=548) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548) + -> Limit (cost=28.08..28.09 rows=6 width=40) + -> Sort (cost=28.08..28.09 rows=6 width=40) + Sort Key: users.lastseen DESC + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Limit (cost=11.96..11.96 rows=1 width=524) + -> Sort (cost=11.96..11.96 rows=1 width=524) + Sort Key: events.event_time DESC + -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) + Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) + Master Query + -> Limit (cost=0.01..0.02 rows=0 width=0) + -> Sort (cost=0.01..0.02 rows=0 width=0) + Sort Key: intermediate_column_270017_2 DESC + -> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0) +(29 rows) + +SET citusdb.task_executor_type TO 'real-time'; +SET client_min_messages TO NOTICE;