diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index f9d409ba8..0f1530fc5 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -257,6 +257,7 @@ class CitusUnusualExecutorConfig(CitusMXBaseClusterConfig): # "citus.force_max_query_parallelization": "on", "citus.binary_worker_copy_format": False, "citus.enable_binary_protocol": False, + "citus.local_table_join_policy": "prefer-distributed", } @@ -290,7 +291,7 @@ class CitusUnusualQuerySettingsConfig(CitusMXBaseClusterConfig): "citus.enable_single_hash_repartition_joins": True, "citus.recover_2pc_interval": "1s", "citus.remote_task_check_interval": "1ms", - "citus.values_materialization_threshold" : "0" + "citus.values_materialization_threshold": "0", } diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule index 40a585a2e..ef218a161 100644 --- a/src/test/regress/create_schedule +++ b/src/test/regress/create_schedule @@ -1,3 +1,4 @@ test: create_test intermediate_result_pruning_create test: prepared_statements_create_load ch_benchmarks_create_load test: dropped_columns_create_load distributed_planning_create_load +test: local_dist_join_load diff --git a/src/test/regress/expected/local_dist_join.out b/src/test/regress/expected/local_dist_join.out new file mode 100644 index 000000000..8339a0d88 --- /dev/null +++ b/src/test/regress/expected/local_dist_join.out @@ -0,0 +1,906 @@ +SET search_path TO local_dist_join_mixed; +-- very simple 1-1 Joins +SELECT count(*) FROM distributed JOIN local USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) FROM distributed JOIN local ON (name = title); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text); + count +--------------------------------------------------------------------- + 10201 +(1 row) + +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); + count +--------------------------------------------------------------------- + 5050 +(1 row) + +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; + count +--------------------------------------------------------------------- + 99 +(1 row) + +SELECT count(*) FROM distributed JOIN local USING (id) WHERE false; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR True; + count +--------------------------------------------------------------------- + 5050 +(1 row) + +SELECT count(*) FROM distributed d1 JOIN local ON (name::int + local.id > d1.id AND d1.id < local.title::int) WHERE d1.id = 1; + count +--------------------------------------------------------------------- + 99 +(1 row) + +SELECT count(*) FROM distributed JOIN local ON (hashtext(name) = hashtext(title)); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT hashtext(local.id::text) FROM distributed JOIN local ON (hashtext(name) = hashtext(title)) ORDER BY 1 LIMIT 4; + hashtext +--------------------------------------------------------------------- + -2114455578 + -2097988278 + -1997006946 + -1985772843 +(4 rows) + +SELECT '' as "xxx", local.*, 'xxx' as "test" FROM distributed JOIN local ON (hashtext(name) = hashtext(title)) ORDER BY 1,2,3 LIMIT 4; + xxx | id | title | test +--------------------------------------------------------------------- + | 0 | 0 | xxx + | 1 | 1 | xxx + | 2 | 2 | xxx + | 3 | 3 | xxx +(4 rows) + +SELECT local.title, count(*) FROM distributed JOIN local USING (id) GROUP BY 1 ORDER BY 1, 2 DESC LIMIT 5; + title | count +--------------------------------------------------------------------- + 0 | 1 + 1 | 1 + 10 | 1 + 100 | 1 + 11 | 1 +(5 rows) + +SELECT distributed.id as id1, local.id as id2 FROM distributed JOIN local USING(id) ORDER BY distributed.id + local.id LIMIT 5; + id1 | id2 +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 +(5 rows) + +SELECT distributed.id as id1, local.id as id2, count(*) FROM distributed JOIN local USING(id) GROUP BY distributed.id, local.id ORDER BY 1,2 LIMIT 5; + id1 | id2 | count +--------------------------------------------------------------------- + 0 | 0 | 1 + 1 | 1 | 1 + 2 | 2 | 1 + 3 | 3 | 1 + 4 | 4 | 1 +(5 rows) + +-- basic subqueries that cannot be pulled up +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = title); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text); + count +--------------------------------------------------------------------- + 10201 +(1 row) + +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); + count +--------------------------------------------------------------------- + 5050 +(1 row) + +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; + count +--------------------------------------------------------------------- + 99 +(1 row) + +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 AND false; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR true; + count +--------------------------------------------------------------------- + 5050 +(1 row) + +-- pull up subqueries as they are pretty simple, local table should be recursively planned +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = title); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text); + count +--------------------------------------------------------------------- + 10201 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); + count +--------------------------------------------------------------------- + 5050 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; + count +--------------------------------------------------------------------- + 99 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 AND false; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR true; + count +--------------------------------------------------------------------- + 5050 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed WHERE id = 2) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM (SELECT * FROM distributed WHERE false) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- final queries are pushdown queries +SELECT sum(d1.id + local.id) FROM distributed d1 JOIN local USING (id); + sum +--------------------------------------------------------------------- + 10100 +(1 row) + +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id) FROM distributed d1 JOIN local USING (id) ORDER BY 1 DESC LIMIT 4; + sum +--------------------------------------------------------------------- + 200 + 198 + 196 + 194 +(4 rows) + +SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) ORDER BY 1 DESC LIMIT 4; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(DISTINCT d1.name::int * local.id) FROM distributed d1 JOIN local USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- final queries are router queries +SELECT sum(d1.id + local.id) FROM distributed d1 JOIN local USING (id) WHERE d1.id = 1; + sum +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id) FROM distributed d1 JOIN local USING (id) WHERE d1.id = 1 ORDER BY 1 DESC LIMIT 4; + sum +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) WHERE d2.id = 1 ORDER BY 1 DESC LIMIT 4; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- final queries are pull to coordinator queries +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id + local.id) FROM distributed d1 JOIN local USING (id) ORDER BY 1 DESC LIMIT 4; + sum +--------------------------------------------------------------------- + 200 + 198 + 196 + 194 +(4 rows) + +-- nested subqueries +SELECT + count(*) +FROM + (SELECT * FROM (SELECT * FROM distributed) as foo) as bar + JOIN + local + USING(id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + local + USING(id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + local + USING(id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- TODO: Unnecessary recursive planning for local +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed LIMIT 1) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id); + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- subqueries in WHERE clause +-- is not colocated, and the JOIN inside as well. +-- so should be recursively planned twice +SELECT + count(*) +FROM + distributed +WHERE + id > (SELECT + count(*) + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id) + ); + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- two distributed tables are co-located and JOINed on distribution +-- key, so should be fine to pushdown +SELECT + count(*) +FROM + distributed d_upper +WHERE + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id) + ) IS NOT NULL; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT + count(*) +FROM + distributed d_upper +WHERE + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + local as foo + USING(id) + ) IS NOT NULL; + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT + count(*) +FROM + distributed d_upper +WHERE d_upper.id > + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + local as foo + USING(id) + ); + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- subqueries in the target list +-- router, should work +select (SELECT local.id) FROM local, distributed WHERE distributed.id = 1 LIMIT 1; + id +--------------------------------------------------------------------- + 0 +(1 row) + +-- set operations +SELECT local.* FROM distributed JOIN local USING (id) + EXCEPT +SELECT local.* FROM distributed JOIN local USING (id); + id | title +--------------------------------------------------------------------- +(0 rows) + +SELECT distributed.* FROM distributed JOIN local USING (id) + EXCEPT +SELECT distributed.* FROM distributed JOIN local USING (id); + id | name | created_at +--------------------------------------------------------------------- +(0 rows) + +SELECT count(*) FROM +( + (SELECT * FROM (SELECT * FROM local) as f JOIN distributed USING (id)) + UNION ALL + (SELECT * FROM (SELECT * FROM local) as f2 JOIN distributed USING (id)) +) bar; + count +--------------------------------------------------------------------- + 202 +(1 row) + +SELECT count(*) FROM +( + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo) + UNION ALL + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba) +) bar; + count +--------------------------------------------------------------------- + 202 +(1 row) + +select count(DISTINCT id) +FROM +( + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo) + UNION ALL + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba) +) bar; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- lateral joins +SELECT COUNT(*) FROM local JOIN LATERAL (SELECT * FROM distributed WHERE local.id = distributed.id) as foo ON (true); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT COUNT(*) FROM local JOIN LATERAL (SELECT * FROM distributed WHERE local.id > distributed.id) as foo ON (true); + count +--------------------------------------------------------------------- + 5050 +(1 row) + +SELECT count(*) FROM distributed CROSS JOIN local; + count +--------------------------------------------------------------------- + 10201 +(1 row) + +SELECT count(*) FROM distributed CROSS JOIN local WHERE distributed.id = 1; + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- w count(*) it works fine as PG ignores the inner tables +SELECT count(*) FROM distributed LEFT JOIN local USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT count(*) FROM local LEFT JOIN distributed USING (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +SELECT id, name FROM distributed LEFT JOIN local USING (id) ORDER BY 1 LIMIT 1; + id | name +--------------------------------------------------------------------- + 0 | 0 +(1 row) + + SELECT + foo1.id + FROM + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo9, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo8, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo7, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo6, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo5, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo4, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo3, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo2, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo10, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo1 + WHERE + foo1.id = foo9.id AND + foo1.id = foo8.id AND + foo1.id = foo7.id AND + foo1.id = foo6.id AND + foo1.id = foo5.id AND + foo1.id = foo4.id AND + foo1.id = foo3.id AND + foo1.id = foo2.id AND + foo1.id = foo10.id AND + foo1.id = foo1.id +ORDER BY 1; + id +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 + 31 + 32 + 33 + 34 + 35 + 36 + 37 + 38 + 39 + 40 + 41 + 42 + 43 + 44 + 45 + 46 + 47 + 48 + 49 + 50 + 51 + 52 + 53 + 54 + 55 + 56 + 57 + 58 + 59 + 60 + 61 + 62 + 63 + 64 + 65 + 66 + 67 + 68 + 69 + 70 + 71 + 72 + 73 + 74 + 75 + 76 + 77 + 78 + 79 + 80 + 81 + 82 + 83 + 84 + 85 + 86 + 87 + 88 + 89 + 90 + 91 + 92 + 93 + 94 + 95 + 96 + 97 + 98 + 99 + 100 +(101 rows) + +SELECT + foo1.id +FROM + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo1, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo2, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo3, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo4, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo5 +WHERE + foo1.id = foo4.id AND + foo1.id = foo2.id AND + foo1.id = foo3.id AND + foo1.id = foo4.id AND + foo1.id = foo5.id +ORDER BY 1; + id +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 + 31 + 32 + 33 + 34 + 35 + 36 + 37 + 38 + 39 + 40 + 41 + 42 + 43 + 44 + 45 + 46 + 47 + 48 + 49 + 50 + 51 + 52 + 53 + 54 + 55 + 56 + 57 + 58 + 59 + 60 + 61 + 62 + 63 + 64 + 65 + 66 + 67 + 68 + 69 + 70 + 71 + 72 + 73 + 74 + 75 + 76 + 77 + 78 + 79 + 80 + 81 + 82 + 83 + 84 + 85 + 86 + 87 + 88 + 89 + 90 + 91 + 92 + 93 + 94 + 95 + 96 + 97 + 98 + 99 + 100 +(101 rows) + +SELECT + foo1.id +FROM + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 1) as foo1, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 2) as foo2, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 3) as foo3, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 4) as foo4, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 5) as foo5 +WHERE + foo1.id = foo4.id AND + foo1.id = foo2.id AND + foo1.id = foo3.id AND + foo1.id = foo4.id AND + foo1.id = foo5.id +ORDER BY 1; + id +--------------------------------------------------------------------- +(0 rows) + +--https://github.com/citusdata/citus/issues/5384 +-- SELECT +-- count(*) +-- FROM +-- distributed +-- JOIN LATERAL +-- (SELECT +-- * +-- FROM +-- local +-- JOIN +-- distributed d2 +-- ON(true) +-- WHERE local.id = distributed.id AND d2.id = local.id) as foo +-- ON (true); +SELECT local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2 LIMIt 1; + title | title +--------------------------------------------------------------------- + 0 | 0 +(1 row) + +SELECT NULL FROM local JOIN distributed USING(id) ORDER BY 1 LIMIt 1; + ?column? +--------------------------------------------------------------------- + +(1 row) + +SELECT distributed.name, distributed.name, local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2,3,4 LIMIT 1; + name | name | title | title +--------------------------------------------------------------------- + 0 | 0 | 0 | 0 +(1 row) + +SELECT + COUNT(*) +FROM + local +JOIN + distributed +USING + (id) +JOIN + (SELECT id, NULL, NULL FROM distributed) foo +USING + (id); + count +--------------------------------------------------------------------- + 101 +(1 row) + +BEGIN; +SELECT COUNT(DISTINCT title) FROM local; + count +--------------------------------------------------------------------- + 101 +(1 row) + +UPDATE + local +SET + title = 'test' +FROM + distributed +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT title) FROM local; + count +--------------------------------------------------------------------- + 1 +(1 row) + +ROLLBACK; +BEGIN; +SELECT COUNT(DISTINCT name) FROM distributed; + count +--------------------------------------------------------------------- + 101 +(1 row) + +UPDATE + distributed +SET + name = 'test' +FROM + local +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT name) FROM distributed; + count +--------------------------------------------------------------------- + 1 +(1 row) + +ROLLBACK; +BEGIN; +SELECT COUNT(DISTINCT name) FROM distributed; + count +--------------------------------------------------------------------- + 101 +(1 row) + +UPDATE + distributed +SET + name = 'test' +FROM + local +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT name) FROM distributed; + count +--------------------------------------------------------------------- + 1 +(1 row) + +ROLLBACK; +BEGIN; +SELECT COUNT(DISTINCT name) FROM distributed; + count +--------------------------------------------------------------------- + 101 +(1 row) + +UPDATE + distributed +SET + name = 'test' +FROM + local +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT name) FROM distributed; + count +--------------------------------------------------------------------- + 1 +(1 row) + +ROLLBACK; diff --git a/src/test/regress/expected/local_dist_join_load.out b/src/test/regress/expected/local_dist_join_load.out new file mode 100644 index 000000000..ce68edc8d --- /dev/null +++ b/src/test/regress/expected/local_dist_join_load.out @@ -0,0 +1,39 @@ +CREATE SCHEMA local_dist_join_mixed; +SET search_path TO local_dist_join_mixed; +CREATE TABLE distributed (key int, id bigserial PRIMARY KEY, + name text, + created_at timestamptz DEFAULT now(), b int); +CREATE TABLE reference (a int, id bigserial PRIMARY KEY, + title text, b int); +CREATE TABLE local (key int, id bigserial PRIMARY KEY, key2 int, + title text, key3 int); +-- drop columns so that we test the correctness in different scenarios. +ALTER TABLE local DROP column key; +ALTER TABLE local DROP column key2; +ALTER TABLE local DROP column key3; +ALTER TABLE distributed DROP column key; +ALTER TABLE reference DROP column b; +-- these above restrictions brought us to the following schema +SELECT create_reference_table('reference'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('distributed', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('local'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE distributed DROP column b; +ALTER TABLE reference DROP column a; +INSERT INTO distributed SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO reference SELECT i, i::text FROM generate_series(0,100)i; +INSERT INTO local SELECT i, i::text FROM generate_series(0,100)i; diff --git a/src/test/regress/sql/local_dist_join.sql b/src/test/regress/sql/local_dist_join.sql new file mode 100644 index 000000000..d93662ea5 --- /dev/null +++ b/src/test/regress/sql/local_dist_join.sql @@ -0,0 +1,355 @@ +SET search_path TO local_dist_join_mixed; + +-- very simple 1-1 Joins +SELECT count(*) FROM distributed JOIN local USING (id); +SELECT count(*) FROM distributed JOIN local ON (name = title); +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text); +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM distributed JOIN local USING (id) WHERE false; +SELECT count(*) FROM distributed d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR True; +SELECT count(*) FROM distributed d1 JOIN local ON (name::int + local.id > d1.id AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM distributed JOIN local ON (hashtext(name) = hashtext(title)); +SELECT hashtext(local.id::text) FROM distributed JOIN local ON (hashtext(name) = hashtext(title)) ORDER BY 1 LIMIT 4; +SELECT '' as "xxx", local.*, 'xxx' as "test" FROM distributed JOIN local ON (hashtext(name) = hashtext(title)) ORDER BY 1,2,3 LIMIT 4; +SELECT local.title, count(*) FROM distributed JOIN local USING (id) GROUP BY 1 ORDER BY 1, 2 DESC LIMIT 5; +SELECT distributed.id as id1, local.id as id2 FROM distributed JOIN local USING(id) ORDER BY distributed.id + local.id LIMIT 5; +SELECT distributed.id as id1, local.id as id2, count(*) FROM distributed JOIN local USING(id) GROUP BY distributed.id, local.id ORDER BY 1,2 LIMIT 5; + + +-- basic subqueries that cannot be pulled up +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local USING (id); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = title); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 AND false; +SELECT count(*) FROM (SELECT *, random() FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR true; + +-- pull up subqueries as they are pretty simple, local table should be recursively planned +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local USING (id); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = title); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int); +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 AND false; +SELECT count(*) FROM (SELECT * FROM distributed) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1 OR true; +SELECT count(*) FROM (SELECT * FROM distributed WHERE id = 2) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; +SELECT count(*) FROM (SELECT * FROM distributed WHERE false) as d1 JOIN local ON (name = d1.id::text AND d1.id < local.title::int) WHERE d1.id = 1; + +-- final queries are pushdown queries +SELECT sum(d1.id + local.id) FROM distributed d1 JOIN local USING (id); +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id) FROM distributed d1 JOIN local USING (id) ORDER BY 1 DESC LIMIT 4; +SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) ORDER BY 1 DESC LIMIT 4; +SELECT count(DISTINCT d1.name::int * local.id) FROM distributed d1 JOIN local USING (id); + +-- final queries are router queries +SELECT sum(d1.id + local.id) FROM distributed d1 JOIN local USING (id) WHERE d1.id = 1; +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id) FROM distributed d1 JOIN local USING (id) WHERE d1.id = 1 ORDER BY 1 DESC LIMIT 4; +SELECT count(*) FROM distributed d1 JOIN local USING (id) LEFT JOIN distributed d2 USING (id) WHERE d2.id = 1 ORDER BY 1 DESC LIMIT 4; + +-- final queries are pull to coordinator queries +SELECT sum(d1.id + local.id) OVER (PARTITION BY d1.id + local.id) FROM distributed d1 JOIN local USING (id) ORDER BY 1 DESC LIMIT 4; + + + +-- nested subqueries +SELECT + count(*) +FROM + (SELECT * FROM (SELECT * FROM distributed) as foo) as bar + JOIN + local + USING(id); + + +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + local + USING(id); + +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + local + USING(id); +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id); + +-- TODO: Unnecessary recursive planning for local +SELECT + count(*) +FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed LIMIT 1) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id); + +-- subqueries in WHERE clause +-- is not colocated, and the JOIN inside as well. +-- so should be recursively planned twice +SELECT + count(*) +FROM + distributed +WHERE + id > (SELECT + count(*) + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id) + ); + +-- two distributed tables are co-located and JOINed on distribution +-- key, so should be fine to pushdown +SELECT + count(*) +FROM + distributed d_upper +WHERE + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + (SELECT *, random() FROM (SELECT *,random() FROM local) as foo2) as bar2 + USING(id) + ) IS NOT NULL; + +SELECT + count(*) +FROM + distributed d_upper +WHERE + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + local as foo + USING(id) + ) IS NOT NULL; + +SELECT + count(*) +FROM + distributed d_upper +WHERE d_upper.id > + (SELECT + bar.id + FROM + (SELECT *, random() FROM (SELECT *, random() FROM distributed WHERE distributed.id = d_upper.id) as foo) as bar + JOIN + local as foo + USING(id) + ); + + + +-- subqueries in the target list + +-- router, should work +select (SELECT local.id) FROM local, distributed WHERE distributed.id = 1 LIMIT 1; + +-- set operations + +SELECT local.* FROM distributed JOIN local USING (id) + EXCEPT +SELECT local.* FROM distributed JOIN local USING (id); + +SELECT distributed.* FROM distributed JOIN local USING (id) + EXCEPT +SELECT distributed.* FROM distributed JOIN local USING (id); + + +SELECT count(*) FROM +( + (SELECT * FROM (SELECT * FROM local) as f JOIN distributed USING (id)) + UNION ALL + (SELECT * FROM (SELECT * FROM local) as f2 JOIN distributed USING (id)) +) bar; + +SELECT count(*) FROM +( + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo) + UNION ALL + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba) +) bar; + +select count(DISTINCT id) +FROM +( + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as fo) + UNION ALL + (SELECT * FROM (SELECT distributed.* FROM local JOIN distributed USING (id)) as ba) +) bar; + +-- lateral joins + +SELECT COUNT(*) FROM local JOIN LATERAL (SELECT * FROM distributed WHERE local.id = distributed.id) as foo ON (true); +SELECT COUNT(*) FROM local JOIN LATERAL (SELECT * FROM distributed WHERE local.id > distributed.id) as foo ON (true); + + + +SELECT count(*) FROM distributed CROSS JOIN local; +SELECT count(*) FROM distributed CROSS JOIN local WHERE distributed.id = 1; + +-- w count(*) it works fine as PG ignores the inner tables +SELECT count(*) FROM distributed LEFT JOIN local USING (id); +SELECT count(*) FROM local LEFT JOIN distributed USING (id); + +SELECT id, name FROM distributed LEFT JOIN local USING (id) ORDER BY 1 LIMIT 1; + + SELECT + foo1.id + FROM + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo9, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo8, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo7, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo6, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo5, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo4, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo3, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo2, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo10, + (SELECT local.id, local.title FROM local, distributed WHERE local.id = distributed.id ) as foo1 + WHERE + foo1.id = foo9.id AND + foo1.id = foo8.id AND + foo1.id = foo7.id AND + foo1.id = foo6.id AND + foo1.id = foo5.id AND + foo1.id = foo4.id AND + foo1.id = foo3.id AND + foo1.id = foo2.id AND + foo1.id = foo10.id AND + foo1.id = foo1.id +ORDER BY 1; + +SELECT + foo1.id +FROM + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo1, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo2, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo3, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo4, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id ) as foo5 +WHERE + foo1.id = foo4.id AND + foo1.id = foo2.id AND + foo1.id = foo3.id AND + foo1.id = foo4.id AND + foo1.id = foo5.id +ORDER BY 1; + +SELECT + foo1.id +FROM + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 1) as foo1, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 2) as foo2, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 3) as foo3, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 4) as foo4, + (SELECT local.id FROM distributed, local WHERE local.id = distributed.id AND distributed.id = 5) as foo5 +WHERE + foo1.id = foo4.id AND + foo1.id = foo2.id AND + foo1.id = foo3.id AND + foo1.id = foo4.id AND + foo1.id = foo5.id +ORDER BY 1; + +--https://github.com/citusdata/citus/issues/5384 +-- SELECT +-- count(*) +-- FROM +-- distributed +-- JOIN LATERAL +-- (SELECT +-- * +-- FROM +-- local +-- JOIN +-- distributed d2 +-- ON(true) +-- WHERE local.id = distributed.id AND d2.id = local.id) as foo +-- ON (true); + +SELECT local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2 LIMIt 1; +SELECT NULL FROM local JOIN distributed USING(id) ORDER BY 1 LIMIt 1; +SELECT distributed.name, distributed.name, local.title, local.title FROM local JOIN distributed USING(id) ORDER BY 1,2,3,4 LIMIT 1; +SELECT + COUNT(*) +FROM + local +JOIN + distributed +USING + (id) +JOIN + (SELECT id, NULL, NULL FROM distributed) foo +USING + (id); + +BEGIN; +SELECT COUNT(DISTINCT title) FROM local; +UPDATE + local +SET + title = 'test' +FROM + distributed +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT title) FROM local; +ROLLBACK; + +BEGIN; +SELECT COUNT(DISTINCT name) FROM distributed; +UPDATE + distributed +SET + name = 'test' +FROM + local +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT name) FROM distributed; +ROLLBACK; + +BEGIN; +SELECT COUNT(DISTINCT name) FROM distributed; +UPDATE + distributed +SET + name = 'test' +FROM + local +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT name) FROM distributed; +ROLLBACK; + +BEGIN; +SELECT COUNT(DISTINCT name) FROM distributed; +UPDATE + distributed +SET + name = 'test' +FROM + local +WHERE + distributed.id = local.id; +SELECT COUNT(DISTINCT name) FROM distributed; +ROLLBACK; diff --git a/src/test/regress/sql/local_dist_join_load.sql b/src/test/regress/sql/local_dist_join_load.sql new file mode 100644 index 000000000..53cc668f8 --- /dev/null +++ b/src/test/regress/sql/local_dist_join_load.sql @@ -0,0 +1,33 @@ +CREATE SCHEMA local_dist_join_mixed; +SET search_path TO local_dist_join_mixed; + + + +CREATE TABLE distributed (key int, id bigserial PRIMARY KEY, + name text, + created_at timestamptz DEFAULT now(), b int); +CREATE TABLE reference (a int, id bigserial PRIMARY KEY, + title text, b int); + +CREATE TABLE local (key int, id bigserial PRIMARY KEY, key2 int, + title text, key3 int); + +-- drop columns so that we test the correctness in different scenarios. +ALTER TABLE local DROP column key; +ALTER TABLE local DROP column key2; +ALTER TABLE local DROP column key3; + +ALTER TABLE distributed DROP column key; +ALTER TABLE reference DROP column b; + +-- these above restrictions brought us to the following schema +SELECT create_reference_table('reference'); +SELECT create_distributed_table('distributed', 'id'); +SELECT citus_add_local_table_to_metadata('local'); + +ALTER TABLE distributed DROP column b; +ALTER TABLE reference DROP column a; + +INSERT INTO distributed SELECT i, i::text, now() FROM generate_series(0,100)i; +INSERT INTO reference SELECT i, i::text FROM generate_series(0,100)i; +INSERT INTO local SELECT i, i::text FROM generate_series(0,100)i; diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule index 7f29ea93b..ebc6ed11b 100644 --- a/src/test/regress/sql_schedule +++ b/src/test/regress/sql_schedule @@ -5,3 +5,4 @@ test: ch_benchmarks_1 ch_benchmarks_2 ch_benchmarks_3 test: ch_benchmarks_4 ch_benchmarks_5 ch_benchmarks_6 test: intermediate_result_pruning_queries_1 intermediate_result_pruning_queries_2 test: dropped_columns_1 distributed_planning +test: local_dist_join