CREATE SCHEMA recurring_join_pushdown; SET search_path TO recurring_join_pushdown; SET citus.next_shard_id TO 1520000; SET citus.shard_count TO 4; CREATE TABLE r1(a int, b int); SELECT create_reference_table('r1'); create_reference_table --------------------------------------------------------------------- (1 row) INSERT INTO r1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (3, 20), (10, 1), (10, 2); --- For testing, remove before merge CREATE TABLE r1_local(like r1); INSERT INTO r1_local select * from r1; CREATE TABLE d1(a int, b int); SELECT create_distributed_table('d1', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (4, 10); --- For testing, remove before merge CREATE TABLE d1_local(like d1); INSERT INTO d1_local select * from d1; SET client_min_messages TO DEBUG3; -- Basic test cases -- Test that the join is pushed down to the worker nodes, using "using" syntax SELECT count(*) FROM r1 LEFT JOIN d1 using (a); DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 21 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local using (a); count --------------------------------------------------------------------- 21 (1 row) SELECT * FROM r1 LEFT JOIN d1 using (a, b) ORDER BY 1, 2; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx a | b --------------------------------------------------------------------- 1 | 10 1 | 11 1 | 20 2 | 10 2 | 12 2 | 20 3 | 20 10 | 1 10 | 2 (9 rows) SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2; a | b --------------------------------------------------------------------- 1 | 10 1 | 11 1 | 20 2 | 10 2 | 12 2 | 20 3 | 20 10 | 1 10 | 2 (9 rows) SET client_min_messages TO DEBUG1; -- Test that the join is not pushed down when joined on a non-distributed column SELECT count(*) FROM r1 LEFT JOIN d1 USING (b); DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "d1" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 USING (b)) count --------------------------------------------------------------------- 16 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b); count --------------------------------------------------------------------- 16 (1 row) -- Basic test cases with ON syntax -- Test that the join is pushed down to the worker nodes, using "on" syntax SET client_min_messages TO DEBUG3; SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 21 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a; count --------------------------------------------------------------------- 21 (1 row) SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx a | b | a | b --------------------------------------------------------------------- 1 | 10 | 1 | 10 1 | 11 | 1 | 11 1 | 20 | 1 | 20 2 | 10 | 2 | 10 2 | 12 | 2 | 12 2 | 20 | 2 | 20 3 | 20 | | 10 | 1 | | 10 | 2 | | (9 rows) SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_local.b = d1_local.b ORDER BY 1, 2; a | b | a | b --------------------------------------------------------------------- 1 | 10 | 1 | 10 1 | 11 | 1 | 11 1 | 20 | 1 | 20 2 | 10 | 2 | 10 2 | 12 | 2 | 12 2 | 20 | 2 | 20 3 | 20 | | 10 | 1 | | 10 | 2 | | (9 rows) SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 13 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a; count --------------------------------------------------------------------- 13 (1 row) -- Test that the join is not pushed down when joined on a non-distributed column SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.b; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "d1" to a subquery DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 ON ((r1.b OPERATOR(pg_catalog.=) d1.b))) DEBUG: Creating router plan count --------------------------------------------------------------------- 16 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b; count --------------------------------------------------------------------- 16 (1 row) SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "d1" to a subquery DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 ON ((r1.a OPERATOR(pg_catalog.=) d1.b))) DEBUG: Creating router plan count --------------------------------------------------------------------- 13 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; count --------------------------------------------------------------------- 13 (1 row) SET client_min_messages TO DEBUG1; -- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b; DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "d1" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_join_pushdown.d1 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, d1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) d1_1) d1 ON (((r1.a OPERATOR(pg_catalog.=) d1.a) OR (r1.b OPERATOR(pg_catalog.=) d1.b)))) count --------------------------------------------------------------------- 28 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; count --------------------------------------------------------------------- 28 (1 row) -- Test join pushdown behavior when the inner part of the join is a subquery -- Using 'using' syntax SET client_min_messages TO DEBUG3; SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a); DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 21 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) USING (a); count --------------------------------------------------------------------- 21 (1 row) SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) USING (a); DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) USING (a); count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) WHERE a > 1) USING (a); DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) WHERE a > 1) USING (a); count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 JOIN d1 as d1_1 USING (a)) USING (a); DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 57 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local JOIN d1_local as d1_1 USING (a)) USING (a); count --------------------------------------------------------------------- 57 (1 row) SELECT count(*) FROM r1 LEFT JOIN (d1 LEFT JOIN d1 as d1_1 USING (a)) USING (a); DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "d1" to a subquery DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: generating subplan XXX_1 for subquery SELECT a FROM recurring_join_pushdown.d1 WHERE true DEBUG: recursively planning distributed relation "d1" "d1_1" since it is part of a distributed join node that is outer joined with a recurring rel DEBUG: Wrapping relation "d1" "d1_1" to a subquery DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: generating subplan XXX_2 for subquery SELECT a FROM recurring_join_pushdown.d1 d1_1 WHERE true DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN ((SELECT d1_2.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_2) d1 LEFT JOIN (SELECT d1_1_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_1_1) d1_1 USING (a)) USING (a)) DEBUG: Creating router plan count --------------------------------------------------------------------- 57 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (d1_local LEFT JOIN d1_local as d1_1 USING (a)) USING (a); count --------------------------------------------------------------------- 57 (1 row) -- Using 'on' syntax SET client_min_messages TO DEBUG3; SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) AS d1 ON r1.a = d1.a; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 21 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) AS d1_local ON r1_local.a = d1_local.a; count --------------------------------------------------------------------- 21 (1 row) SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) AS d1 ON r1.a = d1.a; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a; count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) AS d1 WHERE a > 1) AS d1 ON r1.a = d1.a; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) AS d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a; count --------------------------------------------------------------------- 15 (1 row) SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a as a, d1.b, d1_1.a AS a_1 FROM d1 LEFT JOIN d1 as d1_1 ON d1.a = d1_1.a) AS d1_2 ON r1.a = d1_2.a; DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: a push down safe left join with recurring left side DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: no shard pruning constraints on d1 found DEBUG: shard count after pruning for d1: 4 DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx count --------------------------------------------------------------------- 57 (1 row) SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a as a, d1_local.b, d1_1.a AS a_1 FROM d1_local LEFT JOIN d1_local as d1_1 ON d1_local.a = d1_1.a) AS d1_2 ON r1_local.a = d1_2.a; count --------------------------------------------------------------------- 57 (1 row)