diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index 2e825ccb4..47f75da80 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -1965,6 +1965,15 @@ IsLocalReferenceTableJoin(Query *parse, List *rangeTableList)
 			continue;
 		}
 
+		/*
+		 * We only allow local joins for the relation kinds for which we can
+		 * determine deterministically whether access to them is local or
+		 * distributed. For this reason, we don't allow non-materialized views.
+		 */
+		if (rangeTableEntry->relkind == RELKIND_VIEW)
+		{
+			return false;
+		}
 
 		if (!IsDistributedTable(rangeTableEntry->relid))
 		{
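Note: a minimal sketch of the case this RELKIND_VIEW check guards against, assuming a Citus cluster; the table and view names are illustrative and not part of the patch. A plain view can expand to a distributed table, so the planner cannot tell from the range table entry alone whether reading it is a purely local operation:

    -- reference table, replicated to the coordinator
    CREATE TABLE ref (a int);
    SELECT create_reference_table('ref');

    -- hash-distributed table whose shards live on the workers
    CREATE TABLE dist (a int);
    SELECT create_distributed_table('dist', 'a');

    -- a local-looking name that actually hides the distributed table
    CREATE VIEW dist_v AS SELECT * FROM dist;

    -- this join must go through the distributed planner; treating the view
    -- as a local relation would read shards that are not on the coordinator
    SELECT * FROM ref JOIN dist_v USING (a);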
diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out
index 33651f00a..b1a5f5637 100644
--- a/src/test/regress/expected/multi_test_helpers.out
+++ b/src/test/regress/expected/multi_test_helpers.out
@@ -1,13 +1,13 @@
--- File to create functions and helpers needed for subsequent tests
+-- File to CREATE FUNCTIONs and helpers needed for subsequent tests
 -- create a helper function to create objects on each node
-CREATE FUNCTION run_command_on_master_and_workers(p_sql text)
+CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
 RETURNS void LANGUAGE plpgsql AS $$
 BEGIN
     EXECUTE p_sql;
     PERFORM run_command_on_workers(p_sql);
 END;$$;
 -- Create a function to make sure that queries returning the same result
-CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
+CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
 BEGIN
 	EXECUTE query;
 EXCEPTION WHEN OTHERS THEN
@@ -29,8 +29,22 @@ BEGIN
   END LOOP;
   RETURN;
 END; $$ language plpgsql;
+-- Returns true if the plan produced by the given EXPLAIN command is a distributed plan
+CREATE OR REPLACE FUNCTION plan_is_distributed(explain_command text)
+RETURNS BOOLEAN AS $$
+DECLARE
+    query_plan TEXT;
+BEGIN
+    FOR query_plan IN EXECUTE explain_command LOOP
+        IF query_plan LIKE '%Task Count:%'
+        THEN
+            RETURN TRUE;
+        END IF;
+    END LOOP;
+    RETURN FALSE;
+END; $$ language plpgsql;
 -- helper function to quickly run SQL on the whole cluster
-CREATE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
+CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
 RETURNS void LANGUAGE plpgsql AS $$
 BEGIN
     EXECUTE p_sql;
@@ -38,7 +52,7 @@ BEGIN
 END;$$;
 -- 1. Marks the given procedure as colocated with the given table.
 -- 2. Marks the argument index with which we route the procedure.
-CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
+CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
 RETURNS void LANGUAGE plpgsql AS $$
 BEGIN
     update citus.pg_dist_object
@@ -66,7 +80,7 @@ BEGIN
 	RETURN true;
 END;
 $func$;
-CREATE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
+CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
 RETURNS void
 LANGUAGE C STRICT
 AS 'citus';
@@ -80,7 +94,7 @@ SELECT pg_reload_conf();
 (1 row)
 
 -- Verifies pg_dist_node and pg_dist_palcement in the given worker matches the ones in coordinator
-CREATE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
+CREATE OR REPLACE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
 RETURNS BOOLEAN
 LANGUAGE sql
 AS $$
diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out
index 9ee24b477..287475d58 100644
--- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out
+++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out
@@ -244,6 +244,51 @@ HINT:  Consider using an equality filter on the distributed table's partition co
 SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
 ERROR:  could not run distributed query with FOR UPDATE/SHARE commands
 HINT:  Consider using an equality filter on the distributed table's partition column.
+--
+-- Joins between reference tables and views shouldn't be planned locally.
+--
+CREATE VIEW numbers_v AS SELECT * FROM numbers WHERE a=1;
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS FALSE)
+  SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
+$Q$);
+       coordinator_plan
+------------------------------
+ Custom Scan (Citus Adaptive)
+   Task Count: 1
+(2 rows)
+
+CREATE VIEW local_table_v AS SELECT * FROM local_table WHERE a BETWEEN 1 AND 10;
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS FALSE)
+  SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a;
+$Q$);
+                coordinator_plan
+------------------------------------------------
+ Custom Scan (Citus Adaptive)
+   ->  Distributed Subplan 24_1
+         ->  Seq Scan on local_table
+               Filter: ((a >= 1) AND (a <= 10))
+   Task Count: 1
+(5 rows)
+
+DROP VIEW numbers_v, local_table_v;
+--
+-- Joins between reference tables and materialized views are allowed to
+-- be planned locally
+--
+CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
+LOG:  executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10))
+REFRESH MATERIALIZED VIEW numbers_v;
+SELECT public.plan_is_distributed($Q$
+EXPLAIN (COSTS FALSE)
+  SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
+$Q$);
+ plan_is_distributed
+---------------------
+ f
+(1 row)
+
 -- verify that we can drop columns from reference tables replicated to the coordinator
 -- see https://github.com/citusdata/citus/issues/3279
 ALTER TABLE squares DROP COLUMN b;
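Note: the plan_is_distributed() helper above works by scanning each line of the EXPLAIN output for 'Task Count:', which Citus prints for distributed (Custom Scan) plans and which plain PostgreSQL plans never contain. A hypothetical call against a regular local table (not part of this patch) would therefore return false:

    CREATE TABLE plain_local (a int);  -- illustrative local table
    SELECT public.plan_is_distributed($Q$
    EXPLAIN (COSTS FALSE) SELECT * FROM plain_local;
    $Q$);
    -- returns f: a local Seq Scan plan has no 'Task Count:' line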
diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql
index 160bcb7f0..d3cb7b07e 100644
--- a/src/test/regress/sql/multi_test_helpers.sql
+++ b/src/test/regress/sql/multi_test_helpers.sql
@@ -1,7 +1,7 @@
--- File to create functions and helpers needed for subsequent tests
+-- File to CREATE FUNCTIONs and helpers needed for subsequent tests
 -- create a helper function to create objects on each node
-CREATE FUNCTION run_command_on_master_and_workers(p_sql text)
+CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
 RETURNS void LANGUAGE plpgsql AS $$
 BEGIN
     EXECUTE p_sql;
@@ -9,7 +9,7 @@ BEGIN
 END;$$;
 -- Create a function to make sure that queries returning the same result
-CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
+CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
 BEGIN
 	EXECUTE query;
 EXCEPTION WHEN OTHERS THEN
@@ -33,8 +33,23 @@ BEGIN
   RETURN;
 END; $$ language plpgsql;
 
+-- Returns true if the plan produced by the given EXPLAIN command is a distributed plan
+CREATE OR REPLACE FUNCTION plan_is_distributed(explain_command text)
+RETURNS BOOLEAN AS $$
+DECLARE
+    query_plan TEXT;
+BEGIN
+    FOR query_plan IN EXECUTE explain_command LOOP
+        IF query_plan LIKE '%Task Count:%'
+        THEN
+            RETURN TRUE;
+        END IF;
+    END LOOP;
+    RETURN FALSE;
+END; $$ language plpgsql;
+
 -- helper function to quickly run SQL on the whole cluster
-CREATE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
+CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
 RETURNS void LANGUAGE plpgsql AS $$
 BEGIN
     EXECUTE p_sql;
@@ -43,7 +58,7 @@ END;$$;
 -- 1. Marks the given procedure as colocated with the given table.
 -- 2. Marks the argument index with which we route the procedure.
-CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
+CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
 RETURNS void LANGUAGE plpgsql AS $$
 BEGIN
     update citus.pg_dist_object
@@ -73,7 +88,7 @@ BEGIN
 END;
 $func$;
 
-CREATE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
+CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
 RETURNS void
 LANGUAGE C STRICT
 AS 'citus';
@@ -84,7 +99,7 @@ ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
 SELECT pg_reload_conf();
 
 -- Verifies pg_dist_node and pg_dist_palcement in the given worker matches the ones in coordinator
-CREATE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
+CREATE OR REPLACE FUNCTION verify_metadata(hostname TEXT, port INTEGER, master_port INTEGER DEFAULT 57636)
 RETURNS BOOLEAN
 LANGUAGE sql
 AS $$
diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql
index 0f52cec00..d509f9bd1 100644
--- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql
+++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql
@@ -138,10 +138,41 @@ SELECT a FROM t NATURAL JOIN dist;
 SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
 SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
 
+
+--
+-- Joins between reference tables and views shouldn't be planned locally.
+--
+
+CREATE VIEW numbers_v AS SELECT * FROM numbers WHERE a=1;
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS FALSE)
+  SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
+$Q$);
+
+CREATE VIEW local_table_v AS SELECT * FROM local_table WHERE a BETWEEN 1 AND 10;
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS FALSE)
+  SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a;
+$Q$);
+
+DROP VIEW numbers_v, local_table_v;
+
+--
+-- Joins between reference tables and materialized views are allowed to
+-- be planned locally
+--
+CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
+REFRESH MATERIALIZED VIEW numbers_v;
+SELECT public.plan_is_distributed($Q$
+EXPLAIN (COSTS FALSE)
+  SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
+$Q$);
+
 -- verify that we can drop columns from reference tables replicated to the coordinator
 -- see https://github.com/citusdata/citus/issues/3279
 ALTER TABLE squares DROP COLUMN b;
+
 -- clean-up
 SET client_min_messages TO ERROR;
 DROP SCHEMA replicate_ref_to_coordinator CASCADE;
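Note: the materialized-view case is allowed through because a materialized view is not RELKIND_VIEW: it is stored as its own heap relation with relkind 'm', so its contents can be read locally like a table. A quick way to see the distinction (object names are illustrative, not from this test):

    CREATE VIEW plain_v AS SELECT 1 AS a;
    CREATE MATERIALIZED VIEW mat_v AS SELECT 1 AS a;
    SELECT relname, relkind FROM pg_class
    WHERE relname IN ('plain_v', 'mat_v');
    -- plain_v -> 'v' (rejected by the planner check), mat_v -> 'm' (accepted)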