From 939d3c955bc5de2a8f688cd533533b64f4a94356 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 11 Dec 2019 16:06:48 -0800 Subject: [PATCH] Don't plan function joins locally --- .../distributed/executor/multi_executor.c | 21 ++++++++++++ .../distributed/planner/distributed_planner.c | 14 ++++++++ ...licate_reference_tables_to_coordinator.out | 33 ++++++++++++++++++- ...licate_reference_tables_to_coordinator.sql | 25 +++++++++++++- 4 files changed, 91 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 691abf1ff..436ce34be 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -14,6 +14,7 @@ #include "access/xact.h" #include "catalog/dependency.h" +#include "catalog/pg_class.h" #include "catalog/namespace.h" #include "distributed/citus_custom_scan.h" #include "distributed/commands/multi_copy.h" @@ -629,11 +630,31 @@ IsLocalReferenceTableJoinPlan(PlannedStmt *plan) RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); bool onlySearchPath = false; + /* + * Planner's IsLocalReferenceTableJoin() doesn't allow planning functions + * in FROM clause locally. Early exit. We cannot use Assert() here since + * all non-Citus plans might pass through these checks. + */ + if (rangeTableEntry->rtekind == RTE_FUNCTION) + { + return false; + } + if (rangeTableEntry->rtekind != RTE_RELATION) { continue; } + /* + * Planner's IsLocalReferenceTableJoin() doesn't allow planning reference + * table and view join locally. Early exit. We cannot use Assert() here + * since all non-Citus plans might pass through these checks. + */ + if (rangeTableEntry->relkind == RELKIND_VIEW) + { + return false; + } + if (RelationIsAKnownShard(rangeTableEntry->relid, onlySearchPath)) { /* diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 47f75da80..a767dac59 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1955,6 +1955,20 @@ IsLocalReferenceTableJoin(Query *parse, List *rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); + /* + * Don't plan joins involving functions locally since we are not sure if + * they do distributed accesses or not, and defaulting to local planning + * might break transactional semantics. + * + * For example, Access to the reference table in the function might go + * over a connection, but access to the same reference table outside + * the function will go over the current backend. The snapshot for the + * connection in the function is taken after the statement snapshot, + * so they can see two different views of data. + * + * Looking at gram.y, RTE_TABLEFUNC is used only for XMLTABLE() which + * is okay to be planned locally, so allowing that. + */ if (rangeTableEntry->rtekind == RTE_FUNCTION) { return false; diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index 287475d58..37fba7cf4 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -275,7 +275,7 @@ $Q$); DROP VIEW numbers_v, local_table_v; -- -- Joins between reference tables and materialized views are allowed to --- be planned locally +-- be planned locally. -- CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10; LOG: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10)) @@ -289,6 +289,37 @@ $Q$); f (1 row) +BEGIN; +SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery +END; +-- +-- Joins between reference tables, local tables, and function calls shouldn't +-- be planned locally. +-- +SELECT count(*) +FROM local_table a, numbers b, generate_series(1, 10) c +WHERE a.a = b.a AND a.a = c; +ERROR: relation local_table is not distributed +-- but it should be okay if the function call is not a data source +SELECT public.plan_is_distributed($Q$ +EXPLAIN (COSTS FALSE) +SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a; +$Q$); + plan_is_distributed +--------------------- + f +(1 row) + +SELECT public.plan_is_distributed($Q$ +EXPLAIN (COSTS FALSE) +SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a); +$Q$); + plan_is_distributed +--------------------- + f +(1 row) + -- verify that we can drop columns from reference tables replicated to the coordinator -- see https://github.com/citusdata/citus/issues/3279 ALTER TABLE squares DROP COLUMN b; diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql index d509f9bd1..23bcf469f 100644 --- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql +++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql @@ -159,7 +159,7 @@ DROP VIEW numbers_v, local_table_v; -- -- Joins between reference tables and materialized views are allowed to --- be planned locally +-- be planned locally. -- CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10; REFRESH MATERIALIZED VIEW numbers_v; @@ -168,6 +168,29 @@ EXPLAIN (COSTS FALSE) SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; $Q$); +BEGIN; +SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; +END; + +-- +-- Joins between reference tables, local tables, and function calls shouldn't +-- be planned locally. +-- +SELECT count(*) +FROM local_table a, numbers b, generate_series(1, 10) c +WHERE a.a = b.a AND a.a = c; + +-- but it should be okay if the function call is not a data source +SELECT public.plan_is_distributed($Q$ +EXPLAIN (COSTS FALSE) +SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a; +$Q$); + +SELECT public.plan_is_distributed($Q$ +EXPLAIN (COSTS FALSE) +SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a); +$Q$); + -- verify that we can drop columns from reference tables replicated to the coordinator -- see https://github.com/citusdata/citus/issues/3279 ALTER TABLE squares DROP COLUMN b;