diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 0f76dc9d0..322445b7f 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000; SELECT substring(version(), '\d+(?:\.\d+)?') AS major_version; major_version --------------- - 9.5 + 9.6 (1 row) \a\t @@ -42,7 +42,7 @@ EXPLAIN (COSTS FALSE, FORMAT TEXT) SELECT l_quantity, count(*) count_quantity FROM lineitem GROUP BY l_quantity ORDER BY count_quantity, l_quantity; Sort - Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity -> HashAggregate Group Key: remote_scan.l_quantity -> Custom Scan (Citus Real-Time) @@ -61,18 +61,22 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Plan": { "Node Type": "Sort", - "Sort Key": ["COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "remote_scan.l_quantity"], + "Parallel Aware": false, + "Sort Key": ["COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "remote_scan.l_quantity"], "Plans": [ { "Node Type": "Aggregate", "Strategy": "Hashed", + "Partial Mode": "Simple", "Parent Relationship": "Outer", + "Parallel Aware": false, "Group Key": ["remote_scan.l_quantity"], "Plans": [ { "Node Type": "Custom Scan", "Parent Relationship": "Outer", "Custom Plan Provider": "Citus Real-Time", + "Parallel Aware": false, "Distributed Query": { "Job": { "Task Count": 8, @@ -86,11 +90,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plan": { "Node Type": "Aggregate", "Strategy": "Hashed", + "Partial Mode": "Simple", + "Parallel Aware": false, "Group Key": ["l_quantity"], "Plans": [ { "Node Type": "Seq Scan", "Parent Relationship": "Outer", + "Parallel Aware": false, "Relation Name": "lineitem_290001", "Alias": "lineitem" } @@ -124,15 +131,18 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Sort + false - COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint) + COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint) remote_scan.l_quantity Aggregate Hashed + Simple Outer + false remote_scan.l_quantity @@ -141,6 +151,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer Citus Real-Time + false 8 @@ -154,6 +165,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Aggregate Hashed + Simple + false l_quantity @@ -161,6 +174,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Seq Scan Outer + false lineitem_290001 lineitem @@ -191,19 +205,23 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) GROUP BY l_quantity ORDER BY count_quantity, l_quantity; - Plan: Node Type: "Sort" + Parallel Aware: false Sort Key: - - "COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)" + - "COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)" - "remote_scan.l_quantity" Plans: - Node Type: "Aggregate" Strategy: "Hashed" + Partial Mode: "Simple" Parent Relationship: "Outer" + Parallel Aware: false Group Key: - "remote_scan.l_quantity" Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" Custom Plan 
Provider: "Citus Real-Time" + Parallel Aware: false Distributed Query: Job: Task Count: 8 @@ -214,11 +232,14 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) - Plan: Node Type: "Aggregate" Strategy: "Hashed" + Partial Mode: "Simple" + Parallel Aware: false Group Key: - "l_quantity" Plans: - Node Type: "Seq Scan" Parent Relationship: "Outer" + Parallel Aware: false Relation Name: "lineitem_290001" Alias: "lineitem" @@ -227,7 +248,7 @@ EXPLAIN (COSTS FALSE, FORMAT TEXT) SELECT l_quantity, count(*) count_quantity FROM lineitem GROUP BY l_quantity ORDER BY count_quantity, l_quantity; Sort - Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity -> HashAggregate Group Key: remote_scan.l_quantity -> Custom Scan (Citus Real-Time) @@ -242,7 +263,7 @@ Sort EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem; Aggregate - Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2"))) + Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / pg_catalog.sum(remote_scan."?column?_2"))) -> Custom Scan (Citus Real-Time) Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2" Task Count: 8 @@ -344,7 +365,7 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem HAVING sum(l_quantity) > 100; Aggregate - Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2"))) + Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / pg_catalog.sum(remote_scan."?column?_2"))) Filter: (sum(remote_scan.worker_column_4) > '100'::numeric) -> Custom Scan (Citus Real-Time) Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2", remote_scan.worker_column_4 @@ -410,11 +431,15 @@ Aggregate Node: host=localhost port=57637 dbname=regression -> Aggregate -> GroupAggregate - Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id) -> Sort - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Result - One-Time Filter: false + Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id) + -> Nested Loop + Join Filter: ((NULL::user_composite_type) = events.composite_id) + -> Result + One-Time Filter: false + -> Seq Scan on events_1400027 events + Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) -- Union and left join subquery pushdown EXPLAIN (COSTS OFF) SELECT @@ -485,29 +510,40 @@ HashAggregate Tasks Shown: One of 4 -> Task Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: COALESCE(subquery_2.hasdone, 'Has not done paying'::text) - -> GroupAggregate - Group Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone - -> Sort - Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone - -> Hash Left Join - Hash Cond: (composite_id = subquery_2.composite_id) - -> Unique - -> Sort - Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), composite_id, ('action=>1'::text), event_time - -> Append - -> Result - One-Time Filter: false - -> Result - One-Time Filter: false - -> Hash - -> Subquery 
Scan on subquery_2 + -> GroupAggregate + Group Key: subquery_top.hasdone + -> Sort + Sort Key: subquery_top.hasdone + -> Subquery Scan on subquery_top + -> GroupAggregate + Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone + -> Sort + Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone + -> Hash Left Join + Hash Cond: ((NULL::user_composite_type) = subquery_2.composite_id) -> Unique -> Sort - Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) - -> Seq Scan on events_1400027 events - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) + Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), (NULL::user_composite_type), ('action=>1'::text), events.event_time + -> Append + -> Nested Loop + Join Filter: ((NULL::user_composite_type) = events.composite_id) + -> Result + One-Time Filter: false + -> Seq Scan on events_1400027 events + Filter: ((event_type)::text = 'click'::text) + -> Nested Loop + Join Filter: ((NULL::user_composite_type) = events_1.composite_id) + -> Result + One-Time Filter: false + -> Seq Scan on events_1400027 events_1 + Filter: ((event_type)::text = 'submit'::text) + -> Hash + -> Subquery Scan on subquery_2 + -> Unique + -> Sort + Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) + -> Seq Scan on events_1400027 events_2 + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) -- Union, left join and having subquery pushdown EXPLAIN (COSTS OFF) SELECT @@ -643,22 +679,23 @@ Limit Node: host=localhost port=57637 dbname=regression -> Limit -> Sort - Sort Key: (max(lastseen)) DESC + Sort Key: (max(users.lastseen)) DESC -> GroupAggregate - Group Key: ((composite_id).tenant_id), ((composite_id).user_id) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) -> Sort - Sort Key: ((composite_id).tenant_id), ((composite_id).user_id) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) -> Nested Loop Left Join -> Limit -> Sort - Sort Key: lastseen DESC - -> Result - One-Time Filter: false + Sort Key: users.lastseen DESC + -> Subquery Scan on users + -> Result + One-Time Filter: false -> Limit -> Sort Sort Key: events.event_time DESC -> Seq Scan on events_1400027 events - Filter: (composite_id = composite_id) + Filter: (composite_id = users.composite_id) -- Test all tasks output SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) @@ -736,11 +773,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) "Plan": { "Node Type": "Aggregate", "Strategy": "Plain", + "Partial Mode": "Simple", + "Parallel Aware": false, "Plans": [ { "Node Type": "Custom Scan", "Parent Relationship": "Outer", "Custom Plan Provider": "Citus Task-Tracker", + "Parallel Aware": false, "Distributed Query": { "Job": { "Task Count": 1, @@ -782,11 +822,14 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Aggregate Plain + Simple + false Custom Scan Outer Citus Task-Tracker + false 1 @@ -839,10 +882,13 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) - Plan: Node Type: "Aggregate" Strategy: "Plain" + Partial Mode: "Simple" + Parallel Aware: false Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" Custom Plan Provider: "Citus 
Task-Tracker" + Parallel Aware: false Distributed Query: Job: Task Count: 1 @@ -855,18 +901,17 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Merge Task Count: 1 -- test parallel aggregates SET parallel_setup_cost=0; -ERROR: unrecognized configuration parameter "parallel_setup_cost" SET parallel_tuple_cost=0; -ERROR: unrecognized configuration parameter "parallel_tuple_cost" SET min_parallel_relation_size=0; -ERROR: unrecognized configuration parameter "min_parallel_relation_size" SET max_parallel_workers_per_gather=4; -ERROR: unrecognized configuration parameter "max_parallel_workers_per_gather" -- ensure local plans display correctly CREATE TABLE lineitem_clone (LIKE lineitem); EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone; -Aggregate - -> Seq Scan on lineitem_clone +Finalize Aggregate + -> Gather + Workers Planned: 3 + -> Partial Aggregate + -> Parallel Seq Scan on lineitem_clone -- ensure distributed plans don't break EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; Aggregate diff --git a/src/test/regress/expected/multi_explain_1.out b/src/test/regress/expected/multi_explain_1.out new file mode 100644 index 000000000..0f76dc9d0 --- /dev/null +++ b/src/test/regress/expected/multi_explain_1.out @@ -0,0 +1,942 @@ +-- +-- MULTI_EXPLAIN +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000; +-- print major version to make version-specific tests clear +SELECT substring(version(), '\d+(?:\.\d+)?') AS major_version; + major_version +--------------- + 9.5 +(1 row) + +\a\t +SET citus.task_executor_type TO 'real-time'; +SET citus.explain_distributed_queries TO on; +-- Function that parses explain output as JSON +CREATE FUNCTION explain_json(query text) +RETURNS jsonb +AS $BODY$ +DECLARE + result jsonb; +BEGIN + EXECUTE format('EXPLAIN (FORMAT JSON) %s', query) INTO result; + RETURN result; +END; +$BODY$ LANGUAGE plpgsql; +-- Function that parses explain output as XML +CREATE FUNCTION explain_xml(query text) +RETURNS xml +AS $BODY$ +DECLARE + result xml; +BEGIN + EXECUTE format('EXPLAIN (FORMAT XML) %s', query) INTO result; + RETURN result; +END; +$BODY$ LANGUAGE plpgsql; +-- VACUMM related tables to ensure test outputs are stable +VACUUM ANALYZE lineitem; +VACUUM ANALYZE orders; +-- Test Text format +EXPLAIN (COSTS FALSE, FORMAT TEXT) + SELECT l_quantity, count(*) count_quantity FROM lineitem + GROUP BY l_quantity ORDER BY count_quantity, l_quantity; +Sort + Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity + -> HashAggregate + Group Key: remote_scan.l_quantity + -> Custom Scan (Citus Real-Time) + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: l_quantity + -> Seq Scan on lineitem_290001 lineitem +-- Test JSON format +EXPLAIN (COSTS FALSE, FORMAT JSON) + SELECT l_quantity, count(*) count_quantity FROM lineitem + GROUP BY l_quantity ORDER BY count_quantity, l_quantity; +[ + { + "Plan": { + "Node Type": "Sort", + "Sort Key": ["COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "remote_scan.l_quantity"], + "Plans": [ + { + "Node Type": "Aggregate", + "Strategy": "Hashed", + "Parent Relationship": "Outer", + "Group Key": ["remote_scan.l_quantity"], + "Plans": [ + { + "Node Type": "Custom Scan", + "Parent Relationship": "Outer", + "Custom Plan Provider": "Citus Real-Time", + "Distributed Query": { + "Job": { + "Task Count": 8, + "Tasks 
Shown": "One of 8", + "Tasks": [ + { + "Node": "host=localhost port=57637 dbname=regression", + "Remote Plan": [ + [ + { + "Plan": { + "Node Type": "Aggregate", + "Strategy": "Hashed", + "Group Key": ["l_quantity"], + "Plans": [ + { + "Node Type": "Seq Scan", + "Parent Relationship": "Outer", + "Relation Name": "lineitem_290001", + "Alias": "lineitem" + } + ] + } + } + ] + + ] + } + ] + } + } + } + ] + } + ] + } + } +] +-- Validate JSON format +SELECT true AS valid FROM explain_json($$ + SELECT l_quantity, count(*) count_quantity FROM lineitem + GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$); +t +-- Test XML format +EXPLAIN (COSTS FALSE, FORMAT XML) + SELECT l_quantity, count(*) count_quantity FROM lineitem + GROUP BY l_quantity ORDER BY count_quantity, l_quantity; + + + + Sort + + COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint) + remote_scan.l_quantity + + + + Aggregate + Hashed + Outer + + remote_scan.l_quantity + + + + Custom Scan + Outer + Citus Real-Time + + + 8 + One of 8 + + + host=localhost port=57637 dbname=regression + + + + + Aggregate + Hashed + + l_quantity + + + + Seq Scan + Outer + lineitem_290001 + lineitem + + + + + + + + + + + + + + + + + +-- Validate XML format +SELECT true AS valid FROM explain_xml($$ + SELECT l_quantity, count(*) count_quantity FROM lineitem + GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$); +t +-- Test YAML format +EXPLAIN (COSTS FALSE, FORMAT YAML) + SELECT l_quantity, count(*) count_quantity FROM lineitem + GROUP BY l_quantity ORDER BY count_quantity, l_quantity; +- Plan: + Node Type: "Sort" + Sort Key: + - "COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)" + - "remote_scan.l_quantity" + Plans: + - Node Type: "Aggregate" + Strategy: "Hashed" + Parent Relationship: "Outer" + Group Key: + - "remote_scan.l_quantity" + Plans: + - Node Type: "Custom Scan" + Parent Relationship: "Outer" + Custom Plan Provider: "Citus Real-Time" + Distributed Query: + Job: + Task Count: 8 + Tasks Shown: "One of 8" + Tasks: + - Node: "host=localhost port=57637 dbname=regression" + Remote Plan: + - Plan: + Node Type: "Aggregate" + Strategy: "Hashed" + Group Key: + - "l_quantity" + Plans: + - Node Type: "Seq Scan" + Parent Relationship: "Outer" + Relation Name: "lineitem_290001" + Alias: "lineitem" + +-- Test Text format +EXPLAIN (COSTS FALSE, FORMAT TEXT) + SELECT l_quantity, count(*) count_quantity FROM lineitem + GROUP BY l_quantity ORDER BY count_quantity, l_quantity; +Sort + Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity + -> HashAggregate + Group Key: remote_scan.l_quantity + -> Custom Scan (Citus Real-Time) + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: l_quantity + -> Seq Scan on lineitem_290001 lineitem +-- Test verbose +EXPLAIN (COSTS FALSE, VERBOSE TRUE) + SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem; +Aggregate + Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2"))) + -> Custom Scan (Citus Real-Time) + Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2" + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + Output: sum(l_quantity), sum(l_quantity), count(l_quantity) + -> Seq Scan on 
public.lineitem_290001 lineitem + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +-- Test join +EXPLAIN (COSTS FALSE) + SELECT * FROM lineitem + JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5.0 + ORDER BY l_quantity LIMIT 10; +Limit + -> Sort + Sort Key: remote_scan.l_quantity + -> Custom Scan (Citus Real-Time) + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: lineitem.l_quantity + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Scan using orders_pkey_290008 on orders_290008 orders + -> Sort + Sort Key: lineitem.l_orderkey + -> Seq Scan on lineitem_290001 lineitem + Filter: (l_quantity < 5.0) +-- Test insert +EXPLAIN (COSTS FALSE) + INSERT INTO lineitem VALUES(1,0); +Custom Scan (Citus Router) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57638 dbname=regression + -> Insert on lineitem_290000 + -> Result +-- Test update +EXPLAIN (COSTS FALSE) + UPDATE lineitem + SET l_suppkey = 12 + WHERE l_orderkey = 1 AND l_partkey = 0; +Custom Scan (Citus Router) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57638 dbname=regression + -> Update on lineitem_290000 + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 + Index Cond: (l_orderkey = 1) + Filter: (l_partkey = 0) +-- Test delete +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem + WHERE l_orderkey = 1 AND l_partkey = 0; +Custom Scan (Citus Router) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57638 dbname=regression + -> Delete on lineitem_290000 + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 + Index Cond: (l_orderkey = 1) + Filter: (l_partkey = 0) +-- Test single-shard SELECT +EXPLAIN (COSTS FALSE) + SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; +Custom Scan (Citus Router) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + Index Cond: (l_orderkey = 5) +SELECT true AS valid FROM explain_xml($$ + SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$); +t +SELECT true AS valid FROM explain_json($$ + SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$); +t +-- Test CREATE TABLE ... 
AS +EXPLAIN (COSTS FALSE) + CREATE TABLE explain_result AS + SELECT * FROM lineitem; +Custom Scan (Citus Real-Time) + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Seq Scan on lineitem_290001 lineitem +-- Test having +EXPLAIN (COSTS FALSE, VERBOSE TRUE) + SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem + HAVING sum(l_quantity) > 100; +Aggregate + Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2"))) + Filter: (sum(remote_scan.worker_column_4) > '100'::numeric) + -> Custom Scan (Citus Real-Time) + Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2", remote_scan.worker_column_4 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + Output: sum(l_quantity), sum(l_quantity), count(l_quantity), sum(l_quantity) + -> Seq Scan on public.lineitem_290001 lineitem + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +-- Test having without aggregate +EXPLAIN (COSTS FALSE, VERBOSE TRUE) + SELECT l_quantity FROM lineitem + GROUP BY l_quantity + HAVING l_quantity > (100 * random()); +HashAggregate + Output: remote_scan.l_quantity + Group Key: remote_scan.l_quantity + Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random())) + -> Custom Scan (Citus Real-Time) + Output: remote_scan.l_quantity, remote_scan.worker_column_2 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_quantity, l_quantity + Group Key: lineitem.l_quantity + -> Seq Scan on public.lineitem_290001 lineitem + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +-- Subquery pushdown tests with explain +EXPLAIN (COSTS OFF) +SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id) = (events.composite_id) AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; +Aggregate + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> GroupAggregate + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Sort + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Result + One-Time Filter: false +-- Union and left join subquery pushdown +EXPLAIN (COSTS OFF) +SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + (users.composite_id) as 
composite_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id) = (events.composite_id) AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + (users.composite_id) as composite_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id) = (events.composite_id) AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.composite_id = subquery_2.composite_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; +HashAggregate + Group Key: remote_scan.hasdone + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: COALESCE(subquery_2.hasdone, 'Has not done paying'::text) + -> GroupAggregate + Group Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone + -> Sort + Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone + -> Hash Left Join + Hash Cond: (composite_id = subquery_2.composite_id) + -> Unique + -> Sort + Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), composite_id, ('action=>1'::text), event_time + -> Append + -> Result + One-Time Filter: false + -> Result + One-Time Filter: false + -> Hash + -> Subquery Scan on subquery_2 + -> Unique + -> Sort + Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) + -> Seq Scan on events_1400027 events + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) +-- Union, left join and having subquery pushdown +EXPLAIN (COSTS OFF) + SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + (users.composite_id), + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id) = (events.composite_id) AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + (users.composite_id), + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id) = (events.composite_id) AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 
9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + composite_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + composite_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.composite_id = subquery_2.composite_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; +ERROR: bogus varattno for OUTER_VAR var: 3 +-- Lateral join subquery pushdown +-- set subquery_pushdown due to limit in the query +SET citus.subquery_pushdown to ON; +EXPLAIN (COSTS OFF) +SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + composite_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id) = subquery_top.composite_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; +Limit + -> Sort + Sort Key: remote_scan.user_lastseen DESC + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (max(lastseen)) DESC + -> GroupAggregate + Group Key: ((composite_id).tenant_id), ((composite_id).user_id) + -> Sort + Sort Key: ((composite_id).tenant_id), ((composite_id).user_id) + -> Nested Loop Left Join + -> Limit + -> Sort + Sort Key: lastseen DESC + -> Result + One-Time Filter: false + -> Limit + -> Sort + Sort Key: events.event_time DESC + -> Seq Scan on events_1400027 events + Filter: (composite_id = composite_id) +-- Test all tasks output +SET citus.explain_all_tasks TO on; +EXPLAIN (COSTS FALSE) + SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; +Aggregate + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290005 lineitem + Filter: (l_orderkey > 9030) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290004 lineitem + Filter: (l_orderkey > 9030) + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290007 lineitem + Filter: (l_orderkey > 9030) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290006 lineitem + Filter: (l_orderkey > 9030) +SELECT true AS valid FROM explain_xml($$ + SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$); +t +SELECT true AS valid FROM explain_json($$ + SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$); +t +-- Test task tracker +SET citus.task_executor_type TO 'task-tracker'; +SET citus.explain_all_tasks TO off; +EXPLAIN (COSTS FALSE) + SELECT 
avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; +Aggregate + -> Custom Scan (Citus Task-Tracker) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290005 lineitem + Filter: (l_orderkey > 9030) +-- Test re-partition join +SET citus.large_table_shard_count TO 1; +EXPLAIN (COSTS FALSE) + SELECT count(*) + FROM lineitem, orders, customer, supplier_single_shard + WHERE l_orderkey = o_orderkey + AND o_custkey = c_custkey + AND l_suppkey = s_suppkey; +Aggregate + -> Custom Scan (Citus Task-Tracker) + Task Count: 1 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 1 + Merge Task Count: 1 + -> MapMergeJob + Map Task Count: 8 + Merge Task Count: 1 +EXPLAIN (COSTS FALSE, FORMAT JSON) + SELECT count(*) + FROM lineitem, orders, customer, supplier_single_shard + WHERE l_orderkey = o_orderkey + AND o_custkey = c_custkey + AND l_suppkey = s_suppkey; +[ + { + "Plan": { + "Node Type": "Aggregate", + "Strategy": "Plain", + "Plans": [ + { + "Node Type": "Custom Scan", + "Parent Relationship": "Outer", + "Custom Plan Provider": "Citus Task-Tracker", + "Distributed Query": { + "Job": { + "Task Count": 1, + "Tasks Shown": "None, not supported for re-partition queries", + "Depended Jobs": [ + { + "Map Task Count": 1, + "Merge Task Count": 1, + "Depended Jobs": [ + { + "Map Task Count": 8, + "Merge Task Count": 1 + } + ] + } + ] + } + } + } + ] + } + } +] +SELECT true AS valid FROM explain_json($$ + SELECT count(*) + FROM lineitem, orders, customer, supplier_single_shard + WHERE l_orderkey = o_orderkey + AND o_custkey = c_custkey + AND l_suppkey = s_suppkey$$); +t +EXPLAIN (COSTS FALSE, FORMAT XML) + SELECT count(*) + FROM lineitem, orders, customer, supplier_single_shard + WHERE l_orderkey = o_orderkey + AND o_custkey = c_custkey + AND l_suppkey = s_suppkey; + + + + Aggregate + Plain + + + Custom Scan + Outer + Citus Task-Tracker + + + 1 + None, not supported for re-partition queries + + + 1 + 1 + + + 8 + 1 + + + + + + + + + + + +SELECT true AS valid FROM explain_xml($$ + SELECT count(*) + FROM lineitem, orders, customer, supplier + WHERE l_orderkey = o_orderkey + AND o_custkey = c_custkey + AND l_suppkey = s_suppkey$$); +t +-- make sure that EXPLAIN works without +-- problems for queries that involve only +-- reference tables +SELECT true AS valid FROM explain_xml($$ + SELECT count(*) + FROM nation + WHERE n_name = 'CHINA'$$); +t +SELECT true AS valid FROM explain_xml($$ + SELECT count(*) + FROM nation, supplier + WHERE nation.n_nationkey = supplier.s_nationkey$$); +t +EXPLAIN (COSTS FALSE, FORMAT YAML) + SELECT count(*) + FROM lineitem, orders, customer, supplier_single_shard + WHERE l_orderkey = o_orderkey + AND o_custkey = c_custkey + AND l_suppkey = s_suppkey; +- Plan: + Node Type: "Aggregate" + Strategy: "Plain" + Plans: + - Node Type: "Custom Scan" + Parent Relationship: "Outer" + Custom Plan Provider: "Citus Task-Tracker" + Distributed Query: + Job: + Task Count: 1 + Tasks Shown: "None, not supported for re-partition queries" + Depended Jobs: + - Map Task Count: 1 + Merge Task Count: 1 + Depended Jobs: + - Map Task Count: 8 + Merge Task Count: 1 +-- test parallel aggregates +SET parallel_setup_cost=0; +ERROR: unrecognized configuration parameter "parallel_setup_cost" +SET parallel_tuple_cost=0; +ERROR: unrecognized configuration parameter "parallel_tuple_cost" +SET min_parallel_relation_size=0; +ERROR: unrecognized configuration parameter 
"min_parallel_relation_size" +SET max_parallel_workers_per_gather=4; +ERROR: unrecognized configuration parameter "max_parallel_workers_per_gather" +-- ensure local plans display correctly +CREATE TABLE lineitem_clone (LIKE lineitem); +EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone; +Aggregate + -> Seq Scan on lineitem_clone +-- ensure distributed plans don't break +EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; +Aggregate + -> Custom Scan (Citus Task-Tracker) + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290001 lineitem +-- ensure EXPLAIN EXECUTE doesn't crash +PREPARE task_tracker_query AS + SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; +EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query; +Aggregate + -> Custom Scan (Citus Task-Tracker) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290005 lineitem + Filter: (l_orderkey > 9030) +SET citus.task_executor_type TO 'real-time'; +PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; +EXPLAIN EXECUTE router_executor_query; +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5) + Index Cond: (l_orderkey = 5) +PREPARE real_time_executor_query AS + SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; +EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; +Aggregate + -> Custom Scan (Citus Real-Time) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Seq Scan on lineitem_290005 lineitem + Filter: (l_orderkey > 9030) +-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but +-- at least make sure to fail without crashing +PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; +EXPLAIN EXECUTE router_executor_query_param(5); +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5) + Index Cond: (l_orderkey = 5) +-- test explain in a transaction with alter table to test we use right connections +BEGIN; +CREATE TABLE explain_table(id int); +SELECT create_distributed_table('explain_table', 'id'); + +ALTER TABLE explain_table ADD COLUMN value int; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1; +Custom Scan (Citus Router) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Seq Scan on explain_table_570001 explain_table + Filter: (id = 1) +ROLLBACK; diff --git a/src/test/regress/expected/multi_large_table_join_planning_0.out b/src/test/regress/expected/multi_large_table_join_planning_0.out new file mode 100644 index 000000000..2237b8014 --- /dev/null +++ b/src/test/regress/expected/multi_large_table_join_planning_0.out @@ -0,0 +1,246 @@ +-- +-- MULTI_LARGE_TABLE_PLANNING +-- +-- Tests that cover large 
table join planning. Note that we explicitly start a +-- transaction block here so that we don't emit debug messages with changing +-- transaction ids in them. Also, we set the executor type to task tracker +-- executor here, as we cannot run repartition jobs with real time executor. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 690000; +SET citus.enable_unique_job_ids TO off; +BEGIN; +SET client_min_messages TO DEBUG4; +DEBUG: CommitTransactionCommand +SET citus.large_table_shard_count TO 2; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +SET citus.task_executor_type TO 'task-tracker'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +-- Debug4 log messages display jobIds within them. We explicitly set the jobId +-- sequence here so that the regression output becomes independent of the number +-- of jobs executed prior to running this test. +-- Multi-level repartition join to verify our projection columns are correctly +-- referenced and propagated across multiple repartition jobs. The test also +-- validates that only the minimal necessary projection columns are transferred +-- between jobs. +SELECT + l_partkey, o_orderkey, count(*) +FROM + lineitem, part, orders, customer +WHERE + l_orderkey = o_orderkey AND + l_partkey = p_partkey AND + c_custkey = o_custkey AND + (l_quantity > 5.0 OR l_extendedprice > 1200.0) AND + p_size > 8 AND o_totalprice > 10.0 AND + c_acctbal < 5000.0 AND l_partkey < 1000 +GROUP BY + l_partkey, o_orderkey +ORDER BY + l_partkey, o_orderkey; +DEBUG: StartTransactionCommand +DEBUG: join prunable for intervals [1,1509] and [8997,14946] +DEBUG: join prunable for intervals [1509,4964] and [8997,14946] +DEBUG: join prunable for intervals [2951,4455] and [8997,14946] +DEBUG: join prunable for intervals [4480,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,10560] and [1,5986] +DEBUG: join prunable for intervals [10560,12036] and [1,5986] +DEBUG: join prunable for intervals [12036,13473] and [1,5986] +DEBUG: join prunable for intervals [13473,14947] and [1,5986] +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290000 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290001 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290002 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290003 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: generated sql query for task 15 
+DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290004 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: generated sql query for task 18 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290005 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: generated sql query for task 21 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290006 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: generated sql query for task 24 +DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290007 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 +DEBUG: assigned task 24 to node localhost:57637 +DEBUG: assigned task 21 to node localhost:57638 +DEBUG: join prunable for intervals [1,1000] and [6001,7000] +DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0001.task_000025".intermediate_column_1_0, "pg_merge_job_0001.task_000025".intermediate_column_1_1, "pg_merge_job_0001.task_000025".intermediate_column_1_2, "pg_merge_job_0001.task_000025".intermediate_column_1_3, "pg_merge_job_0001.task_000025".intermediate_column_1_4 FROM (pg_merge_job_0001.task_000025 "pg_merge_job_0001.task_000025" JOIN part_290011 part ON (("pg_merge_job_0001.task_000025".intermediate_column_1_0 = part.p_partkey))) WHERE (part.p_size > 8)" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0001.task_000034".intermediate_column_1_0, "pg_merge_job_0001.task_000034".intermediate_column_1_1, "pg_merge_job_0001.task_000034".intermediate_column_1_2, "pg_merge_job_0001.task_000034".intermediate_column_1_3, "pg_merge_job_0001.task_000034".intermediate_column_1_4 FROM (pg_merge_job_0001.task_000034 "pg_merge_job_0001.task_000034" JOIN part_280002 part ON (("pg_merge_job_0001.task_000034".intermediate_column_1_0 = part.p_partkey))) WHERE (part.p_size > 8)" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 25 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 34 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: join prunable for intervals [1,1000] and [1001,2000] +DEBUG: join prunable for intervals [1,1000] and [6001,7000] +DEBUG: join prunable for intervals [1001,2000] and [1,1000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] +DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join 
prunable for intervals [6001,7000] and [1001,2000] +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0002.task_000007".intermediate_column_2_0 AS l_partkey, "pg_merge_job_0002.task_000007".intermediate_column_2_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0002.task_000007 "pg_merge_job_0002.task_000007" JOIN customer_290010 customer ON ((customer.c_custkey = "pg_merge_job_0002.task_000007".intermediate_column_2_4))) WHERE ((("pg_merge_job_0002.task_000007".intermediate_column_2_2 > 5.0) OR ("pg_merge_job_0002.task_000007".intermediate_column_2_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_0002.task_000007".intermediate_column_2_0, "pg_merge_job_0002.task_000007".intermediate_column_2_1" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0002.task_000010".intermediate_column_2_0 AS l_partkey, "pg_merge_job_0002.task_000010".intermediate_column_2_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0002.task_000010 "pg_merge_job_0002.task_000010" JOIN customer_280001 customer ON ((customer.c_custkey = "pg_merge_job_0002.task_000010".intermediate_column_2_4))) WHERE ((("pg_merge_job_0002.task_000010".intermediate_column_2_2 > 5.0) OR ("pg_merge_job_0002.task_000010".intermediate_column_2_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_0002.task_000010".intermediate_column_2_0, "pg_merge_job_0002.task_000010".intermediate_column_2_1" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0002.task_000013".intermediate_column_2_0 AS l_partkey, "pg_merge_job_0002.task_000013".intermediate_column_2_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0002.task_000013 "pg_merge_job_0002.task_000013" JOIN customer_280000 customer ON ((customer.c_custkey = "pg_merge_job_0002.task_000013".intermediate_column_2_4))) WHERE ((("pg_merge_job_0002.task_000013".intermediate_column_2_2 > 5.0) OR ("pg_merge_job_0002.task_000013".intermediate_column_2_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_0002.task_000013".intermediate_column_2_0, "pg_merge_job_0002.task_000013".intermediate_column_2_1" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 7 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 13 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: completed cleanup query for job 3 +DEBUG: completed cleanup query for job 3 +DEBUG: completed cleanup query for job 2 +DEBUG: completed cleanup query for job 2 +DEBUG: completed cleanup query for job 1 +DEBUG: completed cleanup query for job 1 +DEBUG: CommitTransactionCommand + l_partkey | o_orderkey | count +-----------+------------+------- + 18 | 12005 | 1 + 79 | 5121 | 1 + 91 | 2883 | 1 + 222 | 9413 | 1 + 278 | 1287 | 1 + 309 | 2374 | 1 + 318 | 321 | 1 + 321 | 5984 | 1 + 337 | 10403 | 1 + 350 | 13698 | 1 + 358 | 4323 | 1 + 364 | 9347 | 1 + 416 | 640 | 1 + 426 | 10855 | 1 + 450 | 35 | 1 + 484 | 3843 | 1 + 504 | 14566 | 1 + 510 | 13569 | 1 + 532 | 3175 | 1 + 641 | 134 | 1 + 669 | 10944 | 1 + 716 | 2885 | 1 + 738 | 4355 | 1 + 802 | 2534 | 1 + 824 | 9287 | 1 + 864 | 3175 | 1 + 957 | 4293 | 1 + 960 | 10980 | 1 + 963 | 4580 | 1 +(29 rows) + +SELECT + l_partkey, o_orderkey, count(*) +FROM + lineitem, orders +WHERE + l_suppkey = o_shippriority AND + 
l_quantity < 5.0 AND o_totalprice <> 4.0 +GROUP BY + l_partkey, o_orderkey +ORDER BY + l_partkey, o_orderkey; +DEBUG: StartTransactionCommand +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290000 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for task 4 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290001 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290002 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for task 8 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290003 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for task 10 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290004 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290005 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for task 14 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290006 lineitem WHERE (l_quantity < 5.0)" +DEBUG: generated sql query for task 16 +DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290007 lineitem WHERE (l_quantity < 5.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 +DEBUG: assigned task 16 to node localhost:57637 +DEBUG: assigned task 14 to node localhost:57638 +DEBUG: generated sql query for task 2 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290008 orders WHERE (o_totalprice <> 4.0)" +DEBUG: generated sql query for task 4 +DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290009 orders WHERE (o_totalprice <> 4.0)" +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: generated sql query for task 3 +DETAIL: query string: "SELECT "pg_merge_job_0004.task_000017".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000005".intermediate_column_5_0 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0004.task_000017 "pg_merge_job_0004.task_000017" JOIN pg_merge_job_0005.task_000005 "pg_merge_job_0005.task_000005" ON (("pg_merge_job_0004.task_000017".intermediate_column_4_1 = "pg_merge_job_0005.task_000005".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000017".intermediate_column_4_0, "pg_merge_job_0005.task_000005".intermediate_column_5_0" +DEBUG: generated sql query for task 6 +DETAIL: query string: "SELECT "pg_merge_job_0004.task_000026".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000008".intermediate_column_5_0 AS o_orderkey, 
count(*) AS count FROM (pg_merge_job_0004.task_000026 "pg_merge_job_0004.task_000026" JOIN pg_merge_job_0005.task_000008 "pg_merge_job_0005.task_000008" ON (("pg_merge_job_0004.task_000026".intermediate_column_4_1 = "pg_merge_job_0005.task_000008".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000026".intermediate_column_4_0, "pg_merge_job_0005.task_000008".intermediate_column_5_0" +DEBUG: generated sql query for task 9 +DETAIL: query string: "SELECT "pg_merge_job_0004.task_000035".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000011".intermediate_column_5_0 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0004.task_000035 "pg_merge_job_0004.task_000035" JOIN pg_merge_job_0005.task_000011 "pg_merge_job_0005.task_000011" ON (("pg_merge_job_0004.task_000035".intermediate_column_4_1 = "pg_merge_job_0005.task_000011".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000035".intermediate_column_4_0, "pg_merge_job_0005.task_000011".intermediate_column_5_0" +DEBUG: generated sql query for task 12 +DETAIL: query string: "SELECT "pg_merge_job_0004.task_000044".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000014".intermediate_column_5_0 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0004.task_000044 "pg_merge_job_0004.task_000044" JOIN pg_merge_job_0005.task_000014 "pg_merge_job_0005.task_000014" ON (("pg_merge_job_0004.task_000044".intermediate_column_4_1 = "pg_merge_job_0005.task_000014".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000044".intermediate_column_4_0, "pg_merge_job_0005.task_000014".intermediate_column_5_0" +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 17 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 26 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 35 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 11 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 44 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 14 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 12 to node localhost:57638 +DEBUG: completed cleanup query for job 6 +DEBUG: completed cleanup query for job 6 +DEBUG: completed cleanup query for job 4 +DEBUG: completed cleanup query for job 4 +DEBUG: completed cleanup query for job 5 +DEBUG: completed cleanup query for job 5 +DEBUG: CommitTransactionCommand + l_partkey | o_orderkey | count +-----------+------------+------- +(0 rows) + +-- Reset client logging level to its previous value +SET client_min_messages TO NOTICE; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +COMMIT; diff --git a/src/test/regress/expected/multi_large_table_task_assignment_0.out b/src/test/regress/expected/multi_large_table_task_assignment_0.out new file mode 100644 index 000000000..49c75fb6f --- /dev/null +++ b/src/test/regress/expected/multi_large_table_task_assignment_0.out @@ -0,0 +1,250 @@ +-- +-- MULTI_LARGE_TABLE_TASK_ASSIGNMENT +-- +-- Tests which cover task assignment for MapMerge jobs for single range repartition +-- and dual hash repartition joins. 
The tests also cover task assignment propagation +-- from a sql task to its dependent tasks. Note that we set the executor type to task +-- tracker executor here, as we cannot run repartition jobs with real time executor. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 710000; +BEGIN; +SET client_min_messages TO DEBUG3; +DEBUG: CommitTransactionCommand +SET citus.large_table_shard_count TO 2; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +SET citus.task_executor_type TO 'task-tracker'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +-- Single range repartition join to test anchor-shard based task assignment and +-- assignment propagation to merge and data-fetch tasks. +SELECT + count(*) +FROM + orders, customer +WHERE + o_custkey = c_custkey; +DEBUG: StartTransactionCommand +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: join prunable for intervals [1,1000] and [1001,2000] +DEBUG: join prunable for intervals [1,1000] and [6001,7000] +DEBUG: join prunable for intervals [1001,2000] and [1,1000] +DEBUG: join prunable for intervals [1001,2000] and [6001,7000] +DEBUG: join prunable for intervals [6001,7000] and [1,1000] +DEBUG: join prunable for intervals [6001,7000] and [1001,2000] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 11 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: CommitTransactionCommand + count +------- + 2984 +(1 row) + +-- Single range repartition join, along with a join with a small table containing +-- more than one shard. This situation results in multiple sql tasks depending on +-- the same merge task, and tests our constraint group creation and assignment +-- propagation. Here 'orders' is considered the small table. 
+SET citus.large_table_shard_count TO 3; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +SELECT + count(*) +FROM + orders, customer, lineitem +WHERE + o_custkey = c_custkey AND + o_orderkey = l_orderkey; +DEBUG: StartTransactionCommand +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 18 to node localhost:57638 +DEBUG: assigned task 3 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: join prunable for intervals [1,1509] and [2951,4455] +DEBUG: join prunable for intervals [1,1509] and [4480,5986] +DEBUG: join prunable for intervals [1,1509] and [8997,10560] +DEBUG: join prunable for intervals [1,1509] and [10560,12036] +DEBUG: join prunable for intervals [1,1509] and [12036,13473] +DEBUG: join prunable for intervals [1,1509] and [13473,14947] +DEBUG: join prunable for intervals [1509,4964] and [8997,10560] +DEBUG: join prunable for intervals [1509,4964] and [10560,12036] +DEBUG: join prunable for intervals [1509,4964] and [12036,13473] +DEBUG: join prunable for intervals [1509,4964] and [13473,14947] +DEBUG: join prunable for intervals [2951,4455] and [1,1509] +DEBUG: join prunable for intervals [2951,4455] and [4480,5986] +DEBUG: join prunable for intervals [2951,4455] and [8997,10560] +DEBUG: join prunable for intervals [2951,4455] and [10560,12036] +DEBUG: join prunable for intervals [2951,4455] and [12036,13473] +DEBUG: join prunable for intervals [2951,4455] and [13473,14947] +DEBUG: join prunable for intervals [4480,5986] and [1,1509] +DEBUG: join prunable for intervals [4480,5986] and [2951,4455] +DEBUG: join prunable for intervals [4480,5986] and [8997,10560] +DEBUG: join prunable for intervals [4480,5986] and [10560,12036] +DEBUG: join prunable for intervals [4480,5986] and [12036,13473] +DEBUG: join prunable for intervals [4480,5986] and [13473,14947] +DEBUG: join prunable for intervals [8997,10560] and [1,1509] +DEBUG: join prunable for intervals [8997,10560] and [1509,4964] +DEBUG: join prunable for intervals [8997,10560] and [2951,4455] +DEBUG: join prunable for intervals [8997,10560] and [4480,5986] +DEBUG: join prunable for intervals [8997,10560] and [12036,13473] +DEBUG: join prunable for intervals [8997,10560] and [13473,14947] +DEBUG: join prunable for intervals [10560,12036] and [1,1509] +DEBUG: join prunable for intervals [10560,12036] and [1509,4964] +DEBUG: join prunable for intervals [10560,12036] and [2951,4455] +DEBUG: join prunable for intervals [10560,12036] and [4480,5986] +DEBUG: join prunable for intervals [10560,12036] and [13473,14947] +DEBUG: join prunable for intervals [12036,13473] and [1,1509] +DEBUG: join prunable for intervals [12036,13473] and [1509,4964] +DEBUG: join prunable for intervals [12036,13473] and [2951,4455] +DEBUG: join prunable for intervals [12036,13473] and [4480,5986] +DEBUG: join prunable for intervals [12036,13473] and [8997,10560] +DEBUG: join prunable for intervals [13473,14947] and [1,1509] +DEBUG: join prunable for intervals [13473,14947] and [1509,4964] +DEBUG: join prunable for intervals [13473,14947] and [2951,4455] +DEBUG: join prunable for intervals [13473,14947] and [4480,5986] +DEBUG: join prunable for intervals [13473,14947] and [8997,10560] +DEBUG: join prunable for intervals [13473,14947] and [10560,12036] +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 19 +DEBUG: pruning merge fetch taskId 4 +DETAIL: 
Creating dependency on merge taskId 19 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 26 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 26 +DEBUG: pruning merge fetch taskId 13 +DETAIL: Creating dependency on merge taskId 26 +DEBUG: pruning merge fetch taskId 16 +DETAIL: Creating dependency on merge taskId 26 +DEBUG: pruning merge fetch taskId 19 +DETAIL: Creating dependency on merge taskId 33 +DEBUG: pruning merge fetch taskId 22 +DETAIL: Creating dependency on merge taskId 33 +DEBUG: pruning merge fetch taskId 25 +DETAIL: Creating dependency on merge taskId 40 +DEBUG: pruning merge fetch taskId 28 +DETAIL: Creating dependency on merge taskId 40 +DEBUG: pruning merge fetch taskId 31 +DETAIL: Creating dependency on merge taskId 47 +DEBUG: pruning merge fetch taskId 34 +DETAIL: Creating dependency on merge taskId 47 +DEBUG: pruning merge fetch taskId 37 +DETAIL: Creating dependency on merge taskId 54 +DEBUG: pruning merge fetch taskId 40 +DETAIL: Creating dependency on merge taskId 54 +DEBUG: pruning merge fetch taskId 43 +DETAIL: Creating dependency on merge taskId 54 +DEBUG: pruning merge fetch taskId 46 +DETAIL: Creating dependency on merge taskId 61 +DEBUG: pruning merge fetch taskId 49 +DETAIL: Creating dependency on merge taskId 61 +DEBUG: pruning merge fetch taskId 52 +DETAIL: Creating dependency on merge taskId 61 +DEBUG: pruning merge fetch taskId 55 +DETAIL: Creating dependency on merge taskId 68 +DEBUG: pruning merge fetch taskId 58 +DETAIL: Creating dependency on merge taskId 68 +DEBUG: assigned task 21 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 27 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 48 to node localhost:57637 +DEBUG: assigned task 33 to node localhost:57638 +DEBUG: assigned task 39 to node localhost:57637 +DEBUG: assigned task 57 to node localhost:57638 +DEBUG: propagating assignment from merge task 19 to constrained sql task 6 +DEBUG: propagating assignment from merge task 26 to constrained sql task 12 +DEBUG: propagating assignment from merge task 26 to constrained sql task 15 +DEBUG: propagating assignment from merge task 26 to constrained sql task 18 +DEBUG: propagating assignment from merge task 33 to constrained sql task 24 +DEBUG: propagating assignment from merge task 40 to constrained sql task 30 +DEBUG: propagating assignment from merge task 47 to constrained sql task 36 +DEBUG: propagating assignment from merge task 54 to constrained sql task 42 +DEBUG: propagating assignment from merge task 54 to constrained sql task 45 +DEBUG: propagating assignment from merge task 61 to constrained sql task 51 +DEBUG: propagating assignment from merge task 61 to constrained sql task 54 +DEBUG: propagating assignment from merge task 68 to constrained sql task 60 +DEBUG: CommitTransactionCommand + count +------- + 11998 +(1 row) + +SET citus.large_table_shard_count TO 2; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +-- Dual hash repartition join which tests the separate hash repartition join +-- task assignment algorithm. 
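The "join prunable for task partitionId x and y" messages that the following query emits reflect a simple rule: both inputs are hash-partitioned into the same number of buckets, and only equal partition ids can hold matching rows. A self-contained sketch of the pairing with four partitions per side, matching the output below:

    -- of the 16 candidate partition pairs, the 12 with unequal ids are pruned,
    -- leaving one join task per partition id
    SELECT l.id AS lineitem_partition, c.id AS customer_partition
    FROM generate_series(0, 3) AS l(id),
         generate_series(0, 3) AS c(id)
    WHERE l.id = c.id;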
+SELECT
+ count(*)
+FROM
+ lineitem, customer
+WHERE
+ l_partkey = c_nationkey;
+DEBUG: StartTransactionCommand
+DEBUG: assigned task 4 to node localhost:57637
+DEBUG: assigned task 2 to node localhost:57638
+DEBUG: assigned task 8 to node localhost:57637
+DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 12 to node localhost:57637
+DEBUG: assigned task 10 to node localhost:57638
+DEBUG: assigned task 16 to node localhost:57637
+DEBUG: assigned task 14 to node localhost:57638
+DEBUG: assigned task 4 to node localhost:57637
+DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 2 to node localhost:57637
+DEBUG: join prunable for task partitionId 0 and 1
+DEBUG: join prunable for task partitionId 0 and 2
+DEBUG: join prunable for task partitionId 0 and 3
+DEBUG: join prunable for task partitionId 1 and 0
+DEBUG: join prunable for task partitionId 1 and 2
+DEBUG: join prunable for task partitionId 1 and 3
+DEBUG: join prunable for task partitionId 2 and 0
+DEBUG: join prunable for task partitionId 2 and 1
+DEBUG: join prunable for task partitionId 2 and 3
+DEBUG: join prunable for task partitionId 3 and 0
+DEBUG: join prunable for task partitionId 3 and 1
+DEBUG: join prunable for task partitionId 3 and 2
+DEBUG: pruning merge fetch taskId 1
+DETAIL: Creating dependency on merge taskId 17
+DEBUG: pruning merge fetch taskId 2
+DETAIL: Creating dependency on merge taskId 7
+DEBUG: pruning merge fetch taskId 4
+DETAIL: Creating dependency on merge taskId 26
+DEBUG: pruning merge fetch taskId 5
+DETAIL: Creating dependency on merge taskId 11
+DEBUG: pruning merge fetch taskId 7
+DETAIL: Creating dependency on merge taskId 35
+DEBUG: pruning merge fetch taskId 8
+DETAIL: Creating dependency on merge taskId 15
+DEBUG: pruning merge fetch taskId 10
+DETAIL: Creating dependency on merge taskId 44
+DEBUG: pruning merge fetch taskId 11
+DETAIL: Creating dependency on merge taskId 19
+DEBUG: assigned task 3 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
+DEBUG: assigned task 9 to node localhost:57638
+DEBUG: assigned task 12 to node localhost:57637
+DEBUG: CommitTransactionCommand
+ count
+-------
+ 125
+(1 row)
+
+-- Reset client logging level to its previous value
+SET client_min_messages TO NOTICE;
+DEBUG: StartTransactionCommand
+DEBUG: ProcessUtility
+COMMIT;
diff --git a/src/test/regress/expected/multi_null_minmax_value_pruning_0.out b/src/test/regress/expected/multi_null_minmax_value_pruning_0.out
new file mode 100644
index 000000000..fd5491287
--- /dev/null
+++ b/src/test/regress/expected/multi_null_minmax_value_pruning_0.out
@@ -0,0 +1,446 @@
+--
+-- MULTI_NULL_MINMAX_VALUE_PRUNING
+--
+-- This test checks that we can handle null min/max values in shard statistics
+-- and that we don't partition or join prune shards that have null values.
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 760000;
+SET client_min_messages TO DEBUG2;
+SET citus.explain_all_tasks TO on;
+-- To avoid differing explain output - executor doesn't matter,
+-- because we're testing pruning here.
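The pruning decision under test can be mimicked against the metadata directly. A hypothetical check, reusing the pg_dist_shard columns queried below; the point is that a shard whose shardminvalue or shardmaxvalue is NULL can never evaluate as prunable:

    -- which lineitem shards could be pruned for l_orderkey = 9030
    SELECT shardid,
           shardminvalue IS NOT NULL
           AND shardmaxvalue IS NOT NULL
           AND 9030 NOT BETWEEN shardminvalue::bigint AND shardmaxvalue::bigint
             AS prunable_for_l_orderkey_9030  -- hypothetical predicate, not Citus code
    FROM pg_dist_shard
    WHERE logicalrelid = 'lineitem'::regclass;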
+SET citus.task_executor_type TO 'real-time'; +-- Change configuration to treat lineitem and orders tables as large +SET citus.large_table_shard_count TO 2; +SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 290000; + shardminvalue | shardmaxvalue +---------------+--------------- + 1 | 1509 +(1 row) + +SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 290001; + shardminvalue | shardmaxvalue +---------------+--------------- + 1509 | 2951 +(1 row) + +-- Check that partition and join pruning works when min/max values exist +-- Adding l_orderkey = 1 to make the query not router executable +EXPLAIN (COSTS FALSE) +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; + QUERY PLAN +----------------------------------------------------------------------- + Custom Scan (Citus Real-Time) + Task Count: 2 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Bitmap Heap Scan on lineitem_290000 lineitem + Recheck Cond: ((l_orderkey = 9030) OR (l_orderkey = 1)) + -> BitmapOr + -> Bitmap Index Scan on lineitem_pkey_290000 + Index Cond: (l_orderkey = 9030) + -> Bitmap Index Scan on lineitem_pkey_290000 + Index Cond: (l_orderkey = 1) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Bitmap Heap Scan on lineitem_290004 lineitem + Recheck Cond: ((l_orderkey = 9030) OR (l_orderkey = 1)) + -> BitmapOr + -> Bitmap Index Scan on lineitem_pkey_290004 + Index Cond: (l_orderkey = 9030) + -> Bitmap Index Scan on lineitem_pkey_290004 + Index Cond: (l_orderkey = 1) +(21 rows) + +EXPLAIN (COSTS FALSE) +SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders + WHERE l_orderkey = o_orderkey; +DEBUG: join prunable for intervals [1,1509] and [8997,14946] +DEBUG: join prunable for intervals [1509,2951] and [8997,14946] +DEBUG: join prunable for intervals [2951,4455] and [8997,14946] +DEBUG: join prunable for intervals [4480,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,10560] and [1,5986] +DEBUG: join prunable for intervals [10560,12036] and [1,5986] +DEBUG: join prunable for intervals [12036,13473] and [1,5986] +DEBUG: join prunable for intervals [13473,14947] and [1,5986] + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Aggregate + -> Custom Scan (Citus Real-Time) + Task Count: 8 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (lineitem.l_orderkey = orders.o_orderkey) + -> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> 
Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem +(60 rows) + +-- Now set the minimum value for a shard to null. Then check that we don't apply +-- partition or join pruning for the shard with null min value. +UPDATE pg_dist_shard SET shardminvalue = NULL WHERE shardid = 290000; +EXPLAIN (COSTS FALSE) +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; + QUERY PLAN +------------------------------------------------------------------------------- + Custom Scan (Citus Real-Time) + Task Count: 2 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + Index Cond: (l_orderkey = 9030) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Index Scan using lineitem_pkey_290004 on lineitem_290004 lineitem + Index Cond: (l_orderkey = 9030) +(11 rows) + +EXPLAIN (COSTS FALSE) +SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders + WHERE l_orderkey = o_orderkey; +DEBUG: join prunable for intervals [1509,2951] and [8997,14946] +DEBUG: join prunable for intervals [2951,4455] and [8997,14946] +DEBUG: join prunable for intervals [4480,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,10560] and [1,5986] +DEBUG: join prunable for intervals [10560,12036] and [1,5986] +DEBUG: join prunable for intervals [12036,13473] and [1,5986] +DEBUG: join prunable for intervals [13473,14947] and [1,5986] + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Aggregate + -> Custom Scan (Citus Real-Time) + Task Count: 9 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (lineitem.l_orderkey = orders.o_orderkey) + -> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + 
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem +(67 rows) + +-- Next, set the maximum value for another shard to null. Then check that we +-- don't apply partition or join pruning for this other shard either. 
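The reason a NULL bound disables pruning is ordinary three-valued logic: comparing against NULL yields NULL rather than false, so the planner cannot prove the shard excludes the value. For example:

    -- 9030 <= NULL is unknown, so the range test is unknown and the shard is kept
    SELECT 9030 BETWEEN 1 AND NULL AS within_shard_range;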
+UPDATE pg_dist_shard SET shardmaxvalue = NULL WHERE shardid = 290001; +EXPLAIN (COSTS FALSE) +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; + QUERY PLAN +------------------------------------------------------------------------------- + Custom Scan (Citus Real-Time) + Task Count: 3 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Index Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + Index Cond: (l_orderkey = 9030) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + Index Cond: (l_orderkey = 9030) + -> Task + Node: host=localhost port=57637 dbname=regression + -> Index Scan using lineitem_pkey_290004 on lineitem_290004 lineitem + Index Cond: (l_orderkey = 9030) +(15 rows) + +EXPLAIN (COSTS FALSE) +SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders + WHERE l_orderkey = o_orderkey; +DEBUG: join prunable for intervals [2951,4455] and [8997,14946] +DEBUG: join prunable for intervals [4480,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,10560] and [1,5986] +DEBUG: join prunable for intervals [10560,12036] and [1,5986] +DEBUG: join prunable for intervals [12036,13473] and [1,5986] +DEBUG: join prunable for intervals [13473,14947] and [1,5986] + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Aggregate + -> Custom Scan (Citus Real-Time) + Task Count: 10 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (lineitem.l_orderkey = orders.o_orderkey) + -> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> 
Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem +(74 rows) + +-- Last, set the minimum value to 0 and check that we don't treat it as null. We +-- should apply partition and join pruning for this shard now. +UPDATE pg_dist_shard SET shardminvalue = '0' WHERE shardid = 290000; +EXPLAIN (COSTS FALSE) +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; + QUERY PLAN +------------------------------------------------------------------------------- + Custom Scan (Citus Real-Time) + Task Count: 2 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Index Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + Index Cond: (l_orderkey = 9030) + -> Task + Node: host=localhost port=57638 dbname=regression + -> Index Scan using lineitem_pkey_290004 on lineitem_290004 lineitem + Index Cond: (l_orderkey = 9030) +(11 rows) + +EXPLAIN (COSTS FALSE) +SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders + WHERE l_orderkey = o_orderkey; +DEBUG: join prunable for intervals [0,1509] and [8997,14946] +DEBUG: join prunable for intervals [2951,4455] and [8997,14946] +DEBUG: join prunable for intervals [4480,5986] and [8997,14946] +DEBUG: join prunable for intervals [8997,10560] and [1,5986] +DEBUG: join prunable for intervals [10560,12036] and [1,5986] +DEBUG: join prunable for intervals [12036,13473] and [1,5986] +DEBUG: join prunable for intervals [13473,14947] and [1,5986] + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Aggregate + -> Custom Scan (Citus Real-Time) + Task Count: 9 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290009 on orders_290009 orders + -> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (lineitem.l_orderkey = orders.o_orderkey) + -> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate + -> Merge Join + Merge Cond: (orders.o_orderkey = lineitem.l_orderkey) + -> Index Only Scan using orders_pkey_290008 on orders_290008 orders + -> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem + -> Task + Node: host=localhost port=57638 dbname=regression + -> Aggregate + -> Merge Join + Merge 
Cond: (orders.o_orderkey = lineitem.l_orderkey)
+ -> Index Only Scan using orders_pkey_290008 on orders_290008 orders
+ -> Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Merge Join
+ Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
+ -> Index Only Scan using orders_pkey_290008 on orders_290008 orders
+ -> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Aggregate
+ -> Merge Join
+ Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
+ -> Index Only Scan using orders_pkey_290009 on orders_290009 orders
+ -> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Merge Join
+ Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
+ -> Index Only Scan using orders_pkey_290009 on orders_290009 orders
+ -> Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Aggregate
+ -> Merge Join
+ Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
+ -> Index Only Scan using orders_pkey_290009 on orders_290009 orders
+ -> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Merge Join
+ Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
+ -> Index Only Scan using orders_pkey_290009 on orders_290009 orders
+ -> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem
+(67 rows)
+
+-- Set minimum and maximum values for two shards back to their original values
+UPDATE pg_dist_shard SET shardminvalue = '1' WHERE shardid = 290000;
+UPDATE pg_dist_shard SET shardmaxvalue = '4964' WHERE shardid = 290001;
+SET client_min_messages TO NOTICE;
diff --git a/src/test/regress/expected/multi_task_assignment_policy_0.out b/src/test/regress/expected/multi_task_assignment_policy_0.out
new file mode 100644
index 000000000..0468d42e2
--- /dev/null
+++ b/src/test/regress/expected/multi_task_assignment_policy_0.out
@@ -0,0 +1,170 @@
+--
+-- MULTI_TASK_ASSIGNMENT
+--
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 880000;
+SET citus.explain_distributed_queries TO off;
+-- Check that our policies for assigning tasks to worker nodes run as expected.
+-- To test this, we first create a shell table, and then manually insert shard
+-- and shard placement data into system catalogs. We next run the EXPLAIN command,
+-- and check that tasks are assigned to worker nodes as expected.
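For reference, the first-replica policy exercised below can be approximated from the catalog alone: each task goes to the first placement of its anchor shard, in placement insertion order. A sketch under two assumptions: pg_dist_shard_placement is a plain table (as in this Citus version), and ctid is an adequate stand-in for insertion order:

    -- first placement per shard for the shards created in the setup below
    SELECT DISTINCT ON (shardid) shardid, nodename, nodeport
    FROM pg_dist_shard_placement
    WHERE shardid IN (200, 201, 202)
    ORDER BY shardid, ctid;  -- ctid approximates insertion order on an append-only table

Because the setup inserts the placements for shards 200 and 201 in ascending node order but those for shard 202 in descending order, first-replica sends the first two to one worker and the third to the other, which the DEBUG output below reflects.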
+CREATE TABLE task_assignment_test_table (test_id integer); +SELECT master_create_distributed_table('task_assignment_test_table', 'test_id', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +-- Create logical shards with shardids 200, 201, and 202 +INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) + SELECT pg_class.oid, series.index, 'r', 1, 1000 + FROM pg_class, generate_series(200, 202) AS series(index) + WHERE pg_class.relname = 'task_assignment_test_table'; +-- Create shard placements for shard 200 and 201 +INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport) + SELECT 200, 1, 1, nodename, nodeport + FROM pg_dist_shard_placement + GROUP BY nodename, nodeport + ORDER BY nodename, nodeport ASC + LIMIT 2; +INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport) + SELECT 201, 1, 1, nodename, nodeport + FROM pg_dist_shard_placement + GROUP BY nodename, nodeport + ORDER BY nodename, nodeport ASC + LIMIT 2; +-- Create shard placements for shard 202 +INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport) + SELECT 202, 1, 1, nodename, nodeport + FROM pg_dist_shard_placement + GROUP BY nodename, nodeport + ORDER BY nodename, nodeport DESC + LIMIT 2; +-- Start transaction block to avoid auto commits. This avoids additional debug +-- messages from getting printed at real transaction starts and commits. +BEGIN; +-- Increase log level to see which worker nodes tasks are assigned to. Note that +-- the following log messages print node name and port numbers; and node numbers +-- in regression tests depend upon PG_VERSION_NUM. +SET client_min_messages TO DEBUG3; +DEBUG: CommitTransactionCommand +-- First test the default greedy task assignment policy +SET citus.task_assignment_policy TO 'greedy'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN SELECT count(*) FROM task_assignment_test_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: CommitTransactionCommand + QUERY PLAN +----------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + +EXPLAIN SELECT count(*) FROM task_assignment_test_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: CommitTransactionCommand + QUERY PLAN +----------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + +-- Next test the first-replica task assignment policy +SET citus.task_assignment_policy TO 'first-replica'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN SELECT count(*) FROM task_assignment_test_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to 
node localhost:57638 +DEBUG: CommitTransactionCommand + QUERY PLAN +----------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + +EXPLAIN SELECT count(*) FROM task_assignment_test_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: CommitTransactionCommand + QUERY PLAN +----------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + +-- Finally test the round-robin task assignment policy +SET citus.task_assignment_policy TO 'round-robin'; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +EXPLAIN SELECT count(*) FROM task_assignment_test_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: CommitTransactionCommand + QUERY PLAN +----------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + +EXPLAIN SELECT count(*) FROM task_assignment_test_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: CommitTransactionCommand + QUERY PLAN +----------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + +EXPLAIN SELECT count(*) FROM task_assignment_test_table; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: CommitTransactionCommand + QUERY PLAN +----------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + +RESET citus.task_assignment_policy; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +DEBUG: CommitTransactionCommand +RESET client_min_messages; +DEBUG: StartTransactionCommand +DEBUG: ProcessUtility +COMMIT;
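The alternation visible in the round-robin runs above fits a simple toy model: a per-query counter advances on every execution, and each task picks its replica by counter-plus-task-index modulo the replica count. This sketch is illustrative only, not Citus's internal implementation:

    -- two consecutive queries over three tasks, two replicas each
    SELECT q.query_counter,
           t.task_index,
           (ARRAY['localhost:57637','localhost:57638'])
             [(q.query_counter + t.task_index) % 2 + 1] AS assigned_node
    FROM generate_series(0, 1) AS q(query_counter),
         generate_series(0, 2) AS t(task_index)
    ORDER BY q.query_counter, t.task_index;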