--
-- MULTI_EXPLAIN
--
SET citus.next_shard_id TO 570000;
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
 version_above_nine 
--------------------
 t
(1 row)

\a\t
SET citus.task_executor_type TO 'real-time';
SET citus.explain_distributed_queries TO on;
-- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text)
RETURNS jsonb
AS $BODY$
DECLARE
  result jsonb;
BEGIN
  EXECUTE format('EXPLAIN (FORMAT JSON) %s', query) INTO result;
  RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML
CREATE FUNCTION explain_xml(query text)
RETURNS xml
AS $BODY$
DECLARE
  result xml;
BEGIN
  EXECUTE format('EXPLAIN (FORMAT XML) %s', query) INTO result;
  RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
-- VACUUM related tables to ensure test outputs are stable
VACUUM ANALYZE lineitem;
VACUUM ANALYZE orders;
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort
  Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
  ->  HashAggregate
        Group Key: remote_scan.l_quantity
        ->  Custom Scan (Citus Real-Time)
              Task Count: 8
              Tasks Shown: One of 8
              ->  Task
                    Node: host=localhost port=57637 dbname=regression
                    ->  HashAggregate
                          Group Key: l_quantity
                          ->  Seq Scan on lineitem_290001 lineitem
-- Test disable hash aggregate
SET enable_hashagg TO off;
EXPLAIN (COSTS FALSE, FORMAT TEXT)
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort
  Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
  ->  GroupAggregate
        Group Key: remote_scan.l_quantity
        ->  Sort
              Sort Key: remote_scan.l_quantity
              ->  Custom Scan (Citus Real-Time)
                    Task Count: 8
                    Tasks Shown: One of 8
                    ->  Task
                          Node: host=localhost port=57637 dbname=regression
                          ->  HashAggregate
                                Group Key: l_quantity
                                ->  Seq Scan on lineitem_290001 lineitem
SET enable_hashagg TO on;
-- Test JSON format
EXPLAIN (COSTS FALSE, FORMAT JSON)
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
[
  {
    "Plan": {
      "Node Type": "Sort",
      "Parallel Aware": false,
      "Sort Key": ["COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "remote_scan.l_quantity"],
      "Plans": [
        {
          "Node Type": "Aggregate",
          "Strategy": "Hashed",
          "Partial Mode": "Simple",
          "Parent Relationship": "Outer",
          "Parallel Aware": false,
          "Group Key": ["remote_scan.l_quantity"],
          "Plans": [
            {
              "Node Type": "Custom Scan",
              "Parent Relationship": "Outer",
              "Custom Plan Provider": "Citus Real-Time",
              "Parallel Aware": false,
              "Distributed Query": {
                "Job": {
                  "Task Count": 8,
                  "Tasks Shown": "One of 8",
                  "Tasks": [
                    {
                      "Node": "host=localhost port=57637 dbname=regression",
                      "Remote Plan": [
                        [
                          {
                            "Plan": {
                              "Node Type": "Aggregate",
                              "Strategy": "Hashed",
                              "Partial Mode": "Simple",
                              "Parallel Aware": false,
                              "Group Key": ["l_quantity"],
                              "Plans": [
                                {
                                  "Node Type": "Seq Scan",
                                  "Parent Relationship": "Outer",
                                  "Parallel Aware": false,
                                  "Relation Name": "lineitem_290001",
                                  "Alias": "lineitem"
                                }
                              ]
                            }
                          }
                        ]
                      ]
                    }
                  ]
                }
              }
            }
          ]
        }
      ]
    }
  }
]
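-- Illustrative sketch, not part of the recorded expected output: the jsonb value
-- returned by explain_json() can be probed with ordinary jsonb operators. Following
-- the JSON structure printed above, this pulls the distributed task count out of the
-- Custom Scan node; the path is an assumption based on that structure.
SELECT explain_json($$
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$)
  #> '{0,Plan,Plans,0,Plans,0,"Distributed Query",Job}' ->> 'Task Count' AS task_count;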
-- Validate JSON format
SELECT true AS valid FROM explain_json($$
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test XML format
EXPLAIN (COSTS FALSE, FORMAT XML)
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort false COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint) remote_scan.l_quantity Aggregate Hashed Simple Outer false remote_scan.l_quantity Custom Scan Outer Citus Real-Time false 8 One of 8 host=localhost port=57637 dbname=regression Aggregate Hashed Simple false l_quantity Seq Scan Outer false lineitem_290001 lineitem
-- Validate XML format
SELECT true AS valid FROM explain_xml($$
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test YAML format
EXPLAIN (COSTS FALSE, FORMAT YAML)
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
- Plan:
    Node Type: "Sort"
    Parallel Aware: false
    Sort Key:
      - "COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
      - "remote_scan.l_quantity"
    Plans:
      - Node Type: "Aggregate"
        Strategy: "Hashed"
        Partial Mode: "Simple"
        Parent Relationship: "Outer"
        Parallel Aware: false
        Group Key:
          - "remote_scan.l_quantity"
        Plans:
          - Node Type: "Custom Scan"
            Parent Relationship: "Outer"
            Custom Plan Provider: "Citus Real-Time"
            Parallel Aware: false
            Distributed Query:
              Job:
                Task Count: 8
                Tasks Shown: "One of 8"
                Tasks:
                  - Node: "host=localhost port=57637 dbname=regression"
                    Remote Plan:
                      - Plan:
                          Node Type: "Aggregate"
                          Strategy: "Hashed"
                          Partial Mode: "Simple"
                          Parallel Aware: false
                          Group Key:
                            - "l_quantity"
                          Plans:
                            - Node Type: "Seq Scan"
                              Parent Relationship: "Outer"
                              Parallel Aware: false
                              Relation Name: "lineitem_290001"
                              Alias: "lineitem"
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
  SELECT l_quantity, count(*) count_quantity FROM lineitem
  GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort
  Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
  ->  HashAggregate
        Group Key: remote_scan.l_quantity
        ->  Custom Scan (Citus Real-Time)
              Task Count: 8
              Tasks Shown: One of 8
              ->  Task
                    Node: host=localhost port=57637 dbname=regression
                    ->  HashAggregate
                          Group Key: l_quantity
                          ->  Seq Scan on lineitem_290001 lineitem
-- Test verbose
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
  SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Aggregate
  Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / pg_catalog.sum(remote_scan."?column?_2")))
  ->  Custom Scan (Citus Real-Time)
        Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2"
        Task Count: 8
        Tasks Shown: One of 8
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
                    ->  Seq Scan on public.lineitem_290001 lineitem
                          Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
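-- Illustrative sketch, not part of the recorded expected output: the xml value
-- returned by explain_xml() can be inspected with xpath(). EXPLAIN's XML output
-- uses the http://www.postgresql.org/2009/explain namespace, and property names
-- such as Node Type become dash-separated element names.
SELECT xpath('//e:Node-Type/text()',
             explain_xml($$SELECT sum(l_quantity) FROM lineitem$$),
             ARRAY[ARRAY['e', 'http://www.postgresql.org/2009/explain']]) AS node_types;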
-- Test join
EXPLAIN (COSTS FALSE)
  SELECT * FROM lineitem
  JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5.0
  ORDER BY l_quantity LIMIT 10;
Limit
  ->  Sort
        Sort Key: remote_scan.l_quantity
        ->  Custom Scan (Citus Real-Time)
              Task Count: 8
              Tasks Shown: One of 8
              ->  Task
                    Node: host=localhost port=57637 dbname=regression
                    ->  Limit
                          ->  Sort
                                Sort Key: lineitem.l_quantity
                                ->  Merge Join
                                      Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
                                      ->  Index Scan using orders_pkey_290008 on orders_290008 orders
                                      ->  Sort
                                            Sort Key: lineitem.l_orderkey
                                            ->  Seq Scan on lineitem_290001 lineitem
                                                  Filter: (l_quantity < 5.0)
-- Test insert
EXPLAIN (COSTS FALSE)
  INSERT INTO lineitem VALUES (1,0), (2, 0), (3, 0), (4, 0);
Custom Scan (Citus Router)
  Task Count: 1
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Insert on lineitem_290000 citus_table_alias
              ->  Values Scan on "*VALUES*"
-- Test update
EXPLAIN (COSTS FALSE)
  UPDATE lineitem
  SET l_suppkey = 12
  WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (Citus Router)
  Task Count: 1
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Update on lineitem_290000 lineitem
              ->  Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
                    Index Cond: (l_orderkey = 1)
                    Filter: (l_partkey = 0)
-- Test delete
EXPLAIN (COSTS FALSE)
  DELETE FROM lineitem
  WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (Citus Router)
  Task Count: 1
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Delete on lineitem_290000 lineitem
              ->  Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
                    Index Cond: (l_orderkey = 1)
                    Filter: (l_partkey = 0)
-- Test zero-shard update
EXPLAIN (COSTS FALSE)
  UPDATE lineitem
  SET l_suppkey = 12
  WHERE l_orderkey = 1 AND l_orderkey = 0;
Custom Scan (Citus Router)
  Task Count: 0
  Tasks Shown: All
-- Test zero-shard delete
EXPLAIN (COSTS FALSE)
  DELETE FROM lineitem
  WHERE l_orderkey = 1 AND l_orderkey = 0;
Custom Scan (Citus Router)
  Task Count: 0
  Tasks Shown: All
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
  SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Custom Scan (Citus Router)
  Task Count: 1
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
              Index Cond: (l_orderkey = 5)
SELECT true AS valid FROM explain_xml($$
  SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
SELECT true AS valid FROM explain_json($$
  SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
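-- Illustrative sketch, not part of the recorded expected output: for a single-shard
-- query the top plan node is the Citus custom scan itself, so its provider can be
-- read straight from the JSON produced by explain_json(); the path is an assumption
-- based on the plans shown above.
SELECT explain_json($$
  SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$)
  -> 0 -> 'Plan' ->> 'Custom Plan Provider' AS plan_provider;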
-- Test CREATE TABLE ... AS
EXPLAIN (COSTS FALSE)
  CREATE TABLE explain_result AS
  SELECT * FROM lineitem;
Custom Scan (Citus Real-Time)
  Task Count: 8
  Tasks Shown: One of 8
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Seq Scan on lineitem_290001 lineitem
-- Test having
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
  SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
  HAVING sum(l_quantity) > 100;
Aggregate
  Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / pg_catalog.sum(remote_scan."?column?_2")))
  Filter: (sum(remote_scan.worker_column_4) > '100'::numeric)
  ->  Custom Scan (Citus Real-Time)
        Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2", remote_scan.worker_column_4
        Task Count: 8
        Tasks Shown: One of 8
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    Output: sum(l_quantity), sum(l_quantity), count(l_quantity), sum(l_quantity)
                    ->  Seq Scan on public.lineitem_290001 lineitem
                          Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-- Test having without aggregate
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
  SELECT l_quantity FROM lineitem
  GROUP BY l_quantity
  HAVING l_quantity > (100 * random());
HashAggregate
  Output: remote_scan.l_quantity
  Group Key: remote_scan.l_quantity
  Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random()))
  ->  Custom Scan (Citus Real-Time)
        Output: remote_scan.l_quantity, remote_scan.worker_column_2
        Task Count: 8
        Tasks Shown: One of 8
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  HashAggregate
                    Output: l_quantity, l_quantity
                    Group Key: lineitem.l_quantity
                    ->  Seq Scan on public.lineitem_290001 lineitem
                          Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-- Subquery pushdown tests with explain
EXPLAIN (COSTS OFF)
SELECT avg(array_length(events, 1)) AS event_average
FROM
  (SELECT tenant_id, user_id, array_agg(event_type ORDER BY event_time) AS events
   FROM
     (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id,
             event_type, events.event_time
      FROM users, events
      WHERE (users.composite_id) = (events.composite_id)
        AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type
        AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type
        AND event_type IN ('click', 'submit', 'pay')) AS subquery
   GROUP BY tenant_id, user_id) AS subquery;
Aggregate
  ->  Custom Scan (Citus Real-Time)
        Task Count: 4
        Tasks Shown: One of 4
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    ->  GroupAggregate
                          Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
                          ->  Sort
                                Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
                                ->  Hash Join
                                      Hash Cond: (users.composite_id = events.composite_id)
                                      ->  Seq Scan on users_1400033 users
                                            Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
                                      ->  Hash
                                            ->  Seq Scan on events_1400029 events
                                                  Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
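-- Illustrative sketch, not part of the recorded expected output: deeply nested
-- pushdown plans like the one above are easier to eyeball when pretty-printed,
-- e.g. by combining the explain_json() helper with jsonb_pretty().
SELECT jsonb_pretty(explain_json($$
  SELECT count(*) FROM lineitem$$));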
-- Union and left join subquery pushdown
EXPLAIN (COSTS OFF)
SELECT avg(array_length(events, 1)) AS event_average, hasdone
FROM
  (SELECT subquery_1.tenant_id, subquery_1.user_id,
          array_agg(event ORDER BY event_time) AS events,
          COALESCE(hasdone, 'Has not done paying') AS hasdone
   FROM
     ((SELECT (users.composite_id).tenant_id, (users.composite_id).user_id,
              (users.composite_id) as composite_id, 'action=>1' AS event, events.event_time
       FROM users, events
       WHERE (users.composite_id) = (events.composite_id)
         AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type
         AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type
         AND event_type = 'click')
      UNION
      (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id,
              (users.composite_id) as composite_id, 'action=>2' AS event, events.event_time
       FROM users, events
       WHERE (users.composite_id) = (events.composite_id)
         AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type
         AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type
         AND event_type = 'submit')) AS subquery_1
   LEFT JOIN
     (SELECT DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id)
             composite_id, (composite_id).tenant_id, (composite_id).user_id,
             'Has done paying'::TEXT AS hasdone
      FROM events
      WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type
        AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type
        AND event_type = 'pay') AS subquery_2
     ON subquery_1.composite_id = subquery_2.composite_id
   GROUP BY subquery_1.tenant_id, subquery_1.user_id, hasdone) AS subquery_top
GROUP BY hasdone;
HashAggregate
  Group Key: remote_scan.hasdone
  ->  Custom Scan (Citus Real-Time)
        Task Count: 4
        Tasks Shown: One of 4
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  GroupAggregate
                    Group Key: subquery_top.hasdone
                    ->  Sort
                          Sort Key: subquery_top.hasdone
                          ->  Subquery Scan on subquery_top
                                ->  GroupAggregate
                                      Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), subquery_2.hasdone
                                      ->  Sort
                                            Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), subquery_2.hasdone
                                            ->  Hash Left Join
                                                  Hash Cond: (users.composite_id = subquery_2.composite_id)
                                                  ->  HashAggregate
                                                        Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), users.composite_id, ('action=>1'::text), events.event_time
                                                        ->  Append
                                                              ->  Hash Join
                                                                    Hash Cond: (users.composite_id = events.composite_id)
                                                                    ->  Seq Scan on users_1400033 users
                                                                          Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
                                                                    ->  Hash
                                                                          ->  Seq Scan on events_1400029 events
                                                                                Filter: ((event_type)::text = 'click'::text)
                                                              ->  Hash Join
                                                                    Hash Cond: (users_1.composite_id = events_1.composite_id)
                                                                    ->  Seq Scan on users_1400033 users_1
                                                                          Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
                                                                    ->  Hash
                                                                          ->  Seq Scan on events_1400029 events_1
                                                                                Filter: ((event_type)::text = 'submit'::text)
                                                  ->  Hash
                                                        ->  Subquery Scan on subquery_2
                                                              ->  Unique
                                                                    ->  Sort
                                                                          Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
                                                                          ->  Seq Scan on events_1400029 events_2
                                                                                Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
-- Union, left join and having subquery pushdown
EXPLAIN (COSTS OFF)
SELECT avg(array_length(events, 1)) AS event_average, count_pay
FROM
  (SELECT subquery_1.tenant_id, subquery_1.user_id,
          array_agg(event ORDER BY event_time) AS events,
          COALESCE(count_pay, 0) AS count_pay
   FROM
     ((SELECT (users.composite_id).tenant_id, (users.composite_id).user_id,
              (users.composite_id), 'action=>1' AS event, events.event_time
       FROM users, events
       WHERE (users.composite_id) = (events.composite_id)
         AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type
         AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type
         AND event_type = 'click')
      UNION
      (SELECT (users.composite_id).tenant_id, (users.composite_id).user_id,
              (users.composite_id), 'action=>2' AS event, events.event_time
       FROM users, events
       WHERE (users.composite_id) = (events.composite_id)
         AND users.composite_id >= '(1, -9223372036854775808)'::user_composite_type
         AND users.composite_id <= '(1, 9223372036854775807)'::user_composite_type
         AND event_type = 'submit')) AS subquery_1
   LEFT JOIN
     (SELECT (composite_id).tenant_id, (composite_id).user_id,
             composite_id, COUNT(*) AS count_pay
      FROM events
      WHERE events.composite_id >= '(1, -9223372036854775808)'::user_composite_type
        AND events.composite_id <= '(1, 9223372036854775807)'::user_composite_type
        AND event_type = 'pay'
      GROUP BY composite_id
      HAVING COUNT(*) > 2) AS subquery_2
     ON subquery_1.composite_id = subquery_2.composite_id
   GROUP BY subquery_1.tenant_id, subquery_1.user_id, count_pay) AS subquery_top
WHERE array_ndims(events) > 0
GROUP BY count_pay
ORDER BY count_pay;
ERROR: bogus varattno for OUTER_VAR var: 3
-- Lateral join subquery pushdown
-- set subquery_pushdown due to limit in the query
SET citus.subquery_pushdown to ON;
EXPLAIN (COSTS OFF)
SELECT tenant_id, user_id, user_lastseen, event_array
FROM
  (SELECT tenant_id, user_id, max(lastseen) as user_lastseen,
          array_agg(event_type ORDER BY event_time) AS event_array
   FROM
     (SELECT (composite_id).tenant_id, (composite_id).user_id, composite_id, lastseen
      FROM users
      WHERE composite_id >= '(1, -9223372036854775808)'::user_composite_type
        AND composite_id <= '(1, 9223372036854775807)'::user_composite_type
      ORDER BY lastseen DESC
      LIMIT 10) AS subquery_top
   LEFT JOIN LATERAL
     (SELECT event_type, event_time
      FROM events
      WHERE (composite_id) = subquery_top.composite_id
      ORDER BY event_time DESC
      LIMIT 99) AS subquery_lateral ON true
   GROUP BY tenant_id, user_id) AS shard_union
ORDER BY user_lastseen DESC
LIMIT 10;
Limit
  ->  Sort
        Sort Key: remote_scan.user_lastseen DESC
        ->  Custom Scan (Citus Real-Time)
              Task Count: 4
              Tasks Shown: One of 4
              ->  Task
                    Node: host=localhost port=57637 dbname=regression
                    ->  Limit
                          ->  Sort
                                Sort Key: (max(users.lastseen)) DESC
                                ->  GroupAggregate
                                      Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
                                      ->  Sort
                                            Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
                                            ->  Nested Loop Left Join
                                                  ->  Limit
                                                        ->  Sort
                                                              Sort Key: users.lastseen DESC
                                                              ->  Seq Scan on users_1400033 users
                                                                    Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
                                                  ->  Limit
                                                        ->  Sort
                                                              Sort Key: events.event_time DESC
                                                              ->  Seq Scan on events_1400029 events
                                                                    Filter: (composite_id = users.composite_id)
RESET citus.subquery_pushdown;
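-- Illustrative sketch, not part of the recorded expected output: before moving on
-- to the all-tasks tests, the session settings this file depends on can be
-- double-checked with current_setting().
SELECT current_setting('citus.task_executor_type') AS task_executor_type,
       current_setting('citus.explain_distributed_queries') AS explain_distributed_queries;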
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
  SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
  ->  Custom Scan (Citus Real-Time)
        Task Count: 4
        Tasks Shown: All
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290005 lineitem
                          Filter: (l_orderkey > 9030)
        ->  Task
              Node: host=localhost port=57638 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290004 lineitem
                          Filter: (l_orderkey > 9030)
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290007 lineitem
                          Filter: (l_orderkey > 9030)
        ->  Task
              Node: host=localhost port=57638 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290006 lineitem
                          Filter: (l_orderkey > 9030)
SELECT true AS valid FROM explain_xml($$
  SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
SELECT true AS valid FROM explain_json($$
  SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
-- Test multi shard update
EXPLAIN (COSTS FALSE)
  UPDATE lineitem_hash_part
  SET l_suppkey = 12;
Custom Scan (Citus Router)
  Task Count: 4
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Update on lineitem_hash_part_360038 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360038 lineitem_hash_part
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Update on lineitem_hash_part_360039 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360039 lineitem_hash_part
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Update on lineitem_hash_part_360040 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360040 lineitem_hash_part
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Update on lineitem_hash_part_360041 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
EXPLAIN (COSTS FALSE)
  UPDATE lineitem_hash_part
  SET l_suppkey = 12
  WHERE l_orderkey = 1 OR l_orderkey = 3;
Custom Scan (Citus Router)
  Task Count: 2
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Update on lineitem_hash_part_360038 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360038 lineitem_hash_part
                    Filter: ((l_orderkey = 1) OR (l_orderkey = 3))
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Update on lineitem_hash_part_360039 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360039 lineitem_hash_part
                    Filter: ((l_orderkey = 1) OR (l_orderkey = 3))
-- Test multi shard delete
EXPLAIN (COSTS FALSE)
  DELETE FROM lineitem_hash_part;
Custom Scan (Citus Router)
  Task Count: 4
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Delete on lineitem_hash_part_360038 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360038 lineitem_hash_part
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Delete on lineitem_hash_part_360039 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360039 lineitem_hash_part
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Delete on lineitem_hash_part_360040 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360040 lineitem_hash_part
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Delete on lineitem_hash_part_360041 lineitem_hash_part
              ->  Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
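-- Illustrative sketch, not part of the recorded expected output: with
-- citus.explain_all_tasks still enabled, every task appears in the Tasks array of
-- the JSON output, so the number of tasks shown could be counted like this; the
-- jsonb path is an assumption based on the JSON structure printed earlier.
SELECT jsonb_array_length(explain_json($$
  SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$)
  #> '{0,Plan,Plans,0,"Distributed Query",Job,Tasks}') AS tasks_shown;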
-- Test task tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
  SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
  ->  Custom Scan (Citus Task-Tracker)
        Task Count: 4
        Tasks Shown: One of 4
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290005 lineitem
                          Filter: (l_orderkey > 9030)
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
  SELECT count(*)
  FROM lineitem, orders, customer, supplier_single_shard
  WHERE l_orderkey = o_orderkey
    AND o_custkey = c_custkey
    AND l_suppkey = s_suppkey;
Aggregate
  ->  Custom Scan (Citus Task-Tracker)
        Task Count: 1
        Tasks Shown: None, not supported for re-partition queries
        ->  MapMergeJob
              Map Task Count: 1
              Merge Task Count: 1
              ->  MapMergeJob
                    Map Task Count: 8
                    Merge Task Count: 1
EXPLAIN (COSTS FALSE, FORMAT JSON)
  SELECT count(*)
  FROM lineitem, orders, customer, supplier_single_shard
  WHERE l_orderkey = o_orderkey
    AND o_custkey = c_custkey
    AND l_suppkey = s_suppkey;
[
  {
    "Plan": {
      "Node Type": "Aggregate",
      "Strategy": "Plain",
      "Partial Mode": "Simple",
      "Parallel Aware": false,
      "Plans": [
        {
          "Node Type": "Custom Scan",
          "Parent Relationship": "Outer",
          "Custom Plan Provider": "Citus Task-Tracker",
          "Parallel Aware": false,
          "Distributed Query": {
            "Job": {
              "Task Count": 1,
              "Tasks Shown": "None, not supported for re-partition queries",
              "Depended Jobs": [
                {
                  "Map Task Count": 1,
                  "Merge Task Count": 1,
                  "Depended Jobs": [
                    {
                      "Map Task Count": 8,
                      "Merge Task Count": 1
                    }
                  ]
                }
              ]
            }
          }
        }
      ]
    }
  }
]
SELECT true AS valid FROM explain_json($$
  SELECT count(*)
  FROM lineitem, orders, customer, supplier_single_shard
  WHERE l_orderkey = o_orderkey
    AND o_custkey = c_custkey
    AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT XML)
  SELECT count(*)
  FROM lineitem, orders, customer, supplier_single_shard
  WHERE l_orderkey = o_orderkey
    AND o_custkey = c_custkey
    AND l_suppkey = s_suppkey;
Aggregate Plain Simple false Custom Scan Outer Citus Task-Tracker false 1 None, not supported for re-partition queries 1 1 8 1
SELECT true AS valid FROM explain_xml($$
  SELECT count(*)
  FROM lineitem, orders, customer, supplier
  WHERE l_orderkey = o_orderkey
    AND o_custkey = c_custkey
    AND l_suppkey = s_suppkey$$);
t
-- make sure that EXPLAIN works without
-- problems for queries that involve only
-- reference tables
SELECT true AS valid FROM explain_xml($$
  SELECT count(*)
  FROM nation
  WHERE n_name = 'CHINA'$$);
t
SELECT true AS valid FROM explain_xml($$
  SELECT count(*)
  FROM nation, supplier
  WHERE nation.n_nationkey = supplier.s_nationkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT YAML)
  SELECT count(*)
  FROM lineitem, orders, customer, supplier_single_shard
  WHERE l_orderkey = o_orderkey
    AND o_custkey = c_custkey
    AND l_suppkey = s_suppkey;
- Plan:
    Node Type: "Aggregate"
    Strategy: "Plain"
    Partial Mode: "Simple"
    Parallel Aware: false
    Plans:
      - Node Type: "Custom Scan"
        Parent Relationship: "Outer"
        Custom Plan Provider: "Citus Task-Tracker"
        Parallel Aware: false
        Distributed Query:
          Job:
            Task Count: 1
            Tasks Shown: "None, not supported for re-partition queries"
            Depended Jobs:
              - Map Task Count: 1
                Merge Task Count: 1
                Depended Jobs:
                  - Map Task Count: 8
                    Merge Task Count: 1
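-- Illustrative sketch, not part of the recorded expected output: for re-partition
-- joins the individual tasks are not shown, but the job tree is still exposed in
-- the JSON output, so the nested "Depended Jobs" can be pulled out directly; the
-- jsonb path is an assumption based on the JSON shown above.
SELECT explain_json($$
  SELECT count(*)
  FROM lineitem, orders, customer, supplier_single_shard
  WHERE l_orderkey = o_orderkey
    AND o_custkey = c_custkey
    AND l_suppkey = s_suppkey$$)
  #> '{0,Plan,Plans,0,"Distributed Query",Job,"Depended Jobs"}' AS depended_jobs;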
-- test parallel aggregates
SET parallel_setup_cost=0;
SET parallel_tuple_cost=0;
SET min_parallel_relation_size=0;
ERROR: unrecognized configuration parameter "min_parallel_relation_size"
SET min_parallel_table_scan_size=0;
SET max_parallel_workers_per_gather=4;
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Finalize Aggregate
  ->  Gather
        Workers Planned: 3
        ->  Partial Aggregate
              ->  Parallel Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Aggregate
  ->  Custom Scan (Citus Task-Tracker)
        Task Count: 8
        Tasks Shown: One of 8
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290001 lineitem
-- ensure EXPLAIN EXECUTE doesn't crash
PREPARE task_tracker_query AS
  SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Aggregate
  ->  Custom Scan (Citus Task-Tracker)
        Task Count: 4
        Tasks Shown: One of 4
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290005 lineitem
                          Filter: (l_orderkey > 9030)
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Custom Scan (Citus Router)  (cost=0.00..0.00 rows=0 width=0)
  Task Count: 1
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem  (cost=0.28..11.83 rows=3 width=5)
              Index Cond: (l_orderkey = 5)
PREPARE real_time_executor_query AS
  SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Aggregate
  ->  Custom Scan (Citus Real-Time)
        Task Count: 4
        Tasks Shown: One of 4
        ->  Task
              Node: host=localhost port=57637 dbname=regression
              ->  Aggregate
                    ->  Seq Scan on lineitem_290005 lineitem
                          Filter: (l_orderkey > 9030)
-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);
Custom Scan (Citus Router)  (cost=0.00..0.00 rows=0 width=0)
  Task Count: 1
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57637 dbname=regression
        ->  Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem  (cost=0.28..11.83 rows=3 width=5)
              Index Cond: (l_orderkey = 5)
-- test explain in a transaction with alter table to test we use right connections
BEGIN;
CREATE TABLE explain_table(id int);
SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int;
ROLLBACK;
-- test explain with local INSERT ... SELECT
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
Custom Scan (Citus INSERT ... SELECT via coordinator)
  ->  Limit
        ->  Custom Scan (Citus Real-Time)
              Task Count: 4
              Tasks Shown: One of 4
              ->  Task
                    Node: host=localhost port=57637 dbname=regression
                    ->  Limit
                          ->  Seq Scan on orders_hash_part_360043 orders_hash_part
SELECT true AS valid FROM explain_json($$
  INSERT INTO lineitem_hash_part (l_orderkey)
  SELECT o_orderkey FROM orders_hash_part LIMIT 3;
$$);
t
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey, l_quantity)
SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3;
Custom Scan (Citus INSERT ... SELECT via coordinator)
  ->  Limit
        ->  Custom Scan (Citus Real-Time)
              Task Count: 4
              Tasks Shown: One of 4
              ->  Task
                    Node: host=localhost port=57637 dbname=regression
                    ->  Limit
                          ->  Seq Scan on orders_hash_part_360043 orders_hash_part
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part (l_orderkey)
SELECT s FROM generate_series(1,5) s;
Custom Scan (Citus INSERT ... SELECT via coordinator)
  ->  Function Scan on generate_series s
EXPLAIN (COSTS OFF)
WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
INSERT INTO lineitem_hash_part
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
SELECT s FROM cte1;
Custom Scan (Citus INSERT ... SELECT via coordinator)
  ->  CTE Scan on cte1
        CTE cte1
          ->  Function Scan on generate_series s
        CTE cte1
          ->  Limit
                ->  CTE Scan on cte1 cte1_1
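-- Illustrative sketch, not part of the recorded expected output: the INSERT ... SELECT
-- plans above go through the coordinator, which can also be confirmed from the JSON
-- output by reading the custom plan provider of the top node.
SELECT explain_json($$
  INSERT INTO lineitem_hash_part (l_orderkey)
  SELECT s FROM generate_series(1,5) s$$)
  -> 0 -> 'Plan' ->> 'Custom Plan Provider' AS plan_provider;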
EXPLAIN (COSTS OFF)
INSERT INTO lineitem_hash_part
( SELECT s FROM generate_series(1,5) s) UNION
( SELECT s FROM generate_series(5,10) s);
Custom Scan (Citus INSERT ... SELECT via coordinator)
  ->  HashAggregate
        Group Key: s.s
        ->  Append
              ->  Function Scan on generate_series s
              ->  Function Scan on generate_series s_1
-- explain with recursive planning
EXPLAIN (COSTS OFF, VERBOSE true)
WITH keys AS (
  SELECT DISTINCT l_orderkey FROM lineitem_hash_part
),
series AS (
  SELECT s FROM generate_series(1,10) s
)
SELECT l_orderkey FROM series JOIN keys ON (s = l_orderkey)
ORDER BY s;
Custom Scan (Citus Router)
  Output: remote_scan.l_orderkey
  ->  Distributed Subplan 55_1
        ->  HashAggregate
              Output: remote_scan.l_orderkey
              Group Key: remote_scan.l_orderkey
              ->  Custom Scan (Citus Real-Time)
                    Output: remote_scan.l_orderkey
                    Task Count: 4
                    Tasks Shown: One of 4
                    ->  Task
                          Node: host=localhost port=57637 dbname=regression
                          ->  HashAggregate
                                Output: l_orderkey
                                Group Key: lineitem_hash_part.l_orderkey
                                ->  Seq Scan on public.lineitem_hash_part_360038 lineitem_hash_part
                                      Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
  ->  Distributed Subplan 55_2
        ->  Function Scan on pg_catalog.generate_series s
              Output: s
              Function Call: generate_series(1, 10)
  Task Count: 1
  Tasks Shown: All
  ->  Task
        Node: host=localhost port=57638 dbname=regression
        ->  Merge Join
              Output: intermediate_result_1.l_orderkey, intermediate_result.s
              Merge Cond: (intermediate_result.s = intermediate_result_1.l_orderkey)
              ->  Sort
                    Output: intermediate_result.s
                    Sort Key: intermediate_result.s
                    ->  Function Scan on pg_catalog.read_intermediate_result intermediate_result
                          Output: intermediate_result.s
                          Function Call: read_intermediate_result('55_2'::text, 'binary'::citus_copy_format)
              ->  Sort
                    Output: intermediate_result_1.l_orderkey
                    Sort Key: intermediate_result_1.l_orderkey
                    ->  Function Scan on pg_catalog.read_intermediate_result intermediate_result_1
                          Output: intermediate_result_1.l_orderkey
                          Function Call: read_intermediate_result('55_1'::text, 'binary'::citus_copy_format)
SELECT true AS valid FROM explain_json($$
  WITH result AS (
    SELECT l_quantity, count(*) count_quantity FROM lineitem
    GROUP BY l_quantity ORDER BY count_quantity, l_quantity
  ),
  series AS (
    SELECT s FROM generate_series(1,10) s
  )
  SELECT * FROM result JOIN series ON (s = count_quantity)
  JOIN orders_hash_part ON (s = o_orderkey)
$$);
t
SELECT true AS valid FROM explain_xml($$
  WITH result AS (
    SELECT l_quantity, count(*) count_quantity FROM lineitem
    GROUP BY l_quantity ORDER BY count_quantity, l_quantity
  ),
  series AS (
    SELECT s FROM generate_series(1,10) s
  )
  SELECT * FROM result JOIN series ON (s = l_quantity)
  JOIN orders_hash_part ON (s = o_orderkey)
$$);
t
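-- Illustrative follow-up, not part of the recorded expected output: a hypothetical
-- helper in the same spirit as explain_json()/explain_xml() that returns plain-text
-- EXPLAIN lines, making it easy to count the distributed subplans that recursive
-- planning creates for the CTE query above.
CREATE FUNCTION explain_text(query text)
RETURNS SETOF text
AS $BODY$
BEGIN
  RETURN QUERY EXECUTE format('EXPLAIN (COSTS FALSE) %s', query);
END;
$BODY$ LANGUAGE plpgsql;
SELECT count(*) AS distributed_subplans
FROM explain_text($$
  WITH keys AS (
    SELECT DISTINCT l_orderkey FROM lineitem_hash_part
  ),
  series AS (
    SELECT s FROM generate_series(1,10) s
  )
  SELECT l_orderkey FROM series JOIN keys ON (s = l_orderkey)
  ORDER BY s$$) AS plan_line
WHERE plan_line LIKE '%Distributed Subplan%';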