Shift version-specific files

Many tests now need 9.5-, 9.6-, and 10-specific output files. I've
"shifted" all such files. The mapping is:

  * No suffix: PostgreSQL 10
  * _0 suffix: PostgreSQL 9.6, or 9.x if no _1 present
  * _1 suffix: PostgreSQL 9.5

All tests output the version at the top, so it's clear which output
belongs to which version.
pull/1439/head
Jason Petersen 2017-04-21 15:56:01 -06:00
parent dc2fe5a7da
commit 67eca1d3f6
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
6 changed files with 2145 additions and 46 deletions

View File

@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
SELECT substring(version(), '\d+(?:\.\d+)?') AS major_version;
major_version
---------------
9.5
9.6
(1 row)
\a\t
@ -42,7 +42,7 @@ EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort
Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
-> HashAggregate
Group Key: remote_scan.l_quantity
-> Custom Scan (Citus Real-Time)
@ -61,18 +61,22 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "remote_scan.l_quantity"],
"Parallel Aware": false,
"Sort Key": ["COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "remote_scan.l_quantity"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Partial Mode": "Simple",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Group Key": ["remote_scan.l_quantity"],
"Plans": [
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "Citus Real-Time",
"Parallel Aware": false,
"Distributed Query": {
"Job": {
"Task Count": 8,
@ -86,11 +90,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Partial Mode": "Simple",
"Parallel Aware": false,
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Relation Name": "lineitem_290001",
"Alias": "lineitem"
}
@ -124,15 +131,18 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Parallel-Aware>false</Parallel-Aware>
<Sort-Key>
<Item>COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>remote_scan.l_quantity</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Group-Key>
<Item>remote_scan.l_quantity</Item>
</Group-Key>
@ -141,6 +151,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>Citus Real-Time</Custom-Plan-Provider>
<Parallel-Aware>false</Parallel-Aware>
<Distributed-Query>
<Job>
<Task-Count>8</Task-Count>
@ -154,6 +165,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parallel-Aware>false</Parallel-Aware>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
@ -161,6 +174,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>lineitem_290001</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
@ -191,19 +205,23 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
- Plan:
Node Type: "Sort"
Parallel Aware: false
Sort Key:
- "COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "remote_scan.l_quantity"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Partial Mode: "Simple"
Parent Relationship: "Outer"
Parallel Aware: false
Group Key:
- "remote_scan.l_quantity"
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "Citus Real-Time"
Parallel Aware: false
Distributed Query:
Job:
Task Count: 8
@ -214,11 +232,14 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Partial Mode: "Simple"
Parallel Aware: false
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "lineitem_290001"
Alias: "lineitem"
@ -227,7 +248,7 @@ EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort
Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
-> HashAggregate
Group Key: remote_scan.l_quantity
-> Custom Scan (Citus Real-Time)
@ -242,7 +263,7 @@ Sort
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Aggregate
Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2")))
Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / pg_catalog.sum(remote_scan."?column?_2")))
-> Custom Scan (Citus Real-Time)
Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2"
Task Count: 8
@ -344,7 +365,7 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
HAVING sum(l_quantity) > 100;
Aggregate
Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2")))
Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / pg_catalog.sum(remote_scan."?column?_2")))
Filter: (sum(remote_scan.worker_column_4) > '100'::numeric)
-> Custom Scan (Citus Real-Time)
Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2", remote_scan.worker_column_4
@ -410,11 +431,15 @@ Aggregate
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> GroupAggregate
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
-> Sort
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Result
One-Time Filter: false
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
-- Union and left join subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
@ -485,29 +510,40 @@ HashAggregate
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: COALESCE(subquery_2.hasdone, 'Has not done paying'::text)
-> GroupAggregate
Group Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone
-> Hash Left Join
Hash Cond: (composite_id = subquery_2.composite_id)
-> Unique
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), composite_id, ('action=>1'::text), event_time
-> Append
-> Result
One-Time Filter: false
-> Result
One-Time Filter: false
-> Hash
-> Subquery Scan on subquery_2
-> GroupAggregate
Group Key: subquery_top.hasdone
-> Sort
Sort Key: subquery_top.hasdone
-> Subquery Scan on subquery_top
-> GroupAggregate
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
-> Sort
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
-> Hash Left Join
Hash Cond: ((NULL::user_composite_type) = subquery_2.composite_id)
-> Unique
-> Sort
Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
-> Seq Scan on events_1400027 events
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), (NULL::user_composite_type), ('action=>1'::text), events.event_time
-> Append
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events
Filter: ((event_type)::text = 'click'::text)
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events_1.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events_1
Filter: ((event_type)::text = 'submit'::text)
-> Hash
-> Subquery Scan on subquery_2
-> Unique
-> Sort
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_1400027 events_2
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
-- Union, left join and having subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
@ -643,22 +679,23 @@ Limit
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: (max(lastseen)) DESC
Sort Key: (max(users.lastseen)) DESC
-> GroupAggregate
Group Key: ((composite_id).tenant_id), ((composite_id).user_id)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Nested Loop Left Join
-> Limit
-> Sort
Sort Key: lastseen DESC
-> Result
One-Time Filter: false
Sort Key: users.lastseen DESC
-> Subquery Scan on users
-> Result
One-Time Filter: false
-> Limit
-> Sort
Sort Key: events.event_time DESC
-> Seq Scan on events_1400027 events
Filter: (composite_id = composite_id)
Filter: (composite_id = users.composite_id)
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
@ -736,11 +773,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Plain",
"Partial Mode": "Simple",
"Parallel Aware": false,
"Plans": [
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "Citus Task-Tracker",
"Parallel Aware": false,
"Distributed Query": {
"Job": {
"Task Count": 1,
@ -782,11 +822,14 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Plain</Strategy>
<Partial-Mode>Simple</Partial-Mode>
<Parallel-Aware>false</Parallel-Aware>
<Plans>
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>Citus Task-Tracker</Custom-Plan-Provider>
<Parallel-Aware>false</Parallel-Aware>
<Distributed-Query>
<Job>
<Task-Count>1</Task-Count>
@ -839,10 +882,13 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Plan:
Node Type: "Aggregate"
Strategy: "Plain"
Partial Mode: "Simple"
Parallel Aware: false
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "Citus Task-Tracker"
Parallel Aware: false
Distributed Query:
Job:
Task Count: 1
@ -855,18 +901,17 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Merge Task Count: 1
-- test parallel aggregates
SET parallel_setup_cost=0;
ERROR: unrecognized configuration parameter "parallel_setup_cost"
SET parallel_tuple_cost=0;
ERROR: unrecognized configuration parameter "parallel_tuple_cost"
SET min_parallel_relation_size=0;
ERROR: unrecognized configuration parameter "min_parallel_relation_size"
SET max_parallel_workers_per_gather=4;
ERROR: unrecognized configuration parameter "max_parallel_workers_per_gather"
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Aggregate
-> Seq Scan on lineitem_clone
Finalize Aggregate
-> Gather
Workers Planned: 3
-> Partial Aggregate
-> Parallel Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Aggregate

View File

@ -0,0 +1,942 @@
--
-- MULTI_EXPLAIN
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+(?:\.\d+)?') AS major_version;
major_version
---------------
9.5
(1 row)
\a\t
SET citus.task_executor_type TO 'real-time';
SET citus.explain_distributed_queries TO on;
-- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text)
RETURNS jsonb
AS $BODY$
DECLARE
result jsonb;
BEGIN
EXECUTE format('EXPLAIN (FORMAT JSON) %s', query) INTO result;
RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML
CREATE FUNCTION explain_xml(query text)
RETURNS xml
AS $BODY$
DECLARE
result xml;
BEGIN
EXECUTE format('EXPLAIN (FORMAT XML) %s', query) INTO result;
RETURN result;
END;
$BODY$ LANGUAGE plpgsql;
-- VACUMM related tables to ensure test outputs are stable
VACUUM ANALYZE lineitem;
VACUUM ANALYZE orders;
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort
Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
-> HashAggregate
Group Key: remote_scan.l_quantity
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290001 lineitem
-- Test JSON format
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
[
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)", "remote_scan.l_quantity"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Parent Relationship": "Outer",
"Group Key": ["remote_scan.l_quantity"],
"Plans": [
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "Citus Real-Time",
"Distributed Query": {
"Job": {
"Task Count": 8,
"Tasks Shown": "One of 8",
"Tasks": [
{
"Node": "host=localhost port=57637 dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "lineitem_290001",
"Alias": "lineitem"
}
]
}
}
]
]
}
]
}
}
}
]
}
]
}
}
]
-- Validate JSON format
SELECT true AS valid FROM explain_json($$
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test XML format
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Sort-Key>
<Item>COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)</Item>
<Item>remote_scan.l_quantity</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Parent-Relationship>Outer</Parent-Relationship>
<Group-Key>
<Item>remote_scan.l_quantity</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>Citus Real-Time</Custom-Plan-Provider>
<Distributed-Query>
<Job>
<Task-Count>8</Task-Count>
<Tasks-Shown>One of 8</Tasks-Shown>
<Tasks>
<Task>
<Node>host=localhost port=57637 dbname=regression</Node>
<Remote-Plan>
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>lineitem_290001</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
</Remote-Plan>
</Task>
</Tasks>
</Job>
</Distributed-Query>
</Plan>
</Plans>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
-- Validate XML format
SELECT true AS valid FROM explain_xml($$
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity$$);
t
-- Test YAML format
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
- Plan:
Node Type: "Sort"
Sort Key:
- "COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint)"
- "remote_scan.l_quantity"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Parent Relationship: "Outer"
Group Key:
- "remote_scan.l_quantity"
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "Citus Real-Time"
Distributed Query:
Job:
Task Count: 8
Tasks Shown: "One of 8"
Tasks:
- Node: "host=localhost port=57637 dbname=regression"
Remote Plan:
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "lineitem_290001"
Alias: "lineitem"
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Sort
Sort Key: COALESCE((sum((COALESCE((sum(remote_scan.count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), remote_scan.l_quantity
-> HashAggregate
Group Key: remote_scan.l_quantity
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290001 lineitem
-- Test verbose
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Aggregate
Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2")))
-> Custom Scan (Citus Real-Time)
Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2"
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-- Test join
EXPLAIN (COSTS FALSE)
SELECT * FROM lineitem
JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5.0
ORDER BY l_quantity LIMIT 10;
Limit
-> Sort
Sort Key: remote_scan.l_quantity
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: lineitem.l_quantity
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Scan using orders_pkey_290008 on orders_290008 orders
-> Sort
Sort Key: lineitem.l_orderkey
-> Seq Scan on lineitem_290001 lineitem
Filter: (l_quantity < 5.0)
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_290000
-> Result
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_290000
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
Index Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-- Test delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_290000
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
Index Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
Index Cond: (l_orderkey = 5)
SELECT true AS valid FROM explain_xml($$
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
SELECT true AS valid FROM explain_json($$
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5$$);
t
-- Test CREATE TABLE ... AS
EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem;
Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on lineitem_290001 lineitem
-- Test having
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem
HAVING sum(l_quantity) > 100;
Aggregate
Output: (sum(remote_scan."?column?") / (sum(remote_scan."?column?_1") / sum(remote_scan."?column?_2")))
Filter: (sum(remote_scan.worker_column_4) > '100'::numeric)
-> Custom Scan (Citus Real-Time)
Output: remote_scan."?column?", remote_scan."?column?_1", remote_scan."?column?_2", remote_scan.worker_column_4
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity), sum(l_quantity)
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-- Test having without aggregate
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT l_quantity FROM lineitem
GROUP BY l_quantity
HAVING l_quantity > (100 * random());
HashAggregate
Output: remote_scan.l_quantity
Group Key: remote_scan.l_quantity
Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random()))
-> Custom Scan (Citus Real-Time)
Output: remote_scan.l_quantity, remote_scan.worker_column_2
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_quantity, l_quantity
Group Key: lineitem.l_quantity
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-- Subquery pushdown tests with explain
EXPLAIN (COSTS OFF)
SELECT
avg(array_length(events, 1)) AS event_average
FROM
(SELECT
tenant_id,
user_id,
array_agg(event_type ORDER BY event_time) AS events
FROM
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
event_type,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type IN ('click', 'submit', 'pay')) AS subquery
GROUP BY
tenant_id,
user_id) AS subquery;
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> GroupAggregate
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Result
One-Time Filter: false
-- Union and left join subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
avg(array_length(events, 1)) AS event_average,
hasdone
FROM
(SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(hasdone, 'Has not done paying') AS hasdone
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id) as composite_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id) as composite_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
(composite_id).tenant_id,
(composite_id).user_id,
'Has done paying'::TEXT AS hasdone
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay') AS subquery_2
ON
subquery_1.composite_id = subquery_2.composite_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
hasdone) AS subquery_top
GROUP BY
hasdone;
HashAggregate
Group Key: remote_scan.hasdone
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: COALESCE(subquery_2.hasdone, 'Has not done paying'::text)
-> GroupAggregate
Group Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), subquery_2.hasdone
-> Hash Left Join
Hash Cond: (composite_id = subquery_2.composite_id)
-> Unique
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id), composite_id, ('action=>1'::text), event_time
-> Append
-> Result
One-Time Filter: false
-> Result
One-Time Filter: false
-> Hash
-> Subquery Scan on subquery_2
-> Unique
-> Sort
Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
-> Seq Scan on events_1400027 events
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
-- Union, left join and having subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
avg(array_length(events, 1)) AS event_average,
count_pay
FROM (
SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(count_pay, 0) AS count_pay
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id),
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
(users.composite_id),
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id) = (events.composite_id) AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
composite_id,
COUNT(*) AS count_pay
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay'
GROUP BY
composite_id
HAVING
COUNT(*) > 2) AS subquery_2
ON
subquery_1.composite_id = subquery_2.composite_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
count_pay) AS subquery_top
WHERE
array_ndims(events) > 0
GROUP BY
count_pay
ORDER BY
count_pay;
ERROR: bogus varattno for OUTER_VAR var: 3
-- Lateral join subquery pushdown
-- set subquery_pushdown due to limit in the query
SET citus.subquery_pushdown to ON;
EXPLAIN (COSTS OFF)
SELECT
tenant_id,
user_id,
user_lastseen,
event_array
FROM
(SELECT
tenant_id,
user_id,
max(lastseen) as user_lastseen,
array_agg(event_type ORDER BY event_time) AS event_array
FROM
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
composite_id,
lastseen
FROM
users
WHERE
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
composite_id <= '(1, 9223372036854775807)'::user_composite_type
ORDER BY
lastseen DESC
LIMIT
10
) AS subquery_top
LEFT JOIN LATERAL
(SELECT
event_type,
event_time
FROM
events
WHERE
(composite_id) = subquery_top.composite_id
ORDER BY
event_time DESC
LIMIT
99) AS subquery_lateral
ON
true
GROUP BY
tenant_id,
user_id
) AS shard_union
ORDER BY
user_lastseen DESC
LIMIT
10;
Limit
-> Sort
Sort Key: remote_scan.user_lastseen DESC
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: (max(lastseen)) DESC
-> GroupAggregate
Group Key: ((composite_id).tenant_id), ((composite_id).user_id)
-> Sort
Sort Key: ((composite_id).tenant_id), ((composite_id).user_id)
-> Nested Loop Left Join
-> Limit
-> Sort
Sort Key: lastseen DESC
-> Result
One-Time Filter: false
-> Limit
-> Sort
Sort Key: events.event_time DESC
-> Seq Scan on events_1400027 events
Filter: (composite_id = composite_id)
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290004 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290007 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290006 lineitem
Filter: (l_orderkey > 9030)
SELECT true AS valid FROM explain_xml($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
SELECT true AS valid FROM explain_json($$
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030$$);
t
-- Test track tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Aggregate
-> Custom Scan (Citus Task-Tracker)
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 1
Merge Task Count: 1
-> MapMergeJob
Map Task Count: 8
Merge Task Count: 1
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT count(*)
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
[
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Plain",
"Plans": [
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "Citus Task-Tracker",
"Distributed Query": {
"Job": {
"Task Count": 1,
"Tasks Shown": "None, not supported for re-partition queries",
"Depended Jobs": [
{
"Map Task Count": 1,
"Merge Task Count": 1,
"Depended Jobs": [
{
"Map Task Count": 8,
"Merge Task Count": 1
}
]
}
]
}
}
}
]
}
}
]
SELECT true AS valid FROM explain_json($$
SELECT count(*)
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT count(*)
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Plain</Strategy>
<Plans>
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>Citus Task-Tracker</Custom-Plan-Provider>
<Distributed-Query>
<Job>
<Task-Count>1</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
<Depended-Jobs>
<MapMergeJob>
<Map-Task-Count>1</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count>
<Depended-Jobs>
<MapMergeJob>
<Map-Task-Count>8</Map-Task-Count>
<Merge-Task-Count>1</Merge-Task-Count>
</MapMergeJob>
</Depended-Jobs>
</MapMergeJob>
</Depended-Jobs>
</Job>
</Distributed-Query>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
-- make sure that EXPLAIN works without
-- problems for queries that inlvolves only
-- reference tables
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation
WHERE n_name = 'CHINA'$$);
t
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation, supplier
WHERE nation.n_nationkey = supplier.s_nationkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT count(*)
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
- Plan:
Node Type: "Aggregate"
Strategy: "Plain"
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "Citus Task-Tracker"
Distributed Query:
Job:
Task Count: 1
Tasks Shown: "None, not supported for re-partition queries"
Depended Jobs:
- Map Task Count: 1
Merge Task Count: 1
Depended Jobs:
- Map Task Count: 8
Merge Task Count: 1
-- test parallel aggregates
SET parallel_setup_cost=0;
ERROR: unrecognized configuration parameter "parallel_setup_cost"
SET parallel_tuple_cost=0;
ERROR: unrecognized configuration parameter "parallel_tuple_cost"
SET min_parallel_relation_size=0;
ERROR: unrecognized configuration parameter "min_parallel_relation_size"
SET max_parallel_workers_per_gather=4;
ERROR: unrecognized configuration parameter "max_parallel_workers_per_gather"
-- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Aggregate
-> Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Aggregate
-> Custom Scan (Citus Task-Tracker)
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290001 lineitem
-- ensure EXPLAIN EXECUTE doesn't crash
PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Aggregate
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
Index Cond: (l_orderkey = 5)
PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
-- EXPLAIN EXECUTE of parametrized prepared statements is broken, but
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);
Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (cost=0.28..11.83 rows=3 width=5)
Index Cond: (l_orderkey = 5)
-- test explain in a transaction with alter table to test we use right connections
BEGIN;
CREATE TABLE explain_table(id int);
SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on explain_table_570001 explain_table
Filter: (id = 1)
ROLLBACK;

View File

@ -0,0 +1,246 @@
--
-- MULTI_LARGE_TABLE_PLANNING
--
-- Tests that cover large table join planning. Note that we explicitly start a
-- transaction block here so that we don't emit debug messages with changing
-- transaction ids in them. Also, we set the executor type to task tracker
-- executor here, as we cannot run repartition jobs with real time executor.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 690000;
SET citus.enable_unique_job_ids TO off;
BEGIN;
SET client_min_messages TO DEBUG4;
DEBUG: CommitTransactionCommand
SET citus.large_table_shard_count TO 2;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
SET citus.task_executor_type TO 'task-tracker';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
-- Debug4 log messages display jobIds within them. We explicitly set the jobId
-- sequence here so that the regression output becomes independent of the number
-- of jobs executed prior to running this test.
-- Multi-level repartition join to verify our projection columns are correctly
-- referenced and propagated across multiple repartition jobs. The test also
-- validates that only the minimal necessary projection columns are transferred
-- between jobs.
SELECT
l_partkey, o_orderkey, count(*)
FROM
lineitem, part, orders, customer
WHERE
l_orderkey = o_orderkey AND
l_partkey = p_partkey AND
c_custkey = o_custkey AND
(l_quantity > 5.0 OR l_extendedprice > 1200.0) AND
p_size > 8 AND o_totalprice > 10.0 AND
c_acctbal < 5000.0 AND l_partkey < 1000
GROUP BY
l_partkey, o_orderkey
ORDER BY
l_partkey, o_orderkey;
DEBUG: StartTransactionCommand
DEBUG: join prunable for intervals [1,1509] and [8997,14946]
DEBUG: join prunable for intervals [1509,4964] and [8997,14946]
DEBUG: join prunable for intervals [2951,4455] and [8997,14946]
DEBUG: join prunable for intervals [4480,5986] and [8997,14946]
DEBUG: join prunable for intervals [8997,10560] and [1,5986]
DEBUG: join prunable for intervals [10560,12036] and [1,5986]
DEBUG: join prunable for intervals [12036,13473] and [1,5986]
DEBUG: join prunable for intervals [13473,14947] and [1,5986]
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290000 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290001 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for task 9
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290002 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for task 12
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290003 lineitem JOIN orders_290008 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for task 15
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290004 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for task 18
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290005 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for task 21
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290006 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for task 24
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290007 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 18 to node localhost:57637
DEBUG: assigned task 15 to node localhost:57638
DEBUG: assigned task 24 to node localhost:57637
DEBUG: assigned task 21 to node localhost:57638
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT "pg_merge_job_0001.task_000025".intermediate_column_1_0, "pg_merge_job_0001.task_000025".intermediate_column_1_1, "pg_merge_job_0001.task_000025".intermediate_column_1_2, "pg_merge_job_0001.task_000025".intermediate_column_1_3, "pg_merge_job_0001.task_000025".intermediate_column_1_4 FROM (pg_merge_job_0001.task_000025 "pg_merge_job_0001.task_000025" JOIN part_290011 part ON (("pg_merge_job_0001.task_000025".intermediate_column_1_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT "pg_merge_job_0001.task_000034".intermediate_column_1_0, "pg_merge_job_0001.task_000034".intermediate_column_1_1, "pg_merge_job_0001.task_000034".intermediate_column_1_2, "pg_merge_job_0001.task_000034".intermediate_column_1_3, "pg_merge_job_0001.task_000034".intermediate_column_1_4 FROM (pg_merge_job_0001.task_000034 "pg_merge_job_0001.task_000034" JOIN part_280002 part ON (("pg_merge_job_0001.task_000034".intermediate_column_1_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 25
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 34
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT "pg_merge_job_0002.task_000007".intermediate_column_2_0 AS l_partkey, "pg_merge_job_0002.task_000007".intermediate_column_2_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0002.task_000007 "pg_merge_job_0002.task_000007" JOIN customer_290010 customer ON ((customer.c_custkey = "pg_merge_job_0002.task_000007".intermediate_column_2_4))) WHERE ((("pg_merge_job_0002.task_000007".intermediate_column_2_2 > 5.0) OR ("pg_merge_job_0002.task_000007".intermediate_column_2_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_0002.task_000007".intermediate_column_2_0, "pg_merge_job_0002.task_000007".intermediate_column_2_1"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT "pg_merge_job_0002.task_000010".intermediate_column_2_0 AS l_partkey, "pg_merge_job_0002.task_000010".intermediate_column_2_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0002.task_000010 "pg_merge_job_0002.task_000010" JOIN customer_280001 customer ON ((customer.c_custkey = "pg_merge_job_0002.task_000010".intermediate_column_2_4))) WHERE ((("pg_merge_job_0002.task_000010".intermediate_column_2_2 > 5.0) OR ("pg_merge_job_0002.task_000010".intermediate_column_2_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_0002.task_000010".intermediate_column_2_0, "pg_merge_job_0002.task_000010".intermediate_column_2_1"
DEBUG: generated sql query for task 9
DETAIL: query string: "SELECT "pg_merge_job_0002.task_000013".intermediate_column_2_0 AS l_partkey, "pg_merge_job_0002.task_000013".intermediate_column_2_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0002.task_000013 "pg_merge_job_0002.task_000013" JOIN customer_280000 customer ON ((customer.c_custkey = "pg_merge_job_0002.task_000013".intermediate_column_2_4))) WHERE ((("pg_merge_job_0002.task_000013".intermediate_column_2_2 > 5.0) OR ("pg_merge_job_0002.task_000013".intermediate_column_2_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_0002.task_000013".intermediate_column_2_0, "pg_merge_job_0002.task_000013".intermediate_column_2_1"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 7
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 13
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: completed cleanup query for job 3
DEBUG: completed cleanup query for job 3
DEBUG: completed cleanup query for job 2
DEBUG: completed cleanup query for job 2
DEBUG: completed cleanup query for job 1
DEBUG: completed cleanup query for job 1
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------
18 | 12005 | 1
79 | 5121 | 1
91 | 2883 | 1
222 | 9413 | 1
278 | 1287 | 1
309 | 2374 | 1
318 | 321 | 1
321 | 5984 | 1
337 | 10403 | 1
350 | 13698 | 1
358 | 4323 | 1
364 | 9347 | 1
416 | 640 | 1
426 | 10855 | 1
450 | 35 | 1
484 | 3843 | 1
504 | 14566 | 1
510 | 13569 | 1
532 | 3175 | 1
641 | 134 | 1
669 | 10944 | 1
716 | 2885 | 1
738 | 4355 | 1
802 | 2534 | 1
824 | 9287 | 1
864 | 3175 | 1
957 | 4293 | 1
960 | 10980 | 1
963 | 4580 | 1
(29 rows)
SELECT
l_partkey, o_orderkey, count(*)
FROM
lineitem, orders
WHERE
l_suppkey = o_shippriority AND
l_quantity < 5.0 AND o_totalprice <> 4.0
GROUP BY
l_partkey, o_orderkey
ORDER BY
l_partkey, o_orderkey;
DEBUG: StartTransactionCommand
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290000 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for task 4
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290001 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290002 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for task 8
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290003 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for task 10
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290004 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for task 12
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290005 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for task 14
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290006 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for task 16
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290007 lineitem WHERE (l_quantity < 5.0)"
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 8 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 10 to node localhost:57638
DEBUG: assigned task 16 to node localhost:57637
DEBUG: assigned task 14 to node localhost:57638
DEBUG: generated sql query for task 2
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290008 orders WHERE (o_totalprice <> 4.0)"
DEBUG: generated sql query for task 4
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290009 orders WHERE (o_totalprice <> 4.0)"
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: generated sql query for task 3
DETAIL: query string: "SELECT "pg_merge_job_0004.task_000017".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000005".intermediate_column_5_0 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0004.task_000017 "pg_merge_job_0004.task_000017" JOIN pg_merge_job_0005.task_000005 "pg_merge_job_0005.task_000005" ON (("pg_merge_job_0004.task_000017".intermediate_column_4_1 = "pg_merge_job_0005.task_000005".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000017".intermediate_column_4_0, "pg_merge_job_0005.task_000005".intermediate_column_5_0"
DEBUG: generated sql query for task 6
DETAIL: query string: "SELECT "pg_merge_job_0004.task_000026".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000008".intermediate_column_5_0 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0004.task_000026 "pg_merge_job_0004.task_000026" JOIN pg_merge_job_0005.task_000008 "pg_merge_job_0005.task_000008" ON (("pg_merge_job_0004.task_000026".intermediate_column_4_1 = "pg_merge_job_0005.task_000008".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000026".intermediate_column_4_0, "pg_merge_job_0005.task_000008".intermediate_column_5_0"
DEBUG: generated sql query for task 9
DETAIL: query string: "SELECT "pg_merge_job_0004.task_000035".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000011".intermediate_column_5_0 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0004.task_000035 "pg_merge_job_0004.task_000035" JOIN pg_merge_job_0005.task_000011 "pg_merge_job_0005.task_000011" ON (("pg_merge_job_0004.task_000035".intermediate_column_4_1 = "pg_merge_job_0005.task_000011".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000035".intermediate_column_4_0, "pg_merge_job_0005.task_000011".intermediate_column_5_0"
DEBUG: generated sql query for task 12
DETAIL: query string: "SELECT "pg_merge_job_0004.task_000044".intermediate_column_4_0 AS l_partkey, "pg_merge_job_0005.task_000014".intermediate_column_5_0 AS o_orderkey, count(*) AS count FROM (pg_merge_job_0004.task_000044 "pg_merge_job_0004.task_000044" JOIN pg_merge_job_0005.task_000014 "pg_merge_job_0005.task_000014" ON (("pg_merge_job_0004.task_000044".intermediate_column_4_1 = "pg_merge_job_0005.task_000014".intermediate_column_5_1))) WHERE true GROUP BY "pg_merge_job_0004.task_000044".intermediate_column_4_0, "pg_merge_job_0005.task_000014".intermediate_column_5_0"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 17
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 26
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 35
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 11
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 44
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 12 to node localhost:57638
DEBUG: completed cleanup query for job 6
DEBUG: completed cleanup query for job 6
DEBUG: completed cleanup query for job 4
DEBUG: completed cleanup query for job 4
DEBUG: completed cleanup query for job 5
DEBUG: completed cleanup query for job 5
DEBUG: CommitTransactionCommand
l_partkey | o_orderkey | count
-----------+------------+-------
(0 rows)
-- Reset client logging level to its previous value
SET client_min_messages TO NOTICE;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
COMMIT;

View File

@ -0,0 +1,250 @@
--
-- MULTI_LARGE_TABLE_TASK_ASSIGNMENT
--
-- Tests which cover task assignment for MapMerge jobs for single range repartition
-- and dual hash repartition joins. The tests also cover task assignment propagation
-- from a sql task to its depended tasks. Note that we set the executor type to task
-- tracker executor here, as we cannot run repartition jobs with real time executor.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 710000;
BEGIN;
SET client_min_messages TO DEBUG3;
DEBUG: CommitTransactionCommand
SET citus.large_table_shard_count TO 2;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
SET citus.task_executor_type TO 'task-tracker';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
-- Single range repartition join to test anchor-shard based task assignment and
-- assignment propagation to merge and data-fetch tasks.
SELECT
count(*)
FROM
orders, customer
WHERE
o_custkey = c_custkey;
DEBUG: StartTransactionCommand
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
DEBUG: join prunable for intervals [1001,2000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 5
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 11
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: CommitTransactionCommand
count
-------
2984
(1 row)
-- Single range repartition join, along with a join with a small table containing
-- more than one shard. This situation results in multiple sql tasks depending on
-- the same merge task, and tests our constraint group creation and assignment
-- propagation. Here 'orders' is considered the small table.
SET citus.large_table_shard_count TO 3;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
SELECT
count(*)
FROM
orders, customer, lineitem
WHERE
o_custkey = c_custkey AND
o_orderkey = l_orderkey;
DEBUG: StartTransactionCommand
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 15 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 18 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: join prunable for intervals [1,1509] and [2951,4455]
DEBUG: join prunable for intervals [1,1509] and [4480,5986]
DEBUG: join prunable for intervals [1,1509] and [8997,10560]
DEBUG: join prunable for intervals [1,1509] and [10560,12036]
DEBUG: join prunable for intervals [1,1509] and [12036,13473]
DEBUG: join prunable for intervals [1,1509] and [13473,14947]
DEBUG: join prunable for intervals [1509,4964] and [8997,10560]
DEBUG: join prunable for intervals [1509,4964] and [10560,12036]
DEBUG: join prunable for intervals [1509,4964] and [12036,13473]
DEBUG: join prunable for intervals [1509,4964] and [13473,14947]
DEBUG: join prunable for intervals [2951,4455] and [1,1509]
DEBUG: join prunable for intervals [2951,4455] and [4480,5986]
DEBUG: join prunable for intervals [2951,4455] and [8997,10560]
DEBUG: join prunable for intervals [2951,4455] and [10560,12036]
DEBUG: join prunable for intervals [2951,4455] and [12036,13473]
DEBUG: join prunable for intervals [2951,4455] and [13473,14947]
DEBUG: join prunable for intervals [4480,5986] and [1,1509]
DEBUG: join prunable for intervals [4480,5986] and [2951,4455]
DEBUG: join prunable for intervals [4480,5986] and [8997,10560]
DEBUG: join prunable for intervals [4480,5986] and [10560,12036]
DEBUG: join prunable for intervals [4480,5986] and [12036,13473]
DEBUG: join prunable for intervals [4480,5986] and [13473,14947]
DEBUG: join prunable for intervals [8997,10560] and [1,1509]
DEBUG: join prunable for intervals [8997,10560] and [1509,4964]
DEBUG: join prunable for intervals [8997,10560] and [2951,4455]
DEBUG: join prunable for intervals [8997,10560] and [4480,5986]
DEBUG: join prunable for intervals [8997,10560] and [12036,13473]
DEBUG: join prunable for intervals [8997,10560] and [13473,14947]
DEBUG: join prunable for intervals [10560,12036] and [1,1509]
DEBUG: join prunable for intervals [10560,12036] and [1509,4964]
DEBUG: join prunable for intervals [10560,12036] and [2951,4455]
DEBUG: join prunable for intervals [10560,12036] and [4480,5986]
DEBUG: join prunable for intervals [10560,12036] and [13473,14947]
DEBUG: join prunable for intervals [12036,13473] and [1,1509]
DEBUG: join prunable for intervals [12036,13473] and [1509,4964]
DEBUG: join prunable for intervals [12036,13473] and [2951,4455]
DEBUG: join prunable for intervals [12036,13473] and [4480,5986]
DEBUG: join prunable for intervals [12036,13473] and [8997,10560]
DEBUG: join prunable for intervals [13473,14947] and [1,1509]
DEBUG: join prunable for intervals [13473,14947] and [1509,4964]
DEBUG: join prunable for intervals [13473,14947] and [2951,4455]
DEBUG: join prunable for intervals [13473,14947] and [4480,5986]
DEBUG: join prunable for intervals [13473,14947] and [8997,10560]
DEBUG: join prunable for intervals [13473,14947] and [10560,12036]
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 26
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 26
DEBUG: pruning merge fetch taskId 13
DETAIL: Creating dependency on merge taskId 26
DEBUG: pruning merge fetch taskId 16
DETAIL: Creating dependency on merge taskId 26
DEBUG: pruning merge fetch taskId 19
DETAIL: Creating dependency on merge taskId 33
DEBUG: pruning merge fetch taskId 22
DETAIL: Creating dependency on merge taskId 33
DEBUG: pruning merge fetch taskId 25
DETAIL: Creating dependency on merge taskId 40
DEBUG: pruning merge fetch taskId 28
DETAIL: Creating dependency on merge taskId 40
DEBUG: pruning merge fetch taskId 31
DETAIL: Creating dependency on merge taskId 47
DEBUG: pruning merge fetch taskId 34
DETAIL: Creating dependency on merge taskId 47
DEBUG: pruning merge fetch taskId 37
DETAIL: Creating dependency on merge taskId 54
DEBUG: pruning merge fetch taskId 40
DETAIL: Creating dependency on merge taskId 54
DEBUG: pruning merge fetch taskId 43
DETAIL: Creating dependency on merge taskId 54
DEBUG: pruning merge fetch taskId 46
DETAIL: Creating dependency on merge taskId 61
DEBUG: pruning merge fetch taskId 49
DETAIL: Creating dependency on merge taskId 61
DEBUG: pruning merge fetch taskId 52
DETAIL: Creating dependency on merge taskId 61
DEBUG: pruning merge fetch taskId 55
DETAIL: Creating dependency on merge taskId 68
DEBUG: pruning merge fetch taskId 58
DETAIL: Creating dependency on merge taskId 68
DEBUG: assigned task 21 to node localhost:57637
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 27 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 48 to node localhost:57637
DEBUG: assigned task 33 to node localhost:57638
DEBUG: assigned task 39 to node localhost:57637
DEBUG: assigned task 57 to node localhost:57638
DEBUG: propagating assignment from merge task 19 to constrained sql task 6
DEBUG: propagating assignment from merge task 26 to constrained sql task 12
DEBUG: propagating assignment from merge task 26 to constrained sql task 15
DEBUG: propagating assignment from merge task 26 to constrained sql task 18
DEBUG: propagating assignment from merge task 33 to constrained sql task 24
DEBUG: propagating assignment from merge task 40 to constrained sql task 30
DEBUG: propagating assignment from merge task 47 to constrained sql task 36
DEBUG: propagating assignment from merge task 54 to constrained sql task 42
DEBUG: propagating assignment from merge task 54 to constrained sql task 45
DEBUG: propagating assignment from merge task 61 to constrained sql task 51
DEBUG: propagating assignment from merge task 61 to constrained sql task 54
DEBUG: propagating assignment from merge task 68 to constrained sql task 60
DEBUG: CommitTransactionCommand
count
-------
11998
(1 row)
SET citus.large_table_shard_count TO 2;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
-- Dual hash repartition join which tests the separate hash repartition join
-- task assignment algorithm.
SELECT
count(*)
FROM
lineitem, customer
WHERE
l_partkey = c_nationkey;
DEBUG: StartTransactionCommand
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 8 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 10 to node localhost:57638
DEBUG: assigned task 16 to node localhost:57637
DEBUG: assigned task 14 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57637
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3
DEBUG: join prunable for task partitionId 1 and 0
DEBUG: join prunable for task partitionId 1 and 2
DEBUG: join prunable for task partitionId 1 and 3
DEBUG: join prunable for task partitionId 2 and 0
DEBUG: join prunable for task partitionId 2 and 1
DEBUG: join prunable for task partitionId 2 and 3
DEBUG: join prunable for task partitionId 3 and 0
DEBUG: join prunable for task partitionId 3 and 1
DEBUG: join prunable for task partitionId 3 and 2
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 17
DEBUG: pruning merge fetch taskId 2
DETAIL: Creating dependency on merge taskId 7
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 26
DEBUG: pruning merge fetch taskId 5
DETAIL: Creating dependency on merge taskId 11
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 35
DEBUG: pruning merge fetch taskId 8
DETAIL: Creating dependency on merge taskId 15
DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 44
DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 19
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: CommitTransactionCommand
count
-------
125
(1 row)
-- Reset client logging level to its previous value
SET client_min_messages TO NOTICE;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
COMMIT;

View File

@ -0,0 +1,446 @@
--
-- MULTI_NULL_MINMAX_VALUE_PRUNING
--
-- This test checks that we can handle null min/max values in shard statistics
-- and that we don't partition or join prune shards that have null values.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 760000;
SET client_min_messages TO DEBUG2;
SET citus.explain_all_tasks TO on;
-- to avoid differing explain output - executor doesn't matter,
-- because were testing pruning here.
SET citus.task_executor_type TO 'real-time';
-- Change configuration to treat lineitem and orders tables as large
SET citus.large_table_shard_count TO 2;
SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 290000;
shardminvalue | shardmaxvalue
---------------+---------------
1 | 1509
(1 row)
SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 290001;
shardminvalue | shardmaxvalue
---------------+---------------
1509 | 2951
(1 row)
-- Check that partition and join pruning works when min/max values exist
-- Adding l_orderkey = 1 to make the query not router executable
EXPLAIN (COSTS FALSE)
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
QUERY PLAN
-----------------------------------------------------------------------
Custom Scan (Citus Real-Time)
Task Count: 2
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Bitmap Heap Scan on lineitem_290000 lineitem
Recheck Cond: ((l_orderkey = 9030) OR (l_orderkey = 1))
-> BitmapOr
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 9030)
-> Bitmap Index Scan on lineitem_pkey_290000
Index Cond: (l_orderkey = 1)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Bitmap Heap Scan on lineitem_290004 lineitem
Recheck Cond: ((l_orderkey = 9030) OR (l_orderkey = 1))
-> BitmapOr
-> Bitmap Index Scan on lineitem_pkey_290004
Index Cond: (l_orderkey = 9030)
-> Bitmap Index Scan on lineitem_pkey_290004
Index Cond: (l_orderkey = 1)
(21 rows)
EXPLAIN (COSTS FALSE)
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey;
DEBUG: join prunable for intervals [1,1509] and [8997,14946]
DEBUG: join prunable for intervals [1509,2951] and [8997,14946]
DEBUG: join prunable for intervals [2951,4455] and [8997,14946]
DEBUG: join prunable for intervals [4480,5986] and [8997,14946]
DEBUG: join prunable for intervals [8997,10560] and [1,5986]
DEBUG: join prunable for intervals [10560,12036] and [1,5986]
DEBUG: join prunable for intervals [12036,13473] and [1,5986]
DEBUG: join prunable for intervals [13473,14947] and [1,5986]
QUERY PLAN
------------------------------------------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem
(60 rows)
-- Now set the minimum value for a shard to null. Then check that we don't apply
-- partition or join pruning for the shard with null min value.
UPDATE pg_dist_shard SET shardminvalue = NULL WHERE shardid = 290000;
EXPLAIN (COSTS FALSE)
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030;
QUERY PLAN
-------------------------------------------------------------------------------
Custom Scan (Citus Real-Time)
Task Count: 2
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
Index Cond: (l_orderkey = 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Index Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
Index Cond: (l_orderkey = 9030)
(11 rows)
EXPLAIN (COSTS FALSE)
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey;
DEBUG: join prunable for intervals [1509,2951] and [8997,14946]
DEBUG: join prunable for intervals [2951,4455] and [8997,14946]
DEBUG: join prunable for intervals [4480,5986] and [8997,14946]
DEBUG: join prunable for intervals [8997,10560] and [1,5986]
DEBUG: join prunable for intervals [10560,12036] and [1,5986]
DEBUG: join prunable for intervals [12036,13473] and [1,5986]
DEBUG: join prunable for intervals [13473,14947] and [1,5986]
QUERY PLAN
------------------------------------------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 9
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem
(67 rows)
-- Next, set the maximum value for another shard to null. Then check that we
-- don't apply partition or join pruning for this other shard either.
UPDATE pg_dist_shard SET shardmaxvalue = NULL WHERE shardid = 290001;
EXPLAIN (COSTS FALSE)
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030;
QUERY PLAN
-------------------------------------------------------------------------------
Custom Scan (Citus Real-Time)
Task Count: 3
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
Index Cond: (l_orderkey = 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
Index Cond: (l_orderkey = 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
Index Cond: (l_orderkey = 9030)
(15 rows)
EXPLAIN (COSTS FALSE)
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey;
DEBUG: join prunable for intervals [2951,4455] and [8997,14946]
DEBUG: join prunable for intervals [4480,5986] and [8997,14946]
DEBUG: join prunable for intervals [8997,10560] and [1,5986]
DEBUG: join prunable for intervals [10560,12036] and [1,5986]
DEBUG: join prunable for intervals [12036,13473] and [1,5986]
DEBUG: join prunable for intervals [13473,14947] and [1,5986]
QUERY PLAN
------------------------------------------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 10
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem
(74 rows)
-- Last, set the minimum value to 0 and check that we don't treat it as null. We
-- should apply partition and join pruning for this shard now.
UPDATE pg_dist_shard SET shardminvalue = '0' WHERE shardid = 290000;
EXPLAIN (COSTS FALSE)
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030;
QUERY PLAN
-------------------------------------------------------------------------------
Custom Scan (Citus Real-Time)
Task Count: 2
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Index Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
Index Cond: (l_orderkey = 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Index Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
Index Cond: (l_orderkey = 9030)
(11 rows)
EXPLAIN (COSTS FALSE)
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey;
DEBUG: join prunable for intervals [0,1509] and [8997,14946]
DEBUG: join prunable for intervals [2951,4455] and [8997,14946]
DEBUG: join prunable for intervals [4480,5986] and [8997,14946]
DEBUG: join prunable for intervals [8997,10560] and [1,5986]
DEBUG: join prunable for intervals [10560,12036] and [1,5986]
DEBUG: join prunable for intervals [12036,13473] and [1,5986]
DEBUG: join prunable for intervals [13473,14947] and [1,5986]
QUERY PLAN
------------------------------------------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Real-Time)
Task Count: 9
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Index Only Scan using lineitem_pkey_290000 on lineitem_290000 lineitem
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290001 on lineitem_290001 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290002 on lineitem_290002 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290008 on orders_290008 orders
-> Index Only Scan using lineitem_pkey_290003 on lineitem_290003 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290004 on lineitem_290004 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290005 on lineitem_290005 lineitem
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290006 on lineitem_290006 lineitem
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Merge Join
Merge Cond: (orders.o_orderkey = lineitem.l_orderkey)
-> Index Only Scan using orders_pkey_290009 on orders_290009 orders
-> Index Only Scan using lineitem_pkey_290007 on lineitem_290007 lineitem
(67 rows)
-- Set minimum and maximum values for two shards back to their original values
UPDATE pg_dist_shard SET shardminvalue = '1' WHERE shardid = 290000;
UPDATE pg_dist_shard SET shardmaxvalue = '4964' WHERE shardid = 290001;
SET client_min_messages TO NOTICE;

View File

@ -0,0 +1,170 @@
--
-- MULTI_TASK_ASSIGNMENT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 880000;
SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard
-- and shard placement data into system catalogs. We next run Explain command,
-- and check that tasks are assigned to worker nodes as expected.
CREATE TABLE task_assignment_test_table (test_id integer);
SELECT master_create_distributed_table('task_assignment_test_table', 'test_id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Create logical shards with shardids 200, 201, and 202
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
SELECT pg_class.oid, series.index, 'r', 1, 1000
FROM pg_class, generate_series(200, 202) AS series(index)
WHERE pg_class.relname = 'task_assignment_test_table';
-- Create shard placements for shard 200 and 201
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 200, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 2;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 201, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 2;
-- Create shard placements for shard 202
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 202, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport DESC
LIMIT 2;
-- Start transaction block to avoid auto commits. This avoids additional debug
-- messages from getting printed at real transaction starts and commits.
BEGIN;
-- Increase log level to see which worker nodes tasks are assigned to. Note that
-- the following log messages print node name and port numbers; and node numbers
-- in regression tests depend upon PG_VERSION_NUM.
SET client_min_messages TO DEBUG3;
DEBUG: CommitTransactionCommand
-- First test the default greedy task assignment policy
SET citus.task_assignment_policy TO 'greedy';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
-- Next test the first-replica task assignment policy
SET citus.task_assignment_policy TO 'first-replica';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
-- Finally test the round-robin task assignment policy
SET citus.task_assignment_policy TO 'round-robin';
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
RESET citus.task_assignment_policy;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
DEBUG: CommitTransactionCommand
RESET client_min_messages;
DEBUG: StartTransactionCommand
DEBUG: ProcessUtility
COMMIT;