From 24dcb02bca6b864d9bb4e6ff0c3eec005cc398d4 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 9 Apr 2020 15:25:54 +0300 Subject: [PATCH] enable local table join with reference table (#3697) * enable local table join with reference table * test different cases with local table and reference join --- .../distributed/executor/multi_executor.c | 126 ------------- .../expected/coordinator_shouldhaveshards.out | 169 +++++++++++++++++- ...licate_reference_tables_to_coordinator.out | 143 +++++++++++---- .../sql/coordinator_shouldhaveshards.sql | 64 +++++++ ...licate_reference_tables_to_coordinator.sql | 60 +++++-- 5 files changed, 391 insertions(+), 171 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 56f9b30e2..e21a2c935 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -72,7 +72,6 @@ int ExecutorLevel = 0; /* local function forward declarations */ static Relation StubRelation(TupleDesc tupleDescriptor); static bool AlterTableConstraintCheck(QueryDesc *queryDesc); -static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan); static List * FindCitusCustomScanStates(PlanState *planState); static bool CitusCustomScanStateWalker(PlanState *planState, List **citusCustomScanStates); @@ -149,24 +148,6 @@ CitusExecutorRun(QueryDesc *queryDesc, InstrStartNode(totalTime); } - if (CitusHasBeenLoaded()) - { - if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) && - IsMultiStatementTransaction()) - { - /* - * Currently we don't support this to avoid problems with tuple - * visibility, locking, etc. For example, change to the reference - * table can go through a MultiConnection, which won't be visible - * to the locally planned queries. - */ - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot join local tables and reference tables in " - "a transaction block, udf block, or distributed " - "CTE subquery"))); - } - } - /* * Disable execution of ALTER TABLE constraint validation queries. These * constraints will be validated in worker nodes, so running these queries @@ -741,110 +722,3 @@ AlterTableConstraintCheck(QueryDesc *queryDesc) return true; } - - -/* - * IsLocalReferenceTableJoinPlan returns true if the given plan joins local tables - * with reference table shards. - * - * This should be consistent with IsLocalReferenceTableJoin() in distributed_planner.c. - */ -static bool -IsLocalReferenceTableJoinPlan(PlannedStmt *plan) -{ - bool hasReferenceTable = false; - bool hasLocalTable = false; - bool hasReferenceTableReplica = false; - - /* - * We only allow join between reference tables and local tables in the - * coordinator. - */ - if (!IsCoordinator()) - { - return false; - } - - /* - * All groups that have pg_dist_node entries, also have reference - * table replicas. - */ - PrimaryNodeForGroup(GetLocalGroupId(), &hasReferenceTableReplica); - - /* - * If reference table doesn't have replicas on the coordinator, we don't - * allow joins with local tables. - */ - if (!hasReferenceTableReplica) - { - return false; - } - - /* - * No need to check FOR UPDATE/SHARE or modifying subqueries, those have - * already errored out in distributed_planner.c if they contain mix of - * local and distributed tables. - */ - if (plan->commandType != CMD_SELECT) - { - return false; - } - - /* - * plan->rtable contains the flattened RTE lists of the plan tree, which - * includes rtes in subqueries, CTEs, ... - * - * It doesn't contain optimized away table accesses (due to join optimization), - * which is fine for our purpose. - */ - RangeTblEntry *rangeTableEntry = NULL; - foreach_ptr(rangeTableEntry, plan->rtable) - { - bool onlySearchPath = false; - - /* - * Planner's IsLocalReferenceTableJoin() doesn't allow planning functions - * in FROM clause locally. Early exit. We cannot use Assert() here since - * all non-Citus plans might pass through these checks. - */ - if (rangeTableEntry->rtekind == RTE_FUNCTION) - { - return false; - } - - if (rangeTableEntry->rtekind != RTE_RELATION) - { - continue; - } - - /* - * Planner's IsLocalReferenceTableJoin() doesn't allow planning reference - * table and view join locally. Early exit. We cannot use Assert() here - * since all non-Citus plans might pass through these checks. - */ - if (rangeTableEntry->relkind == RELKIND_VIEW) - { - return false; - } - - if (RelationIsAKnownShard(rangeTableEntry->relid, onlySearchPath)) - { - /* - * We don't allow joining non-reference distributed tables, so we - * can skip checking that this is a reference table shard or not. - */ - hasReferenceTable = true; - } - else - { - hasLocalTable = true; - } - - if (hasReferenceTable && hasLocalTable) - { - return true; - } - } - - return false; -} diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 5ce4b80a8..ac0813ef0 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -207,10 +207,177 @@ ERROR: cannot execute command because a local execution has accessed a placemen DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; +CREATE TABLE ref (a int, b int); +SELECT create_reference_table('ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE local (x int, y int); +BEGIN; +SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 100 +(1 row) + +SELECT * FROM ref JOIN local ON (a = x); + a | b | x | y +--------------------------------------------------------------------- +(0 rows) + +TRUNCATE ref; +NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE +ROLLBACK; +BEGIN; +SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 100 +(1 row) + +TRUNCATE ref; +NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE +SELECT * FROM ref JOIN local ON (a = x); + a | b | x | y +--------------------------------------------------------------------- +(0 rows) + +ROLLBACK; +BEGIN; +SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 100 +(1 row) + +INSERT INTO ref VALUES (1,2); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) +INSERT INTO local VALUES (1,2); +SELECT * FROM ref JOIN local ON (a = x); + a | b | x | y +--------------------------------------------------------------------- + 1 | 2 | 1 | 2 +(1 row) + +ROLLBACK; +set citus.enable_cte_inlining to off; +BEGIN; +SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 100 +(1 row) + +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b + count | x | y | a | b | count +--------------------------------------------------------------------- + 100 | 3 | 2 | 3 | 2 | 0 +(1 row) + +TRUNCATE ref; +NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE +SELECT * FROM ref JOIN local ON (a = x); + a | b | x | y +--------------------------------------------------------------------- +(0 rows) + +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b + count | x | y | a | b | count +--------------------------------------------------------------------- + 100 | 3 | 2 | 3 | 2 | 0 +(1 row) + +ROLLBACK; +BEGIN; +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b + count | x | y | a | b | count +--------------------------------------------------------------------- + 100 | 3 | 2 | 3 | 2 | 0 +(1 row) + +ROLLBACK; +BEGIN; +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503016'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b + count | x | y | a | b | count +--------------------------------------------------------------------- + 100 | 3 | 2 | 1 | 1 | 0 + 100 | 3 | 2 | 2 | 2 | 0 + 100 | 3 | 2 | 3 | 3 | 0 + 100 | 3 | 2 | 4 | 4 | 0 + 100 | 3 | 2 | 5 | 5 | 0 + 100 | 3 | 2 | 6 | 6 | 0 + 100 | 3 | 2 | 7 | 7 | 0 + 100 | 3 | 2 | 8 | 8 | 0 + 100 | 3 | 2 | 9 | 9 | 0 + 100 | 3 | 2 | 10 | 10 | 0 +(10 rows) + +ROLLBACK; +-- same local table reference table tests, but outside a transaction block +INSERT INTO ref VALUES (1,2); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) +INSERT INTO local VALUES (1,2); +SELECT * FROM ref JOIN local ON (a = x); + a | b | x | y +--------------------------------------------------------------------- + 1 | 2 | 1 | 2 +(1 row) + +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b + count | x | y | a | b | count +--------------------------------------------------------------------- + 100 | 3 | 2 | 3 | 2 | 1 +(1 row) + +RESET citus.enable_cte_inlining; DELETE FROM test; DROP TABLE test; DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to table dist_table +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table dist_table +drop cascades to table ref +drop cascades to table ref_1503016 +drop cascades to table local SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index f0fb28056..2b2229a1b 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -36,6 +36,11 @@ SELECT create_reference_table('numbers'); INSERT INTO numbers VALUES (20), (21); NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (20), (21) +CREATE OR REPLACE FUNCTION my_volatile_fn() +RETURNS INT AS $$ +BEGIN + RETURN 1; +END; $$ language plpgsql VOLATILE; -- INSERT ... SELECT between reference tables BEGIN; EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; @@ -125,20 +130,21 @@ ORDER BY 1,2,3; 7 | 10 | 100 (3 rows) --- error if in transaction block +-- should work if in transaction block BEGIN; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery + a | a +--------------------------------------------------------------------- + 20 | 20 +(1 row) + ROLLBACK; --- error if in a DO block +-- should work if in a DO block DO $$ BEGIN PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; END; $$; -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery -CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers" -PL/pgSQL function inline_code_block line 3 at PERFORM -- test plpgsql function CREATE FUNCTION test_reference_local_join_plpgsql_func() RETURNS void AS $$ @@ -154,9 +160,8 @@ SELECT test_reference_local_join_plpgsql_func(); NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4) CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)" PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery -CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1" -PL/pgSQL function test_reference_local_join_plpgsql_func() line 5 at PERFORM +ERROR: +CONTEXT: PL/pgSQL function test_reference_local_join_plpgsql_func() line 6 at RAISE SELECT sum(a) FROM local_table; sum --------------------------------------------------------------------- @@ -175,9 +180,6 @@ CREATE PROCEDURE test_reference_local_join_proc() AS $$ SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; $$ LANGUAGE sql; CALL test_reference_local_join_proc(); -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery -CONTEXT: SQL function "test_reference_local_join_proc" statement 1 --- error if in a transaction block even if reference table is not in search path CREATE SCHEMA s1; CREATE TABLE s1.ref(a int); SELECT create_reference_table('s1.ref'); @@ -188,35 +190,54 @@ SELECT create_reference_table('s1.ref'); BEGIN; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery + a | a +--------------------------------------------------------------------- +(0 rows) + ROLLBACK; BEGIN; WITH t1 AS ( - SELECT random() r, a FROM local_table + SELECT my_volatile_fn() r, a FROM local_table ) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5; -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery + count +--------------------------------------------------------------------- + 0 +(1 row) + END; BEGIN; WITH t1 AS ( - SELECT random() r, a FROM numbers + SELECT my_volatile_fn() r, a FROM numbers ) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5; -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery + count +--------------------------------------------------------------------- + 0 +(1 row) + END; BEGIN; SELECT count(*) FROM local_table -WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a); -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery +WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a); + count +--------------------------------------------------------------------- + 1 +(1 row) + END; BEGIN; SELECT count(*) FROM numbers -WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a); -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery +WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a); + count +--------------------------------------------------------------------- + 1 +(1 row) + END; DROP SCHEMA s1 CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table s1.ref drop cascades to table s1.ref_8000002 --- error if inside a SQL UDF call +-- not error if inside a SQL UDF call CREATE or replace FUNCTION test_reference_local_join_func() RETURNS SETOF RECORD AS $$ SET LOCAL citus.enable_local_execution to false; @@ -224,18 +245,21 @@ INSERT INTO numbers VALUES (2); SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; $$ LANGUAGE sql; SELECT test_reference_local_join_func(); -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery -CONTEXT: SQL function "test_reference_local_join_func" statement 3 + test_reference_local_join_func +--------------------------------------------------------------------- + (20,20) +(1 row) + -- shouldn't plan locally if modifications happen in CTEs, ... WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table; ERROR: relation local_table is not distributed -WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) +WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); ERROR: relation local_table is not distributed -- but this should be fine -WITH t AS (SELECT *, random() x FROM numbers) +WITH t AS (SELECT *, my_volatile_fn() x FROM numbers) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); a | a @@ -251,15 +275,19 @@ SELECT create_distributed_table('dist', 'a'); (1 row) INSERT INTO dist VALUES (20),(30); -WITH t AS (SELECT *, random() x FROM dist) +WITH t AS (SELECT *, my_volatile_fn() x FROM dist) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); ERROR: relation local_table is not distributed -- test CTE being reference/local join for distributed query -WITH t as (SELECT n.a, random() x FROM numbers n NATURAL JOIN local_table l) +WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l) SELECT a FROM t NATURAL JOIN dist; -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery - -- error if FOR UPDATE/FOR SHARE + a +--------------------------------------------------------------------- + 20 +(1 row) + + -- shouldn't error if FOR UPDATE/FOR SHARE SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; ERROR: could not run distributed query with FOR UPDATE/SHARE commands HINT: Consider using an equality filter on the distributed table's partition column. @@ -313,7 +341,11 @@ $Q$); BEGIN; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; -ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery + a | b | a +--------------------------------------------------------------------- + 2 | 4 | 2 +(1 row) + END; -- -- Joins between reference tables, local tables, and function calls shouldn't @@ -342,6 +374,55 @@ $Q$); f (1 row) +TRUNCATE local_table; +TRUNCATE numbers; +NOTICE: executing the command locally: TRUNCATE TABLE replicate_ref_to_coordinator.numbers_xxxxx CASCADE +BEGIN; +INSERT INTO local_table VALUES (1), (2), (3), (4); +INSERT INTO numbers VALUES (1), (2), (3), (4); +NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (1), (2), (3), (4) +ALTER TABLE numbers ADD COLUMN d int; +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (8000001, 'replicate_ref_to_coordinator', 'ALTER TABLE numbers ADD COLUMN d int;') +SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a; + a | d +--------------------------------------------------------------------- + 1 | + 2 | + 3 | + 4 | +(4 rows) + +ROLLBACK; +BEGIN; +INSERT INTO local_table VALUES (1), (2), (3); +WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x) +SELECT a FROM t NATURAL JOIN dist ORDER BY a; + a +--------------------------------------------------------------------- +(0 rows) + +ROLLBACK; +BEGIN; +INSERT INTO local_table VALUES (1), (2), (3); +INSERT INTO numbers SELECT * FROM generate_series(1, 100); +NOTICE: executing the copy locally for shard xxxxx +INSERT INTO numbers SELECT * FROM numbers; +NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers +SELECT COUNT(*) FROM local_table JOIN numbers using (a); + count +--------------------------------------------------------------------- + 6 +(1 row) + +UPDATE numbers SET a = a + 1; +NOTICE: executing the command locally: UPDATE replicate_ref_to_coordinator.numbers_8000001 numbers SET a = (a OPERATOR(pg_catalog.+) 1) +SELECT COUNT(*) FROM local_table JOIN numbers using (a); + count +--------------------------------------------------------------------- + 4 +(1 row) + +ROLLBACK; -- verify that we can drop columns from reference tables replicated to the coordinator -- see https://github.com/citusdata/citus/issues/3279 ALTER TABLE squares DROP COLUMN b; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 741c2076a..4b2a03332 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -94,6 +94,70 @@ SELECT y FROM test WHERE x = 1; SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; ROLLBACK; +CREATE TABLE ref (a int, b int); +SELECT create_reference_table('ref'); + +CREATE TABLE local (x int, y int); + +BEGIN; +SELECT count(*) FROM test; +SELECT * FROM ref JOIN local ON (a = x); +TRUNCATE ref; +ROLLBACK; + + +BEGIN; +SELECT count(*) FROM test; +TRUNCATE ref; +SELECT * FROM ref JOIN local ON (a = x); +ROLLBACK; + +BEGIN; +SELECT count(*) FROM test; +INSERT INTO ref VALUES (1,2); +INSERT INTO local VALUES (1,2); +SELECT * FROM ref JOIN local ON (a = x); +ROLLBACK; + +set citus.enable_cte_inlining to off; + +BEGIN; +SELECT count(*) FROM test; +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +TRUNCATE ref; +SELECT * FROM ref JOIN local ON (a = x); +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +ROLLBACK; + + +BEGIN; +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +ROLLBACK; + +BEGIN; +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; +ROLLBACK; + +-- same local table reference table tests, but outside a transaction block +INSERT INTO ref VALUES (1,2); +INSERT INTO local VALUES (1,2); +SELECT * FROM ref JOIN local ON (a = x); + +-- we wont see the modifying cte in this query because we will use local execution and +-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres. +WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; + + +RESET citus.enable_cte_inlining; + DELETE FROM test; DROP TABLE test; diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql index 02ddab970..cf78531a8 100644 --- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql +++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql @@ -24,6 +24,13 @@ CREATE TABLE numbers(a int); SELECT create_reference_table('numbers'); INSERT INTO numbers VALUES (20), (21); +CREATE OR REPLACE FUNCTION my_volatile_fn() +RETURNS INT AS $$ +BEGIN + RETURN 1; +END; $$ language plpgsql VOLATILE; + + -- INSERT ... SELECT between reference tables BEGIN; EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; @@ -53,12 +60,12 @@ FROM local_table lt JOIN squares sq ON sq.a > lt.a and sq.b > 90 ORDER BY 1,2,3; --- error if in transaction block +-- should work if in transaction block BEGIN; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; ROLLBACK; --- error if in a DO block +-- should work if in a DO block DO $$ BEGIN PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; @@ -86,7 +93,6 @@ SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1 $$ LANGUAGE sql; CALL test_reference_local_join_proc(); --- error if in a transaction block even if reference table is not in search path CREATE SCHEMA s1; CREATE TABLE s1.ref(a int); SELECT create_reference_table('s1.ref'); @@ -97,29 +103,29 @@ ROLLBACK; BEGIN; WITH t1 AS ( - SELECT random() r, a FROM local_table + SELECT my_volatile_fn() r, a FROM local_table ) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5; END; BEGIN; WITH t1 AS ( - SELECT random() r, a FROM numbers + SELECT my_volatile_fn() r, a FROM numbers ) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5; END; BEGIN; SELECT count(*) FROM local_table -WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a); +WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a); END; BEGIN; SELECT count(*) FROM numbers -WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a); +WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a); END; DROP SCHEMA s1 CASCADE; --- error if inside a SQL UDF call +-- not error if inside a SQL UDF call CREATE or replace FUNCTION test_reference_local_join_func() RETURNS SETOF RECORD AS $$ SET LOCAL citus.enable_local_execution to false; @@ -133,12 +139,12 @@ SELECT test_reference_local_join_func(); WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table; -WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) +WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); -- but this should be fine -WITH t AS (SELECT *, random() x FROM numbers) +WITH t AS (SELECT *, my_volatile_fn() x FROM numbers) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); @@ -147,15 +153,15 @@ CREATE TABLE dist(a int); SELECT create_distributed_table('dist', 'a'); INSERT INTO dist VALUES (20),(30); -WITH t AS (SELECT *, random() x FROM dist) +WITH t AS (SELECT *, my_volatile_fn() x FROM dist) SELECT * FROM numbers, local_table WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); -- test CTE being reference/local join for distributed query -WITH t as (SELECT n.a, random() x FROM numbers n NATURAL JOIN local_table l) +WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l) SELECT a FROM t NATURAL JOIN dist; - -- error if FOR UPDATE/FOR SHARE + -- shouldn't error if FOR UPDATE/FOR SHARE SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE; @@ -212,6 +218,34 @@ EXPLAIN (COSTS FALSE) SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a); $Q$); +TRUNCATE local_table; +TRUNCATE numbers; + +BEGIN; +INSERT INTO local_table VALUES (1), (2), (3), (4); +INSERT INTO numbers VALUES (1), (2), (3), (4); +ALTER TABLE numbers ADD COLUMN d int; +SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a; +ROLLBACK; + +BEGIN; +INSERT INTO local_table VALUES (1), (2), (3); +WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x) +SELECT a FROM t NATURAL JOIN dist ORDER BY a; +ROLLBACK; + +BEGIN; +INSERT INTO local_table VALUES (1), (2), (3); +INSERT INTO numbers SELECT * FROM generate_series(1, 100); +INSERT INTO numbers SELECT * FROM numbers; +SELECT COUNT(*) FROM local_table JOIN numbers using (a); +UPDATE numbers SET a = a + 1; +SELECT COUNT(*) FROM local_table JOIN numbers using (a); +ROLLBACK; + + + + -- verify that we can drop columns from reference tables replicated to the coordinator -- see https://github.com/citusdata/citus/issues/3279 ALTER TABLE squares DROP COLUMN b;