More tests

onur-leftjoin_push-improvements
eaydingol 2025-07-29 21:03:38 +03:00
parent 9f72067c12
commit 245b9534e2
2 changed files with 456 additions and 0 deletions

View File

@ -21,9 +21,19 @@ SELECT create_distributed_table('d1', 'a');
(1 row)
INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (4, 10);
CREATE TABLE d2(a int, c text);
SELECT create_distributed_table('d2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO d2(a, c) VALUES (1,'a'), (1,'b'), (1,'c'), (2,'d'), (2,'e'), (2,'f'), (4,'g');
--- For testing, remove before merge
CREATE TABLE d1_local(like d1);
INSERT INTO d1_local select * from d1;
CREATE TABLE d2_local(like d2);
INSERT INTO d2_local select * from d2;
SET client_min_messages TO DEBUG3;
-- Basic test cases
-- Test that the join is pushed down to the worker nodes, using "using" syntax
@ -169,6 +179,38 @@ SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_loca
10 | 2 | |
(9 rows)
-- Verfiy that the join is pushed via the execution plan.
EXPLAIN (COSTS OFF) SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN
---------------------------------------------------------------------
Sort
Sort Key: remote_scan.a, remote_scan.b
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge Left Join
Merge Cond: ((r1.a = d1.a) AND (r1.b = d1.b))
-> Sort
Sort Key: r1.a, r1.b
-> Seq Scan on r1_1520000 r1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
-> Sort
Sort Key: d1.a, d1.b
-> Seq Scan on d1_1520001 d1
(16 rows)
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
@ -410,6 +452,34 @@ SELECT count(*) FROM r1_local LEFT JOIN (d1_local LEFT JOIN d1_local as d1_1 USI
57
(1 row)
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Right Join
Hash Cond: (d1.a = r1.a)
-> Seq Scan on d1_1520001 d1
-> Hash
-> Seq Scan on r1_1520000 r1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
(13 rows)
-- Using 'on' syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) AS d1 ON r1.a = d1.a;
@ -504,3 +574,353 @@ SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a as a, d1_local.b, d1_
57
(1 row)
-- Nested joins
-- It is safe to push the inner join to compute t1. However, as the var of the inner table for the top level join (t1.a) resolves to r1.a, the outer join cannot be pushed down.
SELECT count(*) FROM r1 LEFT JOIN (SELECT r1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT r1.a, d1.b FROM (recurring_join_pushdown.r1 LEFT JOIN recurring_join_pushdown.d1 ON ((r1.a OPERATOR(pg_catalog.=) d1.a)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t1 ON ((r1.a OPERATOR(pg_catalog.=) t1.a)))
DEBUG: Creating router plan
count
---------------------------------------------------------------------
59
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT r1_local.a, d1_local.b FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a) AS t1 ON r1_local.a = t1.a;
count
---------------------------------------------------------------------
59
(1 row)
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT r1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT r1.a, d1.b FROM (recurring_join_pushdown.r1 LEFT JOIN recurring_join_pushdown.d1 ON ((r1.a OPERATOR(pg_catalog.=) d1.a)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t1 ON ((r1.a OPERATOR(pg_catalog.=) t1.a)))
DEBUG: Creating router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Hash Cond: (d1.a = r1.a)
-> Seq Scan on d1_1520001 d1
-> Hash
-> Seq Scan on r1_1520000 r1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Merge Right Join
Merge Cond: (intermediate_result.a = r1.a)
-> Sort
Sort Key: intermediate_result.a
-> Function Scan on read_intermediate_result intermediate_result
-> Sort
Sort Key: r1.a
-> Seq Scan on r1_1520000 r1
(26 rows)
-- In the following case, it is safe to push down both joins as t1.a resolves to d1.a.
SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
57
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a, d1_local.b FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a) AS t1 ON r1_local.a = t1.a;
count
---------------------------------------------------------------------
57
(1 row)
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Hash Right Join
Hash Cond: (d1.a = r1.a)
-> Hash Join
Hash Cond: (d1.a = r1_1.a)
-> Seq Scan on d1_1520001 d1
-> Hash
-> Seq Scan on r1_1520000 r1_1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
-> Hash
-> Seq Scan on r1_1520000 r1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
(18 rows)
-- In the following case, the lower level joins will be pushed down, but as the top level join is chained, subquery pushdown will not be applied at the top level.
SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON t1.a = r1.a LEFT JOIN (SELECT d2.a, d2.c FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: a push down safe left join with recurring left side
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT d1.a, d1.b FROM (recurring_join_pushdown.r1 LEFT JOIN recurring_join_pushdown.d1 ON ((r1.a OPERATOR(pg_catalog.=) d1.a)))
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_2 for subquery SELECT d2.a, d2.c FROM (recurring_join_pushdown.r1 LEFT JOIN recurring_join_pushdown.d2 ON ((r1.a OPERATOR(pg_catalog.=) d2.a)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((recurring_join_pushdown.r1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t1 ON ((t1.a OPERATOR(pg_catalog.=) r1.a))) LEFT JOIN (SELECT intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, c text)) t2 ON ((t1.a OPERATOR(pg_catalog.=) t2.a)))
DEBUG: Creating router plan
count
---------------------------------------------------------------------
489
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a, d1_local.b FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a) AS t1 ON t1.a = r1_local.a LEFT JOIN (SELECT d2_local.a, d2_local.c FROM r1_local LEFT JOIN d2_local ON r1_local.a = d2_local.a) AS t2 ON t1.a = t2.a;
count
---------------------------------------------------------------------
489
(1 row)
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON t1.a = r1.a LEFT JOIN (SELECT d2.a, d2.c FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: a push down safe left join with recurring left side
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT d1.a, d1.b FROM (recurring_join_pushdown.r1 LEFT JOIN recurring_join_pushdown.d1 ON ((r1.a OPERATOR(pg_catalog.=) d1.a)))
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning the distributed subquery since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_2 for subquery SELECT d2.a, d2.c FROM (recurring_join_pushdown.r1 LEFT JOIN recurring_join_pushdown.d2 ON ((r1.a OPERATOR(pg_catalog.=) d2.a)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((recurring_join_pushdown.r1 LEFT JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) t1 ON ((t1.a OPERATOR(pg_catalog.=) r1.a))) LEFT JOIN (SELECT intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, c text)) t2 ON ((t1.a OPERATOR(pg_catalog.=) t2.a)))
DEBUG: Creating router plan
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Hash Cond: (d1.a = r1.a)
-> Seq Scan on d1_1520001 d1
-> Hash
-> Seq Scan on r1_1520000 r1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
-> Distributed Subplan XXX_2
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Hash Right Join
Hash Cond: (d2.a = r1.a)
-> Seq Scan on d2_1520005 d2
-> Hash
-> Seq Scan on r1_1520000 r1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Merge Left Join
Merge Cond: (r1.a = intermediate_result.a)
-> Sort
Sort Key: r1.a
-> Seq Scan on r1_1520000 r1
-> Materialize
-> Merge Left Join
Merge Cond: (intermediate_result.a = intermediate_result_1.a)
-> Sort
Sort Key: intermediate_result.a
-> Function Scan on read_intermediate_result intermediate_result
-> Sort
Sort Key: intermediate_result_1.a
-> Function Scan on read_intermediate_result intermediate_result_1
(44 rows)
--- As both subqueries are pushed and the top level join is over their results on distribution colums, the query is pushed down as a whole.
SELECT count(*) FROM (SELECT d1_1.a, r1.b FROM r1 LEFT JOIN d1 as d1_1 ON r1.a = d1_1.a) AS t1 LEFT JOIN
(SELECT d2.a, d2.c, r1.b FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
165
(1 row)
SELECT count(*) FROM (SELECT d1_1.a, r1_local.b FROM r1_local LEFT JOIN d1_local as d1_1 ON r1_local.a = d1_1.a) AS t1_local LEFT JOIN
(SELECT d2_local.a, d2_local.c, r1_local.b FROM r1_local LEFT JOIN d2_local ON r1_local.a = d2_local.a) AS t2_local ON t1_local.a = t2_local.a;
count
---------------------------------------------------------------------
165
(1 row)
EXPLAIN (COSTS OFF) SELECT count(*) FROM (SELECT d1_1.a, r1.b FROM r1 LEFT JOIN d1 as d1_1 ON r1.a = d1_1.a) AS t1 LEFT JOIN
(SELECT d2.a, d2.c, r1.b FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d2 found
DEBUG: shard count after pruning for d2: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN
---------------------------------------------------------------------
Aggregate
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
-> Merge Right Join
Merge Cond: (d2.a = d1_1.a)
-> Merge Join
Merge Cond: (r1_1.a = d2.a)
-> Sort
Sort Key: r1_1.a
-> Seq Scan on r1_1520000 r1_1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
-> Sort
Sort Key: d2.a
-> Seq Scan on d2_1520005 d2
-> Sort
Sort Key: d1_1.a
-> Hash Right Join
Hash Cond: (d1_1.a = r1.a)
-> Seq Scan on d1_1520001 d1_1
-> Hash
-> Seq Scan on r1_1520000 r1
Filter: ((a IS NULL) OR ((btint4cmp('-2147483648'::integer, hashint4(a)) < 0) AND (btint4cmp(hashint4(a), '-1073741825'::integer) <= 0)))
(26 rows)

View File

@ -16,10 +16,16 @@ CREATE TABLE d1(a int, b int);
SELECT create_distributed_table('d1', 'a');
INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (4, 10);
CREATE TABLE d2(a int, c text);
SELECT create_distributed_table('d2', 'a');
INSERT INTO d2(a, c) VALUES (1,'a'), (1,'b'), (1,'c'), (2,'d'), (2,'e'), (2,'f'), (4,'g');
--- For testing, remove before merge
CREATE TABLE d1_local(like d1);
INSERT INTO d1_local select * from d1;
CREATE TABLE d2_local(like d2);
INSERT INTO d2_local select * from d2;
SET client_min_messages TO DEBUG3;
@ -44,6 +50,8 @@ SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a;
SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2;
SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_local.b = d1_local.b ORDER BY 1, 2;
-- Verfiy that the join is pushed via the execution plan.
EXPLAIN (COSTS OFF) SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a;
@ -76,6 +84,9 @@ SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local JOIN d1_local as
SELECT count(*) FROM r1 LEFT JOIN (d1 LEFT JOIN d1 as d1_1 USING (a)) USING (a);
SELECT count(*) FROM r1_local LEFT JOIN (d1_local LEFT JOIN d1_local as d1_1 USING (a)) USING (a);
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a);
-- Using 'on' syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) AS d1 ON r1.a = d1.a;
@ -89,3 +100,28 @@ SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local)
SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a as a, d1.b, d1_1.a AS a_1 FROM d1 LEFT JOIN d1 as d1_1 ON d1.a = d1_1.a) AS d1_2 ON r1.a = d1_2.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a as a, d1_local.b, d1_1.a AS a_1 FROM d1_local LEFT JOIN d1_local as d1_1 ON d1_local.a = d1_1.a) AS d1_2 ON r1_local.a = d1_2.a;
-- Nested joins
-- It is safe to push the inner join to compute t1. However, as the var of the inner table for the top level join (t1.a) resolves to r1.a, the outer join cannot be pushed down.
SELECT count(*) FROM r1 LEFT JOIN (SELECT r1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT r1_local.a, d1_local.b FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a) AS t1 ON r1_local.a = t1.a;
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT r1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
-- In the following case, it is safe to push down both joins as t1.a resolves to d1.a.
SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a, d1_local.b FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a) AS t1 ON r1_local.a = t1.a;
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON r1.a = t1.a;
-- In the following case, the lower level joins will be pushed down, but as the top level join is chained, subquery pushdown will not be applied at the top level.
SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON t1.a = r1.a LEFT JOIN (SELECT d2.a, d2.c FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a, d1_local.b FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a) AS t1 ON t1.a = r1_local.a LEFT JOIN (SELECT d2_local.a, d2_local.c FROM r1_local LEFT JOIN d2_local ON r1_local.a = d2_local.a) AS t2 ON t1.a = t2.a;
EXPLAIN (COSTS OFF) SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a, d1.b FROM r1 LEFT JOIN d1 ON r1.a = d1.a) AS t1 ON t1.a = r1.a LEFT JOIN (SELECT d2.a, d2.c FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;
--- As both subqueries are pushed and the top level join is over their results on distribution colums, the query is pushed down as a whole.
SELECT count(*) FROM (SELECT d1_1.a, r1.b FROM r1 LEFT JOIN d1 as d1_1 ON r1.a = d1_1.a) AS t1 LEFT JOIN
(SELECT d2.a, d2.c, r1.b FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;
SELECT count(*) FROM (SELECT d1_1.a, r1_local.b FROM r1_local LEFT JOIN d1_local as d1_1 ON r1_local.a = d1_1.a) AS t1_local LEFT JOIN
(SELECT d2_local.a, d2_local.c, r1_local.b FROM r1_local LEFT JOIN d2_local ON r1_local.a = d2_local.a) AS t2_local ON t1_local.a = t2_local.a;
EXPLAIN (COSTS OFF) SELECT count(*) FROM (SELECT d1_1.a, r1.b FROM r1 LEFT JOIN d1 as d1_1 ON r1.a = d1_1.a) AS t1 LEFT JOIN
(SELECT d2.a, d2.c, r1.b FROM r1 LEFT JOIN d2 ON r1.a = d2.a) AS t2 ON t1.a = t2.a;