CREATE SCHEMA adaptive_executor; SET search_path TO adaptive_executor; SET citus.task_executor_type to 'adaptive'; SET citus.shard_replication_factor to 1; SET citus.enable_repartition_joins TO true; CREATE TABLE ab(a int, b int); SELECT create_distributed_table('ab', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO ab SELECT *,* FROM generate_series(1,10); SELECT COUNT(*) FROM ab k, ab l WHERE k.a = l.b; count --------------------------------------------------------------------- 10 (1 row) SELECT COUNT(*) FROM ab k, ab l, ab m, ab t WHERE k.a = l.b AND k.a = m.b AND t.b = l.a; count --------------------------------------------------------------------- 10 (1 row) SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; count --------------------------------------------------------------------- 10 (1 row) BEGIN; SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; count --------------------------------------------------------------------- 10 (1 row) SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; count --------------------------------------------------------------------- 10 (1 row) SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; count --------------------------------------------------------------------- 10 (1 row) ROLLBACK; BEGIN; INSERT INTO ab values(1, 2); -- DDL happened before repartition query in a transaction block, so this should error. SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; SET citus.enable_single_hash_repartition_joins TO ON; CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); CREATE TABLE single_hash_repartition_second (id int, sum int, avg float); CREATE TABLE ref_table (id int, sum int, avg float); SELECT create_distributed_table('single_hash_repartition_first', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('single_hash_repartition_second', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('ref_table'); create_reference_table --------------------------------------------------------------------- (1 row) -- single hash repartition after bcast joins EXPLAIN SELECT count(*) FROM ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2 WHERE r1.id = t1.id AND t2.sum = t1.id; QUERY PLAN --------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 4 Merge Task Count: 4 (7 rows) -- a more complicated join order, first colocated join, later single hash repartition join EXPLAIN SELECT count(*) FROM single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 WHERE t1.id = t2.id AND t1.sum = t3.id; QUERY PLAN --------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 4 Merge Task Count: 4 (7 rows) SET citus.enable_single_hash_repartition_joins TO OFF; DROP SCHEMA adaptive_executor CASCADE; NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table ab drop cascades to table single_hash_repartition_first drop cascades to table single_hash_repartition_second drop cascades to table ref_table