enable local table join with reference table (#3697)

* enable local table join with reference table

* test different cases with local table and reference join
pull/3733/head
SaitTalhaNisanci 2020-04-09 15:25:54 +03:00 committed by GitHub
parent ebda3eff61
commit 24dcb02bca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 391 additions and 171 deletions

View File

@ -72,7 +72,6 @@ int ExecutorLevel = 0;
/* local function forward declarations */ /* local function forward declarations */
static Relation StubRelation(TupleDesc tupleDescriptor); static Relation StubRelation(TupleDesc tupleDescriptor);
static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static bool AlterTableConstraintCheck(QueryDesc *queryDesc);
static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan);
static List * FindCitusCustomScanStates(PlanState *planState); static List * FindCitusCustomScanStates(PlanState *planState);
static bool CitusCustomScanStateWalker(PlanState *planState, static bool CitusCustomScanStateWalker(PlanState *planState,
List **citusCustomScanStates); List **citusCustomScanStates);
@ -149,24 +148,6 @@ CitusExecutorRun(QueryDesc *queryDesc,
InstrStartNode(totalTime); InstrStartNode(totalTime);
} }
if (CitusHasBeenLoaded())
{
if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) &&
IsMultiStatementTransaction())
{
/*
* Currently we don't support this to avoid problems with tuple
* visibility, locking, etc. For example, change to the reference
* table can go through a MultiConnection, which won't be visible
* to the locally planned queries.
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot join local tables and reference tables in "
"a transaction block, udf block, or distributed "
"CTE subquery")));
}
}
/* /*
* Disable execution of ALTER TABLE constraint validation queries. These * Disable execution of ALTER TABLE constraint validation queries. These
* constraints will be validated in worker nodes, so running these queries * constraints will be validated in worker nodes, so running these queries
@ -741,110 +722,3 @@ AlterTableConstraintCheck(QueryDesc *queryDesc)
return true; return true;
} }
/*
 * IsLocalReferenceTableJoinPlan decides whether the given plan joins at
 * least one local table with at least one reference table shard.
 *
 * The answer is always false when we are not the coordinator, when the
 * coordinator has no reference table replicas, or when the plan is not a
 * SELECT. It is also false as soon as a function or a view shows up in the
 * plan's range table.
 *
 * This should be consistent with IsLocalReferenceTableJoin() in distributed_planner.c.
 */
static bool
IsLocalReferenceTableJoinPlan(PlannedStmt *plan)
{
	const bool onlySearchPath = false;
	bool coordinatorHasReplica = false;
	bool sawReferenceShard = false;
	bool sawLocalTable = false;
	RangeTblEntry *rte = NULL;

	/* joins between reference and local tables are a coordinator-only feature */
	if (!IsCoordinator())
	{
		return false;
	}

	/*
	 * Every group with a pg_dist_node entry also carries reference table
	 * replicas, so the lookup below tells us whether the coordinator has them.
	 * Without local replicas there is nothing to join against.
	 */
	PrimaryNodeForGroup(GetLocalGroupId(), &coordinatorHasReplica);
	if (!coordinatorHasReplica)
	{
		return false;
	}

	/*
	 * FOR UPDATE/SHARE and modifying subqueries need no handling here:
	 * distributed_planner.c has already errored out for those when they mix
	 * local and distributed tables.
	 */
	if (plan->commandType != CMD_SELECT)
	{
		return false;
	}

	/*
	 * plan->rtable is the flattened list of RTEs for the whole plan tree, so
	 * entries coming from subqueries, CTEs, ... are all in there. Table
	 * accesses that were optimized away (due to join optimization) are
	 * missing, which is fine for our purpose.
	 */
	foreach_ptr(rte, plan->rtable)
	{
		switch (rte->rtekind)
		{
			case RTE_FUNCTION:
			{
				/*
				 * Planner's IsLocalReferenceTableJoin() never plans functions
				 * in the FROM clause locally, so bail out. An Assert() is not
				 * an option since non-Citus plans may reach this code too.
				 */
				return false;
			}

			case RTE_RELATION:
			{
				/* relations are inspected below */
				break;
			}

			default:
			{
				/* nothing to learn from other kinds of RTEs */
				continue;
			}
		}

		/*
		 * Planner's IsLocalReferenceTableJoin() never plans a reference table
		 * and view join locally, so bail out. An Assert() is not an option
		 * since non-Citus plans may reach this code too.
		 */
		if (rte->relkind == RELKIND_VIEW)
		{
			return false;
		}

		/*
		 * Joining non-reference distributed tables is not allowed, hence any
		 * known shard can be treated as a reference table shard without
		 * further checks. Everything else counts as a local table.
		 */
		bool isKnownShard = RelationIsAKnownShard(rte->relid, onlySearchPath);
		sawReferenceShard = sawReferenceShard || isKnownShard;
		sawLocalTable = sawLocalTable || !isKnownShard;

		if (sawLocalTable && sawReferenceShard)
		{
			return true;
		}
	}

	return false;
}

View File

@ -207,10 +207,177 @@ ERROR: cannot execute command because a local execution has accessed a placemen
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK; ROLLBACK;
CREATE TABLE ref (a int, b int);
SELECT create_reference_table('ref');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE local (x int, y int);
BEGIN;
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
100
(1 row)
SELECT * FROM ref JOIN local ON (a = x);
a | b | x | y
---------------------------------------------------------------------
(0 rows)
TRUNCATE ref;
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
ROLLBACK;
BEGIN;
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
100
(1 row)
TRUNCATE ref;
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
SELECT * FROM ref JOIN local ON (a = x);
a | b | x | y
---------------------------------------------------------------------
(0 rows)
ROLLBACK;
BEGIN;
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
100
(1 row)
INSERT INTO ref VALUES (1,2);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2)
INSERT INTO local VALUES (1,2);
SELECT * FROM ref JOIN local ON (a = x);
a | b | x | y
---------------------------------------------------------------------
1 | 2 | 1 | 2
(1 row)
ROLLBACK;
set citus.enable_cte_inlining to off;
BEGIN;
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
100
(1 row)
-- we wont see the modifying cte in this query because we will use local execution and
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count
---------------------------------------------------------------------
100 | 3 | 2 | 3 | 2 | 0
(1 row)
TRUNCATE ref;
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
SELECT * FROM ref JOIN local ON (a = x);
a | b | x | y
---------------------------------------------------------------------
(0 rows)
-- we wont see the modifying cte in this query because we will use local execution and
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count
---------------------------------------------------------------------
100 | 3 | 2 | 3 | 2 | 0
(1 row)
ROLLBACK;
BEGIN;
-- we wont see the modifying cte in this query because we will use local execution and
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count
---------------------------------------------------------------------
100 | 3 | 2 | 3 | 2 | 0
(1 row)
ROLLBACK;
BEGIN;
-- we wont see the modifying cte in this query because we will use local execution and
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503016'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count
---------------------------------------------------------------------
100 | 3 | 2 | 1 | 1 | 0
100 | 3 | 2 | 2 | 2 | 0
100 | 3 | 2 | 3 | 3 | 0
100 | 3 | 2 | 4 | 4 | 0
100 | 3 | 2 | 5 | 5 | 0
100 | 3 | 2 | 6 | 6 | 0
100 | 3 | 2 | 7 | 7 | 0
100 | 3 | 2 | 8 | 8 | 0
100 | 3 | 2 | 9 | 9 | 0
100 | 3 | 2 | 10 | 10 | 0
(10 rows)
ROLLBACK;
-- same local table reference table tests, but outside a transaction block
INSERT INTO ref VALUES (1,2);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2)
INSERT INTO local VALUES (1,2);
SELECT * FROM ref JOIN local ON (a = x);
a | b | x | y
---------------------------------------------------------------------
1 | 2 | 1 | 2
(1 row)
-- we wont see the modifying cte in this query because we will use local execution and
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
count | x | y | a | b | count
---------------------------------------------------------------------
100 | 3 | 2 | 3 | 2 | 1
(1 row)
RESET citus.enable_cte_inlining;
DELETE FROM test; DELETE FROM test;
DROP TABLE test; DROP TABLE test;
DROP SCHEMA coordinator_shouldhaveshards CASCADE; DROP SCHEMA coordinator_shouldhaveshards CASCADE;
NOTICE: drop cascades to table dist_table NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table dist_table
drop cascades to table ref
drop cascades to table ref_1503016
drop cascades to table local
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -36,6 +36,11 @@ SELECT create_reference_table('numbers');
INSERT INTO numbers VALUES (20), (21); INSERT INTO numbers VALUES (20), (21);
NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (20), (21) NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (20), (21)
CREATE OR REPLACE FUNCTION my_volatile_fn()
RETURNS INT AS $$
BEGIN
RETURN 1;
END; $$ language plpgsql VOLATILE;
-- INSERT ... SELECT between reference tables -- INSERT ... SELECT between reference tables
BEGIN; BEGIN;
EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers;
@ -125,20 +130,21 @@ ORDER BY 1,2,3;
7 | 10 | 100 7 | 10 | 100
(3 rows) (3 rows)
-- error if in transaction block -- should work if in transaction block
BEGIN; BEGIN;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery a | a
---------------------------------------------------------------------
20 | 20
(1 row)
ROLLBACK; ROLLBACK;
-- error if in a DO block -- should work if in a DO block
DO $$ DO $$
BEGIN BEGIN
PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
END; END;
$$; $$;
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers"
PL/pgSQL function inline_code_block line 3 at PERFORM
-- test plpgsql function -- test plpgsql function
CREATE FUNCTION test_reference_local_join_plpgsql_func() CREATE FUNCTION test_reference_local_join_plpgsql_func()
RETURNS void AS $$ RETURNS void AS $$
@ -154,9 +160,8 @@ SELECT test_reference_local_join_plpgsql_func();
NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4) NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4)
CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)" CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)"
PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery ERROR:
CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1" CONTEXT: PL/pgSQL function test_reference_local_join_plpgsql_func() line 6 at RAISE
PL/pgSQL function test_reference_local_join_plpgsql_func() line 5 at PERFORM
SELECT sum(a) FROM local_table; SELECT sum(a) FROM local_table;
sum sum
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -175,9 +180,6 @@ CREATE PROCEDURE test_reference_local_join_proc() AS $$
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql; $$ LANGUAGE sql;
CALL test_reference_local_join_proc(); CALL test_reference_local_join_proc();
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
CONTEXT: SQL function "test_reference_local_join_proc" statement 1
-- error if in a transaction block even if reference table is not in search path
CREATE SCHEMA s1; CREATE SCHEMA s1;
CREATE TABLE s1.ref(a int); CREATE TABLE s1.ref(a int);
SELECT create_reference_table('s1.ref'); SELECT create_reference_table('s1.ref');
@ -188,35 +190,54 @@ SELECT create_reference_table('s1.ref');
BEGIN; BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery a | a
---------------------------------------------------------------------
(0 rows)
ROLLBACK; ROLLBACK;
BEGIN; BEGIN;
WITH t1 AS ( WITH t1 AS (
SELECT random() r, a FROM local_table SELECT my_volatile_fn() r, a FROM local_table
) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5; ) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5;
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery count
---------------------------------------------------------------------
0
(1 row)
END; END;
BEGIN; BEGIN;
WITH t1 AS ( WITH t1 AS (
SELECT random() r, a FROM numbers SELECT my_volatile_fn() r, a FROM numbers
) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5; ) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5;
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery count
---------------------------------------------------------------------
0
(1 row)
END; END;
BEGIN; BEGIN;
SELECT count(*) FROM local_table SELECT count(*) FROM local_table
WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a); WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a);
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery count
---------------------------------------------------------------------
1
(1 row)
END; END;
BEGIN; BEGIN;
SELECT count(*) FROM numbers SELECT count(*) FROM numbers
WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a); WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a);
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery count
---------------------------------------------------------------------
1
(1 row)
END; END;
DROP SCHEMA s1 CASCADE; DROP SCHEMA s1 CASCADE;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table s1.ref DETAIL: drop cascades to table s1.ref
drop cascades to table s1.ref_8000002 drop cascades to table s1.ref_8000002
-- error if inside a SQL UDF call -- not error if inside a SQL UDF call
CREATE or replace FUNCTION test_reference_local_join_func() CREATE or replace FUNCTION test_reference_local_join_func()
RETURNS SETOF RECORD AS $$ RETURNS SETOF RECORD AS $$
SET LOCAL citus.enable_local_execution to false; SET LOCAL citus.enable_local_execution to false;
@ -224,18 +245,21 @@ INSERT INTO numbers VALUES (2);
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql; $$ LANGUAGE sql;
SELECT test_reference_local_join_func(); SELECT test_reference_local_join_func();
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery test_reference_local_join_func
CONTEXT: SQL function "test_reference_local_join_func" statement 3 ---------------------------------------------------------------------
(20,20)
(1 row)
-- shouldn't plan locally if modifications happen in CTEs, ... -- shouldn't plan locally if modifications happen in CTEs, ...
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
SELECT * FROM numbers, local_table; SELECT * FROM numbers, local_table;
ERROR: relation local_table is not distributed ERROR: relation local_table is not distributed
WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
ERROR: relation local_table is not distributed ERROR: relation local_table is not distributed
-- but this should be fine -- but this should be fine
WITH t AS (SELECT *, random() x FROM numbers) WITH t AS (SELECT *, my_volatile_fn() x FROM numbers)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
a | a a | a
@ -251,15 +275,19 @@ SELECT create_distributed_table('dist', 'a');
(1 row) (1 row)
INSERT INTO dist VALUES (20),(30); INSERT INTO dist VALUES (20),(30);
WITH t AS (SELECT *, random() x FROM dist) WITH t AS (SELECT *, my_volatile_fn() x FROM dist)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
ERROR: relation local_table is not distributed ERROR: relation local_table is not distributed
-- test CTE being reference/local join for distributed query -- test CTE being reference/local join for distributed query
WITH t as (SELECT n.a, random() x FROM numbers n NATURAL JOIN local_table l) WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l)
SELECT a FROM t NATURAL JOIN dist; SELECT a FROM t NATURAL JOIN dist;
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery a
-- error if FOR UPDATE/FOR SHARE ---------------------------------------------------------------------
20
(1 row)
-- shouldn't error if FOR UPDATE/FOR SHARE
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
ERROR: could not run distributed query with FOR UPDATE/SHARE commands ERROR: could not run distributed query with FOR UPDATE/SHARE commands
HINT: Consider using an equality filter on the distributed table's partition column. HINT: Consider using an equality filter on the distributed table's partition column.
@ -313,7 +341,11 @@ $Q$);
BEGIN; BEGIN;
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery a | b | a
---------------------------------------------------------------------
2 | 4 | 2
(1 row)
END; END;
-- --
-- Joins between reference tables, local tables, and function calls shouldn't -- Joins between reference tables, local tables, and function calls shouldn't
@ -342,6 +374,55 @@ $Q$);
f f
(1 row) (1 row)
TRUNCATE local_table;
TRUNCATE numbers;
NOTICE: executing the command locally: TRUNCATE TABLE replicate_ref_to_coordinator.numbers_xxxxx CASCADE
BEGIN;
INSERT INTO local_table VALUES (1), (2), (3), (4);
INSERT INTO numbers VALUES (1), (2), (3), (4);
NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (1), (2), (3), (4)
ALTER TABLE numbers ADD COLUMN d int;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (8000001, 'replicate_ref_to_coordinator', 'ALTER TABLE numbers ADD COLUMN d int;')
SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a;
a | d
---------------------------------------------------------------------
1 |
2 |
3 |
4 |
(4 rows)
ROLLBACK;
BEGIN;
INSERT INTO local_table VALUES (1), (2), (3);
WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x)
SELECT a FROM t NATURAL JOIN dist ORDER BY a;
a
---------------------------------------------------------------------
(0 rows)
ROLLBACK;
BEGIN;
INSERT INTO local_table VALUES (1), (2), (3);
INSERT INTO numbers SELECT * FROM generate_series(1, 100);
NOTICE: executing the copy locally for shard xxxxx
INSERT INTO numbers SELECT * FROM numbers;
NOTICE: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers
SELECT COUNT(*) FROM local_table JOIN numbers using (a);
count
---------------------------------------------------------------------
6
(1 row)
UPDATE numbers SET a = a + 1;
NOTICE: executing the command locally: UPDATE replicate_ref_to_coordinator.numbers_8000001 numbers SET a = (a OPERATOR(pg_catalog.+) 1)
SELECT COUNT(*) FROM local_table JOIN numbers using (a);
count
---------------------------------------------------------------------
4
(1 row)
ROLLBACK;
-- verify that we can drop columns from reference tables replicated to the coordinator -- verify that we can drop columns from reference tables replicated to the coordinator
-- see https://github.com/citusdata/citus/issues/3279 -- see https://github.com/citusdata/citus/issues/3279
ALTER TABLE squares DROP COLUMN b; ALTER TABLE squares DROP COLUMN b;

View File

@ -94,6 +94,70 @@ SELECT y FROM test WHERE x = 1;
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
ROLLBACK; ROLLBACK;
-- set up a reference table (ref) and a plain local table (local) to join against each other
CREATE TABLE ref (a int, b int);
SELECT create_reference_table('ref');
CREATE TABLE local (x int, y int);
-- reference/local join followed by a TRUNCATE of the reference table in the same transaction
-- NOTE(review): the leading count(*) on test presumably opens remote connections first -- confirm
BEGIN;
SELECT count(*) FROM test;
SELECT * FROM ref JOIN local ON (a = x);
TRUNCATE ref;
ROLLBACK;
-- same statements, but the TRUNCATE of the reference table runs before the join
BEGIN;
SELECT count(*) FROM test;
TRUNCATE ref;
SELECT * FROM ref JOIN local ON (a = x);
ROLLBACK;
-- insert a matching row into both tables, then join them in the same transaction
BEGIN;
SELECT count(*) FROM test;
INSERT INTO ref VALUES (1,2);
INSERT INTO local VALUES (1,2);
SELECT * FROM ref JOIN local ON (a = x);
ROLLBACK;
set citus.enable_cte_inlining to off;
BEGIN;
SELECT count(*) FROM test;
-- we won't see the effects of the modifying CTEs in this query because we will use local execution and
-- in postgres we wouldn't see them either, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
TRUNCATE ref;
SELECT * FROM ref JOIN local ON (a = x);
-- we won't see the effects of the modifying CTEs in this query because we will use local execution and
-- in postgres we wouldn't see them either, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
ROLLBACK;
BEGIN;
-- we won't see the effects of the modifying CTEs in this query because we will use local execution and
-- in postgres we wouldn't see them either, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
ROLLBACK;
BEGIN;
-- we won't see the effects of the modifying CTEs in this query because we will use local execution and
-- in postgres we wouldn't see them either, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
ROLLBACK;
-- same local table / reference table join tests, but outside a transaction block
INSERT INTO ref VALUES (1,2);
INSERT INTO local VALUES (1,2);
SELECT * FROM ref JOIN local ON (a = x);
-- we won't see the effects of the modifying CTEs in this query because we will use local execution and
-- in postgres we wouldn't see them either, so it is consistent with postgres.
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
RESET citus.enable_cte_inlining;
DELETE FROM test; DELETE FROM test;
DROP TABLE test; DROP TABLE test;

View File

@ -24,6 +24,13 @@ CREATE TABLE numbers(a int);
SELECT create_reference_table('numbers'); SELECT create_reference_table('numbers');
INSERT INTO numbers VALUES (20), (21); INSERT INTO numbers VALUES (20), (21);
-- my_volatile_fn takes no arguments and always returns 1, but is declared VOLATILE.
-- NOTE(review): presumably a deterministic stand-in for random() in the queries below, so
-- that the test output stays stable while the call is still treated as volatile -- confirm.
CREATE OR REPLACE FUNCTION my_volatile_fn()
RETURNS INT AS $$
BEGIN
RETURN 1;
END; $$ language plpgsql VOLATILE;
-- INSERT ... SELECT between reference tables -- INSERT ... SELECT between reference tables
BEGIN; BEGIN;
EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers;
@ -53,12 +60,12 @@ FROM local_table lt
JOIN squares sq ON sq.a > lt.a and sq.b > 90 JOIN squares sq ON sq.a > lt.a and sq.b > 90
ORDER BY 1,2,3; ORDER BY 1,2,3;
-- error if in transaction block -- should work if in transaction block
BEGIN; BEGIN;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
ROLLBACK; ROLLBACK;
-- error if in a DO block -- should work if in a DO block
DO $$ DO $$
BEGIN BEGIN
PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
@ -86,7 +93,6 @@ SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1
$$ LANGUAGE sql; $$ LANGUAGE sql;
CALL test_reference_local_join_proc(); CALL test_reference_local_join_proc();
-- error if in a transaction block even if reference table is not in search path
CREATE SCHEMA s1; CREATE SCHEMA s1;
CREATE TABLE s1.ref(a int); CREATE TABLE s1.ref(a int);
SELECT create_reference_table('s1.ref'); SELECT create_reference_table('s1.ref');
@ -97,29 +103,29 @@ ROLLBACK;
BEGIN; BEGIN;
WITH t1 AS ( WITH t1 AS (
SELECT random() r, a FROM local_table SELECT my_volatile_fn() r, a FROM local_table
) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5; ) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5;
END; END;
BEGIN; BEGIN;
WITH t1 AS ( WITH t1 AS (
SELECT random() r, a FROM numbers SELECT my_volatile_fn() r, a FROM numbers
) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5; ) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5;
END; END;
BEGIN; BEGIN;
SELECT count(*) FROM local_table SELECT count(*) FROM local_table
WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a); WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a);
END; END;
BEGIN; BEGIN;
SELECT count(*) FROM numbers SELECT count(*) FROM numbers
WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a); WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a);
END; END;
DROP SCHEMA s1 CASCADE; DROP SCHEMA s1 CASCADE;
-- error if inside a SQL UDF call -- not error if inside a SQL UDF call
CREATE or replace FUNCTION test_reference_local_join_func() CREATE or replace FUNCTION test_reference_local_join_func()
RETURNS SETOF RECORD AS $$ RETURNS SETOF RECORD AS $$
SET LOCAL citus.enable_local_execution to false; SET LOCAL citus.enable_local_execution to false;
@ -133,12 +139,12 @@ SELECT test_reference_local_join_func();
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
SELECT * FROM numbers, local_table; SELECT * FROM numbers, local_table;
WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
-- but this should be fine -- but this should be fine
WITH t AS (SELECT *, random() x FROM numbers) WITH t AS (SELECT *, my_volatile_fn() x FROM numbers)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
@ -147,15 +153,15 @@ CREATE TABLE dist(a int);
SELECT create_distributed_table('dist', 'a'); SELECT create_distributed_table('dist', 'a');
INSERT INTO dist VALUES (20),(30); INSERT INTO dist VALUES (20),(30);
WITH t AS (SELECT *, random() x FROM dist) WITH t AS (SELECT *, my_volatile_fn() x FROM dist)
SELECT * FROM numbers, local_table SELECT * FROM numbers, local_table
WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a); WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
-- test CTE being reference/local join for distributed query -- test CTE being reference/local join for distributed query
WITH t as (SELECT n.a, random() x FROM numbers n NATURAL JOIN local_table l) WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l)
SELECT a FROM t NATURAL JOIN dist; SELECT a FROM t NATURAL JOIN dist;
-- error if FOR UPDATE/FOR SHARE -- shouldn't error if FOR UPDATE/FOR SHARE
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
@ -212,6 +218,34 @@ EXPLAIN (COSTS FALSE)
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a); SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
$Q$); $Q$);
TRUNCATE local_table;
TRUNCATE numbers;
BEGIN;
INSERT INTO local_table VALUES (1), (2), (3), (4);
INSERT INTO numbers VALUES (1), (2), (3), (4);
ALTER TABLE numbers ADD COLUMN d int;
SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a;
ROLLBACK;
BEGIN;
INSERT INTO local_table VALUES (1), (2), (3);
WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x)
SELECT a FROM t NATURAL JOIN dist ORDER BY a;
ROLLBACK;
BEGIN;
INSERT INTO local_table VALUES (1), (2), (3);
INSERT INTO numbers SELECT * FROM generate_series(1, 100);
INSERT INTO numbers SELECT * FROM numbers;
SELECT COUNT(*) FROM local_table JOIN numbers using (a);
UPDATE numbers SET a = a + 1;
SELECT COUNT(*) FROM local_table JOIN numbers using (a);
ROLLBACK;
-- verify that we can drop columns from reference tables replicated to the coordinator -- verify that we can drop columns from reference tables replicated to the coordinator
-- see https://github.com/citusdata/citus/issues/3279 -- see https://github.com/citusdata/citus/issues/3279
ALTER TABLE squares DROP COLUMN b; ALTER TABLE squares DROP COLUMN b;