diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out
new file mode 100644
index 000000000..cc26c6c15
--- /dev/null
+++ b/src/test/regress/expected/multi_explain_0.out
@@ -0,0 +1,566 @@
+--
+-- MULTI_EXPLAIN
+--
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
+ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
+\a\t
+SET citus.task_executor_type TO 'real-time';
+SET citus.explain_distributed_queries TO on;
+-- Function that parses explain output as JSON
+CREATE FUNCTION explain_json(query text)
+RETURNS jsonb
+AS $BODY$
+DECLARE
+ result jsonb;
+BEGIN
+ EXECUTE format('EXPLAIN (FORMAT JSON) %s', query) INTO result;
+ RETURN result;
+END;
+$BODY$ LANGUAGE plpgsql;
+-- Function that parses explain output as XML
+CREATE FUNCTION explain_xml(query text)
+RETURNS xml
+AS $BODY$
+DECLARE
+ result xml;
+BEGIN
+ EXECUTE format('EXPLAIN (FORMAT XML) %s', query) INTO result;
+ RETURN result;
+END;
+$BODY$ LANGUAGE plpgsql;
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Distributed Query into pg_merge_job_570000
+ Executor: Real-Time
+ Task Count: 8
+ Tasks Shown: One of 8
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> HashAggregate
+ Group Key: l_quantity
+ -> Seq Scan on lineitem_290001 lineitem
+Master Query
+ -> Sort
+ Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
+ -> HashAggregate
+ Group Key: intermediate_column_570000_0
+ -> Seq Scan on pg_merge_job_570000
+-- Test JSON format
+EXPLAIN (COSTS FALSE, FORMAT JSON)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+[
+ {
+ "Executor": "Real-Time",
+ "Job": {
+ "Task Count": 8,
+ "Tasks Shown": "One of 8",
+ "Tasks": [
+ {
+ "Node": "host=localhost port=57637 dbname=regression",
+ "Remote Plan": [
+ [
+ {
+ "Plan": {
+ "Node Type": "Aggregate",
+ "Strategy": "Hashed",
+ "Group Key": ["l_quantity"],
+ "Plans": [
+ {
+ "Node Type": "Seq Scan",
+ "Parent Relationship": "Outer",
+ "Relation Name": "lineitem_290001",
+ "Alias": "lineitem"
+ }
+ ]
+ }
+ }
+ ]
+
+ ]
+ }
+ ]
+ },
+ "Master Query": [
+ {
+ "Plan": {
+ "Node Type": "Sort",
+ "Sort Key": ["COALESCE((sum((COALESCE((sum(intermediate_column_570001_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "intermediate_column_570001_0"],
+ "Plans": [
+ {
+ "Node Type": "Aggregate",
+ "Strategy": "Hashed",
+ "Parent Relationship": "Outer",
+ "Group Key": ["intermediate_column_570001_0"],
+ "Plans": [
+ {
+ "Node Type": "Seq Scan",
+ "Parent Relationship": "Outer",
+ "Relation Name": "pg_merge_job_570001",
+ "Alias": "pg_merge_job_570001"
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+]
+-- Validate JSON format
+SELECT true AS valid FROM explain_json($$
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
+t
+-- Test XML format
+EXPLAIN (COSTS FALSE, FORMAT XML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+
+ Real-Time
+
+ 8
+ One of 8
+
+
+ host=localhost port=57637 dbname=regression
+
+
+
+
+ Aggregate
+ Hashed
+
+ - l_quantity
+
+
+
+ Seq Scan
+ Outer
+ lineitem_290001
+ lineitem
+
+
+
+
+
+
+
+
+
+
+
+
+ Sort
+
+ - COALESCE((sum((COALESCE((sum(intermediate_column_570003_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)
+ - intermediate_column_570003_0
+
+
+
+ Aggregate
+ Hashed
+ Outer
+
+ - intermediate_column_570003_0
+
+
+
+ Seq Scan
+ Outer
+ pg_merge_job_570003
+ pg_merge_job_570003
+
+
+
+
+
+
+
+
+
+-- Validate XML format
+SELECT true AS valid FROM explain_xml($$
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
+t
+-- Test YAML format
+EXPLAIN (COSTS FALSE, FORMAT YAML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+- Executor: "Real-Time"
+ Job:
+ Task Count: 8
+ Tasks Shown: "One of 8"
+ Tasks:
+ - Node: "host=localhost port=57637 dbname=regression"
+ Remote Plan:
+ - Plan:
+ Node Type: "Aggregate"
+ Strategy: "Hashed"
+ Group Key:
+ - "l_quantity"
+ Plans:
+ - Node Type: "Seq Scan"
+ Parent Relationship: "Outer"
+ Relation Name: "lineitem_290001"
+ Alias: "lineitem"
+
+ Master Query:
+ - Plan:
+ Node Type: "Sort"
+ Sort Key:
+ - "COALESCE((sum((COALESCE((sum(intermediate_column_570005_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
+ - "intermediate_column_570005_0"
+ Plans:
+ - Node Type: "Aggregate"
+ Strategy: "Hashed"
+ Parent Relationship: "Outer"
+ Group Key:
+ - "intermediate_column_570005_0"
+ Plans:
+ - Node Type: "Seq Scan"
+ Parent Relationship: "Outer"
+ Relation Name: "pg_merge_job_570005"
+ Alias: "pg_merge_job_570005"
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Distributed Query into pg_merge_job_570006
+ Executor: Real-Time
+ Task Count: 8
+ Tasks Shown: One of 8
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> HashAggregate
+ Group Key: l_quantity
+ -> Seq Scan on lineitem_290001 lineitem
+Master Query
+ -> Sort
+ Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
+ -> HashAggregate
+ Group Key: intermediate_column_570006_0
+ -> Seq Scan on pg_merge_job_570006
+-- Test verbose
+EXPLAIN (COSTS FALSE, VERBOSE TRUE)
+ SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
+Distributed Query into pg_merge_job_570007
+ Executor: Real-Time
+ Task Count: 8
+ Tasks Shown: One of 8
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
+ -> Seq Scan on public.lineitem_290001 lineitem
+ Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
+Master Query
+ -> Aggregate
+ Output: (sum(intermediate_column_570007_0) / (sum(intermediate_column_570007_1) / sum(intermediate_column_570007_2)))
+ -> Seq Scan on pg_temp_2.pg_merge_job_570007
+ Output: intermediate_column_570007_0, intermediate_column_570007_1, intermediate_column_570007_2
+-- Test join
+EXPLAIN (COSTS FALSE)
+ SELECT * FROM lineitem
+ JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5.0
+ ORDER BY l_quantity LIMIT 10;
+Distributed Query into pg_merge_job_570008
+ Executor: Real-Time
+ Task Count: 8
+ Tasks Shown: One of 8
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Limit
+ -> Sort
+ Sort Key: lineitem.l_quantity
+ -> Hash Join
+ Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
+ -> Seq Scan on lineitem_290001 lineitem
+ Filter: (l_quantity < 5.0)
+ -> Hash
+ -> Seq Scan on orders_290008 orders
+Master Query
+ -> Limit
+ -> Sort
+ Sort Key: intermediate_column_570008_4
+ -> Seq Scan on pg_merge_job_570008
+-- Test insert
+EXPLAIN (COSTS FALSE)
+ INSERT INTO lineitem VALUES(1,0);
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Insert on lineitem_290000
+ -> Result
+-- Test update
+EXPLAIN (COSTS FALSE)
+ UPDATE lineitem
+ SET l_suppkey = 12
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Update on lineitem_290000
+ -> Bitmap Heap Scan on lineitem_290000
+ Recheck Cond: (l_orderkey = 1)
+ Filter: (l_partkey = 0)
+ -> Bitmap Index Scan on lineitem_pkey_290000
+ Index Cond: (l_orderkey = 1)
+-- Test delete
+EXPLAIN (COSTS FALSE)
+ DELETE FROM lineitem
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Delete on lineitem_290000
+ -> Bitmap Heap Scan on lineitem_290000
+ Recheck Cond: (l_orderkey = 1)
+ Filter: (l_partkey = 0)
+ -> Bitmap Index Scan on lineitem_pkey_290000
+ Index Cond: (l_orderkey = 1)
+-- Test single-shard SELECT
+EXPLAIN (COSTS FALSE)
+ SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
+Distributed Query into pg_merge_job_570009
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Bitmap Heap Scan on lineitem_290000 lineitem
+ Recheck Cond: (l_orderkey = 5)
+ -> Bitmap Index Scan on lineitem_pkey_290000
+ Index Cond: (l_orderkey = 5)
+SELECT true AS valid FROM explain_xml($$
+ SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
+t
+SELECT true AS valid FROM explain_json($$
+ SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
+t
+-- Test CREATE TABLE ... AS
+EXPLAIN (COSTS FALSE)
+ CREATE TABLE explain_result AS
+ SELECT * FROM lineitem;
+Distributed Query into pg_merge_job_570012
+ Executor: Real-Time
+ Task Count: 8
+ Tasks Shown: One of 8
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Seq Scan on lineitem_290001 lineitem
+Master Query
+ -> Seq Scan on pg_merge_job_570012
+-- Test all tasks output
+SET citus.explain_all_tasks TO on;
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+Distributed Query into pg_merge_job_570013
+ Executor: Real-Time
+ Task Count: 4
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_290005 lineitem
+ Filter: (l_orderkey > 9030)
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_290004 lineitem
+ Filter: (l_orderkey > 9030)
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_290007 lineitem
+ Filter: (l_orderkey > 9030)
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_290006 lineitem
+ Filter: (l_orderkey > 9030)
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_570013
+SELECT true AS valid FROM explain_xml($$
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
+t
+SELECT true AS valid FROM explain_json($$
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
+t
+-- Test track tracker
+SET citus.task_executor_type TO 'task-tracker';
+SET citus.explain_all_tasks TO off;
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+Distributed Query into pg_merge_job_570016
+ Executor: Task-Tracker
+ Task Count: 4
+ Tasks Shown: One of 4
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_290005 lineitem
+ Filter: (l_orderkey > 9030)
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_570016
+-- Test re-partition join
+SET citus.large_table_shard_count TO 1;
+EXPLAIN (COSTS FALSE)
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey;
+Distributed Query into pg_merge_job_570019
+ Executor: Task-Tracker
+ Task Count: 1
+ Tasks Shown: None, not supported for re-partition queries
+ -> MapMergeJob
+ Map Task Count: 1
+ Merge Task Count: 1
+ -> MapMergeJob
+ Map Task Count: 8
+ Merge Task Count: 1
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_570019
+EXPLAIN (COSTS FALSE, FORMAT JSON)
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey;
+[
+ {
+ "Executor": "Task-Tracker",
+ "Job": {
+ "Task Count": 1,
+ "Tasks Shown": "None, not supported for re-partition queries",
+ "Depended Jobs": [
+ {
+ "Map Task Count": 1,
+ "Merge Task Count": 1,
+ "Depended Jobs": [
+ {
+ "Map Task Count": 8,
+ "Merge Task Count": 1
+ }
+ ]
+ }
+ ]
+ },
+ "Master Query": [
+ {
+ "Plan": {
+ "Node Type": "Aggregate",
+ "Strategy": "Plain",
+ "Plans": [
+ {
+ "Node Type": "Seq Scan",
+ "Parent Relationship": "Outer",
+ "Relation Name": "pg_merge_job_570022",
+ "Alias": "pg_merge_job_570022"
+ }
+ ]
+ }
+ }
+ ]
+ }
+]
+SELECT true AS valid FROM explain_json($$
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey$$);
+t
+EXPLAIN (COSTS FALSE, FORMAT XML)
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey;
+
+
+ Task-Tracker
+
+ 1
+ None, not supported for re-partition queries
+
+
+ 1
+ 1
+
+
+ 8
+ 1
+
+
+
+
+
+
+
+
+ Aggregate
+ Plain
+
+
+ Seq Scan
+ Outer
+ pg_merge_job_570028
+ pg_merge_job_570028
+
+
+
+
+
+
+
+SELECT true AS valid FROM explain_xml($$
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey$$);
+t
+EXPLAIN (COSTS FALSE, FORMAT YAML)
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey;
+- Executor: "Task-Tracker"
+ Job:
+ Task Count: 1
+ Tasks Shown: "None, not supported for re-partition queries"
+ Depended Jobs:
+ - Map Task Count: 1
+ Merge Task Count: 1
+ Depended Jobs:
+ - Map Task Count: 8
+ Merge Task Count: 1
+ Master Query:
+ - Plan:
+ Node Type: "Aggregate"
+ Strategy: "Plain"
+ Plans:
+ - Node Type: "Seq Scan"
+ Parent Relationship: "Outer"
+ Relation Name: "pg_merge_job_570034"
+ Alias: "pg_merge_job_570034"
diff --git a/src/test/regress/output/multi_subquery_0.source b/src/test/regress/output/multi_subquery_0.source
new file mode 100644
index 000000000..a5367708b
--- /dev/null
+++ b/src/test/regress/output/multi_subquery_0.source
@@ -0,0 +1,1048 @@
+--
+-- MULTI_SUBQUERY
+--
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 270000;
+ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 270000;
+-- Create tables for subquery tests
+CREATE TABLE lineitem_subquery (
+ l_orderkey bigint not null,
+ l_partkey integer not null,
+ l_suppkey integer not null,
+ l_linenumber integer not null,
+ l_quantity decimal(15, 2) not null,
+ l_extendedprice decimal(15, 2) not null,
+ l_discount decimal(15, 2) not null,
+ l_tax decimal(15, 2) not null,
+ l_returnflag char(1) not null,
+ l_linestatus char(1) not null,
+ l_shipdate date not null,
+ l_commitdate date not null,
+ l_receiptdate date not null,
+ l_shipinstruct char(25) not null,
+ l_shipmode char(10) not null,
+ l_comment varchar(44) not null,
+ PRIMARY KEY(l_orderkey, l_linenumber) );
+SELECT master_create_distributed_table('lineitem_subquery', 'l_orderkey', 'range');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+CREATE TABLE orders_subquery (
+ o_orderkey bigint not null,
+ o_custkey integer not null,
+ o_orderstatus char(1) not null,
+ o_totalprice decimal(15,2) not null,
+ o_orderdate date not null,
+ o_orderpriority char(15) not null,
+ o_clerk char(15) not null,
+ o_shippriority integer not null,
+ o_comment varchar(79) not null,
+ PRIMARY KEY(o_orderkey) );
+SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SET citus.task_executor_type TO 'task-tracker';
+-- Check that we don't allow subquery pushdown in default settings.
+SELECT
+ avg(unit_price)
+FROM
+ (SELECT
+ l_orderkey,
+ avg(o_totalprice) AS unit_price
+ FROM
+ lineitem_subquery,
+ orders_subquery
+ WHERE
+ l_orderkey = o_orderkey
+ GROUP BY
+ l_orderkey) AS unit_prices;
+ERROR: cannot perform distributed planning on this query
+DETAIL: Join in subqueries is not supported yet
+SET citus.subquery_pushdown to TRUE;
+-- Check that we don't crash if there are not any shards.
+SELECT
+ avg(unit_price)
+FROM
+ (SELECT
+ l_orderkey,
+ avg(o_totalprice) AS unit_price
+ FROM
+ lineitem_subquery,
+ orders_subquery
+ WHERE
+ l_orderkey = o_orderkey
+ GROUP BY
+ l_orderkey) AS unit_prices;
+ avg
+-----
+
+(1 row)
+
+-- Load data into tables.
+SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14946
+WHERE shardid = :new_shard_id;
+SET citus.shard_max_size TO "1MB";
+\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
+\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
+\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
+\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
+-- Check that we error out if shard min/max values are not exactly same.
+SELECT
+ avg(unit_price)
+FROM
+ (SELECT
+ l_orderkey,
+ avg(o_totalprice) AS unit_price
+ FROM
+ lineitem_subquery,
+ orders_subquery
+ WHERE
+ l_orderkey = o_orderkey
+ GROUP BY
+ l_orderkey) AS unit_prices;
+ERROR: cannot push down this subquery
+DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
+-- Update metadata in order to make all shards equal.
+UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;
+-- If group by is not on partition column then we error out.
+SELECT
+ avg(order_count)
+FROM
+ (SELECT
+ l_suppkey,
+ count(*) AS order_count
+ FROM
+ lineitem_subquery
+ GROUP BY
+ l_suppkey) AS order_counts;
+ERROR: cannot push down this subquery
+DETAIL: Group by list without partition column is currently unsupported
+-- Check that we error out if join is not on partition columns.
+SELECT
+ avg(unit_price)
+FROM
+ (SELECT
+ l_orderkey,
+ avg(o_totalprice / l_quantity) AS unit_price
+ FROM
+ lineitem_subquery,
+ orders_subquery
+ GROUP BY
+ l_orderkey) AS unit_prices;
+ERROR: cannot push down this subquery
+DETAIL: Relations need to be joining on partition columns
+SELECT
+ avg(unit_price)
+FROM
+ (SELECT
+ l_orderkey,
+ avg(o_totalprice / l_quantity) AS unit_price
+ FROM
+ lineitem_subquery,
+ orders_subquery
+ WHERE
+ l_orderkey = o_custkey
+ GROUP BY
+ l_orderkey) AS unit_prices;
+ERROR: cannot push down this subquery
+DETAIL: Relations need to be joining on partition columns
+-- Check that we error out if there is union all.
+SELECT count(*) FROM
+(
+ (SELECT l_orderkey FROM lineitem_subquery) UNION ALL
+ (SELECT 1::bigint)
+) b;
+ERROR: cannot perform distributed planning on this query
+DETAIL: Complex table expressions are currently unsupported
+-- Check that we error out if queries in union do not include partition columns.
+SELECT count(*) FROM
+(
+ (SELECT l_orderkey FROM lineitem_subquery) UNION
+ (SELECT l_partkey FROM lineitem_subquery)
+) b;
+ERROR: cannot push down this subquery
+DETAIL: Union clauses need to select partition columns
+-- Check that we run union queries if partition column is selected.
+SELECT count(*) FROM
+(
+ (SELECT l_orderkey FROM lineitem_subquery) UNION
+ (SELECT l_orderkey FROM lineitem_subquery)
+) b;
+ count
+-------
+ 2985
+(1 row)
+
+-- Check that we error out if the outermost query has subquery join.
+SELECT
+ avg(o_totalprice/l_quantity)
+FROM
+ (SELECT
+ l_orderkey,
+ l_quantity
+ FROM
+ lineitem_subquery
+ ORDER BY
+ l_quantity
+ LIMIT 10
+ ) lineitem_quantities
+ JOIN LATERAL
+ (SELECT
+ o_totalprice
+ FROM
+ orders_subquery
+ WHERE
+ lineitem_quantities.l_orderkey = o_orderkey) orders_price ON true;
+ERROR: cannot perform distributed planning on this query
+DETAIL: Join in subqueries is not supported yet
+-- Check that we error out if the outermost query is a distinct clause.
+SELECT
+ count(DISTINCT a)
+FROM (
+ SELECT
+ count(*) a
+ FROM
+ lineitem_subquery
+) z;
+ERROR: cannot push down this subquery
+DETAIL: distinct in the outermost query is unsupported
+-- Check supported subquery types.
+SELECT
+ o_custkey,
+ sum(order_count) as total_order_count
+FROM
+ (SELECT
+ o_orderkey,
+ o_custkey,
+ count(*) AS order_count
+ FROM
+ orders_subquery
+ WHERE
+ o_orderkey > 0 AND
+ o_orderkey < 12000
+ GROUP BY
+ o_orderkey, o_custkey) AS order_counts
+GROUP BY
+ o_custkey
+ORDER BY
+ total_order_count DESC,
+ o_custkey ASC
+LIMIT 10;
+ o_custkey | total_order_count
+-----------+-------------------
+ 1462 | 9
+ 619 | 8
+ 643 | 8
+ 1030 | 8
+ 1486 | 8
+ 79 | 7
+ 304 | 7
+ 319 | 7
+ 343 | 7
+ 448 | 7
+(10 rows)
+
+SELECT
+ avg(unit_price)
+FROM
+ (SELECT
+ l_orderkey,
+ avg(o_totalprice / l_quantity) AS unit_price
+ FROM
+ lineitem_subquery,
+ orders_subquery
+ WHERE
+ l_orderkey = o_orderkey
+ GROUP BY
+ l_orderkey) AS unit_prices
+WHERE
+ unit_price > 1000 AND
+ unit_price < 10000;
+ avg
+-----------------------
+ 4968.2889885208475549
+(1 row)
+
+-- Check that if subquery is pulled, we don't error and run query properly.
+SELECT count(*) FROM
+(
+ SELECT l_orderkey FROM (
+ (SELECT l_orderkey FROM lineitem_subquery) UNION
+ (SELECT l_orderkey FROM lineitem_subquery)
+ ) a
+ WHERE l_orderkey = 1
+) b;
+ count
+-------
+ 1
+(1 row)
+
+SELECT count(*) FROM
+(
+ SELECT * FROM (
+ (SELECT * FROM lineitem_subquery) UNION
+ (SELECT * FROM lineitem_subquery)
+ ) a
+ WHERE l_orderkey = 1
+) b;
+ count
+-------
+ 6
+(1 row)
+
+SELECT max(l_orderkey) FROM
+(
+ SELECT l_orderkey FROM (
+ SELECT
+ l_orderkey
+ FROM
+ lineitem_subquery
+ WHERE
+ l_orderkey < 20000
+ GROUP BY
+ l_orderkey
+ ) z
+) y;
+ max
+-------
+ 14947
+(1 row)
+
+-- Add one more shard to one relation, then test if we error out because of different
+-- shard counts for joining relations.
+SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 15000, shardmaxvalue = 20000
+WHERE shardid = :new_shard_id;
+SELECT
+ avg(unit_price)
+FROM
+ (SELECT
+ l_orderkey,
+ avg(o_totalprice / l_quantity) AS unit_price
+ FROM
+ lineitem_subquery,
+ orders_subquery
+ WHERE
+ l_orderkey = o_orderkey
+ GROUP BY
+ l_orderkey) AS unit_prices;
+ERROR: cannot push down this subquery
+DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
+-- Check that we can prune shards in subqueries with VARCHAR partition columns
+CREATE TABLE subquery_pruning_varchar_test_table
+(
+ a varchar,
+ b int
+);
+SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
+SET citus.subquery_pushdown TO TRUE;
+SET client_min_messages TO DEBUG2;
+SELECT * FROM
+ (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
+AS foo;
+DEBUG: predicate pruning for shardId 270005
+DEBUG: predicate pruning for shardId 270006
+DEBUG: predicate pruning for shardId 270008
+ count
+-------
+(0 rows)
+
+SELECT * FROM
+ (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
+AS foo;
+DEBUG: predicate pruning for shardId 270005
+DEBUG: predicate pruning for shardId 270007
+DEBUG: predicate pruning for shardId 270008
+ count
+-------
+(0 rows)
+
+SET client_min_messages TO NOTICE;
+-- test subquery join on VARCHAR partition column
+SELECT * FROM
+ (SELECT
+ a_inner AS a
+ FROM
+ (SELECT
+ subquery_pruning_varchar_test_table.a AS a_inner
+ FROM
+ subquery_pruning_varchar_test_table
+ GROUP BY
+ subquery_pruning_varchar_test_table.a
+ HAVING
+ count(subquery_pruning_varchar_test_table.a) < 3)
+ AS f1,
+ (SELECT
+ subquery_pruning_varchar_test_table.a
+ FROM
+ subquery_pruning_varchar_test_table
+ GROUP BY
+ subquery_pruning_varchar_test_table.a
+ HAVING
+ sum(coalesce(subquery_pruning_varchar_test_table.b,0)) > 20.0)
+ AS f2
+ WHERE
+ f1.a_inner = f2.a
+ GROUP BY
+ a_inner)
+AS foo;
+ a
+---
+(0 rows)
+
+DROP TABLE subquery_pruning_varchar_test_table;
+-- Create composite type to use in subquery pushdown
+CREATE TYPE user_composite_type AS
+(
+ tenant_id BIGINT,
+ user_id BIGINT
+);
+\c - - - :worker_1_port
+CREATE TYPE user_composite_type AS
+(
+ tenant_id BIGINT,
+ user_id BIGINT
+);
+\c - - - :worker_2_port
+CREATE TYPE user_composite_type AS
+(
+ tenant_id BIGINT,
+ user_id BIGINT
+);
+\c - - - :master_port
+CREATE TABLE events (
+ composite_id user_composite_type,
+ event_id bigint,
+ event_type character varying(255),
+ event_time bigint
+);
+SELECT master_create_distributed_table('events', 'composite_id', 'range');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_empty_shard('events') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('events') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('events') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('events') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
+WHERE shardid = :new_shard_id;
+\COPY events FROM STDIN WITH CSV
+CREATE TABLE users (
+ composite_id user_composite_type,
+ lastseen bigint
+);
+SELECT master_create_distributed_table('users', 'composite_id', 'range');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_empty_shard('users') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('users') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('users') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('users') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
+WHERE shardid = :new_shard_id;
+\COPY users FROM STDIN WITH CSV
+SET citus.subquery_pushdown TO TRUE;
+-- Simple join subquery pushdown
+SELECT
+ avg(array_length(events, 1)) AS event_average
+FROM
+ (SELECT
+ tenant_id,
+ user_id,
+ array_agg(event_type ORDER BY event_time) AS events
+ FROM
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ event_type,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type IN ('click', 'submit', 'pay')) AS subquery
+ GROUP BY
+ tenant_id,
+ user_id) AS subquery;
+ event_average
+--------------------
+ 3.6666666666666667
+(1 row)
+
+-- Union and left join subquery pushdown
+SELECT
+ avg(array_length(events, 1)) AS event_average,
+ hasdone
+FROM
+ (SELECT
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ array_agg(event ORDER BY event_time) AS events,
+ COALESCE(hasdone, 'Has not done paying') AS hasdone
+ FROM
+ (
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>1'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'click')
+ UNION
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>2'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'submit')
+ ) AS subquery_1
+ LEFT JOIN
+ (SELECT
+ DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
+ (composite_id).tenant_id,
+ (composite_id).user_id,
+ 'Has done paying'::TEXT AS hasdone
+ FROM
+ events
+ WHERE
+ events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'pay') AS subquery_2
+ ON
+ subquery_1.tenant_id = subquery_2.tenant_id AND
+ subquery_1.user_id = subquery_2.user_id
+ GROUP BY
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ hasdone) AS subquery_top
+GROUP BY
+ hasdone;
+ event_average | hasdone
+--------------------+---------------------
+ 4.0000000000000000 | Has not done paying
+ 2.5000000000000000 | Has done paying
+(2 rows)
+
+-- Union, left join and having subquery pushdown
+SELECT
+ avg(array_length(events, 1)) AS event_average,
+ count_pay
+ FROM (
+ SELECT
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ array_agg(event ORDER BY event_time) AS events,
+ COALESCE(count_pay, 0) AS count_pay
+ FROM
+ (
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>1'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'click')
+ UNION
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>2'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'submit')
+ ) AS subquery_1
+ LEFT JOIN
+ (SELECT
+ (composite_id).tenant_id,
+ (composite_id).user_id,
+ COUNT(*) AS count_pay
+ FROM
+ events
+ WHERE
+ events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'pay'
+ GROUP BY
+ tenant_id,
+ user_id
+ HAVING
+ COUNT(*) > 2) AS subquery_2
+ ON
+ subquery_1.tenant_id = subquery_2.tenant_id AND
+ subquery_1.user_id = subquery_2.user_id
+ GROUP BY
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ count_pay) AS subquery_top
+WHERE
+ array_ndims(events) > 0
+GROUP BY
+ count_pay
+ORDER BY
+ count_pay;
+ event_average | count_pay
+--------------------+-----------
+ 3.0000000000000000 | 0
+(1 row)
+
+-- Lateral join subquery pushdown
+SELECT
+ tenant_id,
+ user_id,
+ user_lastseen,
+ event_array
+FROM
+ (SELECT
+ tenant_id,
+ user_id,
+ max(lastseen) as user_lastseen,
+ array_agg(event_type ORDER BY event_time) AS event_array
+ FROM
+ (SELECT
+ (composite_id).tenant_id,
+ (composite_id).user_id,
+ lastseen
+ FROM
+ users
+ WHERE
+ composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ composite_id <= '(1, 9223372036854775807)'::user_composite_type
+ ORDER BY
+ lastseen DESC
+ LIMIT
+ 10
+ ) AS subquery_top
+ LEFT JOIN LATERAL
+ (SELECT
+ event_type,
+ event_time
+ FROM
+ events
+ WHERE
+ (composite_id).tenant_id = subquery_top.tenant_id AND
+ (composite_id).user_id = subquery_top.user_id
+ ORDER BY
+ event_time DESC
+ LIMIT
+ 99) AS subquery_lateral
+ ON
+ true
+ GROUP BY
+ tenant_id,
+ user_id
+ ) AS shard_union
+ORDER BY
+ user_lastseen DESC
+LIMIT
+ 10;
+ tenant_id | user_id | user_lastseen | event_array
+-----------+---------+---------------+----------------------------
+ 1 | 1003 | 1472807315 | {click,click,click,submit}
+ 1 | 1002 | 1472807215 | {click,click,submit,pay}
+ 1 | 1001 | 1472807115 | {click,submit,pay}
+(3 rows)
+
+-- Same queries above with explain
+-- Simple join subquery pushdown
+EXPLAIN SELECT
+ avg(array_length(events, 1)) AS event_average
+FROM
+ (SELECT
+ tenant_id,
+ user_id,
+ array_agg(event_type ORDER BY event_time) AS events
+ FROM
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ event_type,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type IN ('click', 'submit', 'pay')) AS subquery
+ GROUP BY
+ tenant_id,
+ user_id) AS subquery;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Distributed Query into pg_merge_job_270014
+ Executor: Real-Time
+ Task Count: 2
+ Tasks Shown: One of 2
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate (cost=40.01..40.02 rows=1 width=32)
+ -> GroupAggregate (cost=39.89..39.99 rows=1 width=556)
+ Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
+ -> Merge Join (cost=39.89..39.97 rows=1 width=556)
+ Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id)))
+ -> Sort (cost=28.08..28.09 rows=6 width=32)
+ Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
+ -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32)
+ Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
+ -> Sort (cost=11.81..11.82 rows=3 width=556)
+ Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
+ -> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556)
+ Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
+ Master Query
+ -> Aggregate (cost=0.01..0.02 rows=1 width=0)
+ -> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0)
+(22 rows)
+
+-- Union and left join subquery pushdown
+EXPLAIN SELECT
+ avg(array_length(events, 1)) AS event_average,
+ hasdone
+FROM
+ (SELECT
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ array_agg(event ORDER BY event_time) AS events,
+ COALESCE(hasdone, 'Has not done paying') AS hasdone
+ FROM
+ (
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>1'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'click')
+ UNION
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>2'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'submit')
+ ) AS subquery_1
+ LEFT JOIN
+ (SELECT
+ DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
+ (composite_id).tenant_id,
+ (composite_id).user_id,
+ 'Has done paying'::TEXT AS hasdone
+ FROM
+ events
+ WHERE
+ events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'pay') AS subquery_2
+ ON
+ subquery_1.tenant_id = subquery_2.tenant_id AND
+ subquery_1.user_id = subquery_2.user_id
+ GROUP BY
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ hasdone) AS subquery_top
+GROUP BY
+ hasdone;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Distributed Query into pg_merge_job_270015
+ Executor: Real-Time
+ Task Count: 2
+ Tasks Shown: One of 2
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> HashAggregate (cost=91.94..91.96 rows=2 width=64)
+ Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text)
+ -> GroupAggregate (cost=91.85..91.90 rows=2 width=88)
+ Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
+ -> Sort (cost=91.85..91.85 rows=2 width=88)
+ Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
+ -> Merge Left Join (cost=91.75..91.84 rows=2 width=88)
+ Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id)))
+ -> Unique (cost=79.46..79.48 rows=2 width=40)
+ -> Sort (cost=79.46..79.47 rows=2 width=40)
+ Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time
+ -> Append (cost=0.00..79.45 rows=2 width=40)
+ -> Nested Loop (cost=0.00..39.72 rows=1 width=40)
+ Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id))
+ -> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40)
+ Filter: ((event_type)::text = 'click'::text)
+ -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32)
+ Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
+ -> Nested Loop (cost=0.00..39.72 rows=1 width=40)
+ Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id))
+ -> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40)
+ Filter: ((event_type)::text = 'submit'::text)
+ -> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32)
+ Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
+ -> Materialize (cost=12.29..12.31 rows=1 width=48)
+ -> Unique (cost=12.29..12.30 rows=1 width=32)
+ -> Sort (cost=12.29..12.29 rows=1 width=32)
+ Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
+ -> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32)
+ Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
+ Master Query
+ -> HashAggregate (cost=0.00..0.18 rows=10 width=0)
+ Group Key: intermediate_column_270015_2
+ -> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0)
+(40 rows)
+
+-- Union, left join and having subquery pushdown
+EXPLAIN SELECT
+ avg(array_length(events, 1)) AS event_average,
+ count_pay
+ FROM (
+ SELECT
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ array_agg(event ORDER BY event_time) AS events,
+ COALESCE(count_pay, 0) AS count_pay
+ FROM
+ (
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>1'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'click')
+ UNION
+ (SELECT
+ (users.composite_id).tenant_id,
+ (users.composite_id).user_id,
+ 'action=>2'AS event,
+ events.event_time
+ FROM
+ users,
+ events
+ WHERE
+ (users.composite_id).tenant_id = (events.composite_id).tenant_id AND
+ (users.composite_id).user_id = (events.composite_id).user_id AND
+ users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'submit')
+ ) AS subquery_1
+ LEFT JOIN
+ (SELECT
+ (composite_id).tenant_id,
+ (composite_id).user_id,
+ COUNT(*) AS count_pay
+ FROM
+ events
+ WHERE
+ events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
+ event_type = 'pay'
+ GROUP BY
+ tenant_id,
+ user_id
+ HAVING
+ COUNT(*) > 2) AS subquery_2
+ ON
+ subquery_1.tenant_id = subquery_2.tenant_id AND
+ subquery_1.user_id = subquery_2.user_id
+ GROUP BY
+ subquery_1.tenant_id,
+ subquery_1.user_id,
+ count_pay) AS subquery_top
+WHERE
+ array_ndims(events) > 0
+GROUP BY
+ count_pay
+ORDER BY
+ count_pay;
+ERROR: bogus varattno for OUTER_VAR var: 3
+-- Lateral join subquery pushdown
+EXPLAIN SELECT
+ tenant_id,
+ user_id,
+ user_lastseen,
+ event_array
+FROM
+ (SELECT
+ tenant_id,
+ user_id,
+ max(lastseen) as user_lastseen,
+ array_agg(event_type ORDER BY event_time) AS event_array
+ FROM
+ (SELECT
+ (composite_id).tenant_id,
+ (composite_id).user_id,
+ lastseen
+ FROM
+ users
+ WHERE
+ composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
+ composite_id <= '(1, 9223372036854775807)'::user_composite_type
+ ORDER BY
+ lastseen DESC
+ LIMIT
+ 10
+ ) AS subquery_top
+ LEFT JOIN LATERAL
+ (SELECT
+ event_type,
+ event_time
+ FROM
+ events
+ WHERE
+ (composite_id).tenant_id = subquery_top.tenant_id AND
+ (composite_id).user_id = subquery_top.user_id
+ ORDER BY
+ event_time DESC
+ LIMIT
+ 99) AS subquery_lateral
+ ON
+ true
+ GROUP BY
+ tenant_id,
+ user_id
+ ) AS shard_union
+ORDER BY
+ user_lastseen DESC
+LIMIT
+ 10;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Distributed Query into pg_merge_job_270017
+ Executor: Real-Time
+ Task Count: 2
+ Tasks Shown: One of 2
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Limit (cost=100.43..100.44 rows=6 width=56)
+ -> Sort (cost=100.43..100.44 rows=6 width=56)
+ Sort Key: (max(users.lastseen)) DESC
+ -> GroupAggregate (cost=100.14..100.29 rows=6 width=548)
+ Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
+ -> Sort (cost=100.14..100.16 rows=6 width=548)
+ Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
+ -> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548)
+ -> Limit (cost=28.08..28.09 rows=6 width=40)
+ -> Sort (cost=28.08..28.09 rows=6 width=40)
+ Sort Key: users.lastseen DESC
+ -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40)
+ Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
+ -> Limit (cost=11.96..11.96 rows=1 width=524)
+ -> Sort (cost=11.96..11.96 rows=1 width=524)
+ Sort Key: events.event_time DESC
+ -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524)
+ Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id)))
+ Master Query
+ -> Limit (cost=0.01..0.02 rows=0 width=0)
+ -> Sort (cost=0.01..0.02 rows=0 width=0)
+ Sort Key: intermediate_column_270017_2 DESC
+ -> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0)
+(29 rows)
+
+SET citus.task_executor_type TO 'real-time';
diff --git a/src/test/regress/sql/.gitignore b/src/test/regress/sql/.gitignore
index ff5ac8138..516d9b7bb 100644
--- a/src/test/regress/sql/.gitignore
+++ b/src/test/regress/sql/.gitignore
@@ -11,5 +11,6 @@
/multi_load_large_records.sql
/multi_load_more_data.sql
/multi_subquery.sql
+/multi_subquery_0.sql
/worker_copy.sql
/multi_complex_count_distinct.sql