Merge pull request #3317 from citusdata/cherrypick-3280

Cherrypick 3280
release-9.1
Hadi Moshayedi 2019-12-17 15:30:05 -08:00 committed by GitHub
commit c88648cbe0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 280 additions and 14 deletions

View File

@ -14,6 +14,7 @@
#include "access/xact.h" #include "access/xact.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/pg_class.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/commands/multi_copy.h" #include "distributed/commands/multi_copy.h"
@ -585,7 +586,7 @@ IsLocalReferenceTableJoinPlan(PlannedStmt *plan)
{ {
bool hasReferenceTable = false; bool hasReferenceTable = false;
bool hasLocalTable = false; bool hasLocalTable = false;
ListCell *oidCell = NULL; ListCell *rangeTableCell = NULL;
bool hasReferenceTableReplica = false; bool hasReferenceTableReplica = false;
/* /*
@ -622,12 +623,44 @@ IsLocalReferenceTableJoinPlan(PlannedStmt *plan)
return false; return false;
} }
foreach(oidCell, plan->relationOids) /*
* plan->rtable contains the flattened RTE lists of the plan tree, which
* includes rtes in subqueries, CTEs, ...
*
* It doesn't contain optimized away table accesses (due to join optimization),
* which is fine for our purpose.
*/
foreach(rangeTableCell, plan->rtable)
{ {
Oid relationId = lfirst_oid(oidCell); RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
bool onlySearchPath = false; bool onlySearchPath = false;
if (RelationIsAKnownShard(relationId, onlySearchPath)) /*
* Planner's IsLocalReferenceTableJoin() doesn't allow planning functions
* in FROM clause locally. Early exit. We cannot use Assert() here since
* all non-Citus plans might pass through these checks.
*/
if (rangeTableEntry->rtekind == RTE_FUNCTION)
{
return false;
}
if (rangeTableEntry->rtekind != RTE_RELATION)
{
continue;
}
/*
* Planner's IsLocalReferenceTableJoin() doesn't allow planning reference
* table and view join locally. Early exit. We cannot use Assert() here
* since all non-Citus plans might pass through these checks.
*/
if (rangeTableEntry->relkind == RELKIND_VIEW)
{
return false;
}
if (RelationIsAKnownShard(rangeTableEntry->relid, onlySearchPath))
{ {
/* /*
* We don't allow joining non-reference distributed tables, so we * We don't allow joining non-reference distributed tables, so we

View File

@ -1872,6 +1872,20 @@ IsLocalReferenceTableJoin(Query *parse, List *rangeTableList)
{ {
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/*
* Don't plan joins involving functions locally since we are not sure if
* they do distributed accesses or not, and defaulting to local planning
* might break transactional semantics.
*
 * For example, access to the reference table in the function might go
* over a connection, but access to the same reference table outside
* the function will go over the current backend. The snapshot for the
* connection in the function is taken after the statement snapshot,
* so they can see two different views of data.
*
* Looking at gram.y, RTE_TABLEFUNC is used only for XMLTABLE() which
* is okay to be planned locally, so allowing that.
*/
if (rangeTableEntry->rtekind == RTE_FUNCTION) if (rangeTableEntry->rtekind == RTE_FUNCTION)
{ {
return false; return false;
@ -1882,6 +1896,15 @@ IsLocalReferenceTableJoin(Query *parse, List *rangeTableList)
continue; continue;
} }
/*
* We only allow local join for the relation kinds for which we can
 * determine deterministically whether access to them is local or distributed.
* For this reason, we don't allow non-materialized views.
*/
if (rangeTableEntry->relkind == RELKIND_VIEW)
{
return false;
}
if (!IsDistributedTable(rangeTableEntry->relid)) if (!IsDistributedTable(rangeTableEntry->relid))
{ {

View File

@ -1,6 +1,6 @@
-- File to create functions and helpers needed for subsequent tests -- File to CREATE FUNCTIONs and helpers needed for subsequent tests
-- create a helper function to create objects on each node -- create a helper function to create objects on each node
CREATE FUNCTION run_command_on_master_and_workers(p_sql text) CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$
BEGIN BEGIN
EXECUTE p_sql; EXECUTE p_sql;
@ -111,7 +111,7 @@ $desc_views$
(1 row) (1 row)
-- Create a function to make sure that queries returning the same result -- Create a function to make sure that queries returning the same result
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
BEGIN BEGIN
EXECUTE query; EXECUTE query;
EXCEPTION WHEN OTHERS THEN EXCEPTION WHEN OTHERS THEN
@ -133,8 +133,22 @@ BEGIN
END LOOP; END LOOP;
RETURN; RETURN;
END; $$ language plpgsql; END; $$ language plpgsql;
-- Is a distributed plan?
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
RETURNS BOOLEAN AS $$
DECLARE
query_plan TEXT;
BEGIN
FOR query_plan IN execute explain_commmand LOOP
IF query_plan LIKE '%Task Count:%'
THEN
RETURN TRUE;
END IF;
END LOOP;
RETURN FALSE;
END; $$ language plpgsql;
-- helper function to quickly run SQL on the whole cluster -- helper function to quickly run SQL on the whole cluster
CREATE FUNCTION run_command_on_coordinator_and_workers(p_sql text) CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$
BEGIN BEGIN
EXECUTE p_sql; EXECUTE p_sql;
@ -142,7 +156,7 @@ BEGIN
END;$$; END;$$;
-- 1. Marks the given procedure as colocated with the given table. -- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure. -- 2. Marks the argument index with which we route the procedure.
CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$
BEGIN BEGIN
update citus.pg_dist_object update citus.pg_dist_object

View File

@ -131,6 +131,28 @@ BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
ERROR: cannot join local tables and reference tables in a transaction block ERROR: cannot join local tables and reference tables in a transaction block
ROLLBACK; ROLLBACK;
BEGIN;
WITH t1 AS (
SELECT random() r, a FROM local_table
) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5;
ERROR: cannot join local tables and reference tables in a transaction block
END;
BEGIN;
WITH t1 AS (
SELECT random() r, a FROM numbers
) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5;
ERROR: cannot join local tables and reference tables in a transaction block
END;
BEGIN;
SELECT count(*) FROM local_table
WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a);
ERROR: cannot join local tables and reference tables in a transaction block
END;
BEGIN;
SELECT count(*) FROM numbers
WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a);
ERROR: cannot join local tables and reference tables in a transaction block
END;
DROP SCHEMA s1 CASCADE; DROP SCHEMA s1 CASCADE;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table s1.ref DETAIL: drop cascades to table s1.ref
@ -166,6 +188,85 @@ HINT: Consider using an equality filter on the distributed table's partition co
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
ERROR: could not run distributed query with FOR UPDATE/SHARE commands ERROR: could not run distributed query with FOR UPDATE/SHARE commands
HINT: Consider using an equality filter on the distributed table's partition column. HINT: Consider using an equality filter on the distributed table's partition column.
--
-- Joins between reference tables and views shouldn't be planned locally.
--
CREATE VIEW numbers_v AS SELECT * FROM numbers WHERE a=1;
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
$Q$);
coordinator_plan
------------------------------
Custom Scan (Citus Adaptive)
Task Count: 1
(2 rows)
CREATE VIEW local_table_v AS SELECT * FROM local_table WHERE a BETWEEN 1 AND 10;
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a;
$Q$);
coordinator_plan
------------------------------------------------
Custom Scan (Citus Adaptive)
-> Distributed Subplan 19_1
-> Seq Scan on local_table
Filter: ((a >= 1) AND (a <= 10))
Task Count: 1
(5 rows)
DROP VIEW numbers_v, local_table_v;
--
-- Joins between reference tables and materialized views are allowed to
-- be planned locally.
--
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
LOG: executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10))
REFRESH MATERIALIZED VIEW numbers_v;
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
$Q$);
plan_is_distributed
---------------------
f
(1 row)
BEGIN;
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
ERROR: cannot join local tables and reference tables in a transaction block
END;
--
-- Joins between reference tables, local tables, and function calls shouldn't
-- be planned locally.
--
SELECT count(*)
FROM local_table a, numbers b, generate_series(1, 10) c
WHERE a.a = b.a AND a.a = c;
ERROR: relation local_table is not distributed
-- but it should be okay if the function call is not a data source
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
$Q$);
plan_is_distributed
---------------------
f
(1 row)
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
$Q$);
plan_is_distributed
---------------------
f
(1 row)
-- verify that we can drop columns from reference tables replicated to the coordinator
-- see https://github.com/citusdata/citus/issues/3279
ALTER TABLE squares DROP COLUMN b;
-- clean-up -- clean-up
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA replicate_ref_to_coordinator CASCADE; DROP SCHEMA replicate_ref_to_coordinator CASCADE;

View File

@ -1,7 +1,7 @@
-- File to create functions and helpers needed for subsequent tests -- File to CREATE FUNCTIONs and helpers needed for subsequent tests
-- create a helper function to create objects on each node -- create a helper function to create objects on each node
CREATE FUNCTION run_command_on_master_and_workers(p_sql text) CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$
BEGIN BEGIN
EXECUTE p_sql; EXECUTE p_sql;
@ -109,7 +109,7 @@ $desc_views$
); );
-- Create a function to make sure that queries returning the same result -- Create a function to make sure that queries returning the same result
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$ CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
BEGIN BEGIN
EXECUTE query; EXECUTE query;
EXCEPTION WHEN OTHERS THEN EXCEPTION WHEN OTHERS THEN
@ -133,8 +133,23 @@ BEGIN
RETURN; RETURN;
END; $$ language plpgsql; END; $$ language plpgsql;
-- Is a distributed plan?
CREATE OR REPLACE FUNCTION plan_is_distributed(explain_commmand text)
RETURNS BOOLEAN AS $$
DECLARE
query_plan TEXT;
BEGIN
FOR query_plan IN execute explain_commmand LOOP
IF query_plan LIKE '%Task Count:%'
THEN
RETURN TRUE;
END IF;
END LOOP;
RETURN FALSE;
END; $$ language plpgsql;
-- helper function to quickly run SQL on the whole cluster -- helper function to quickly run SQL on the whole cluster
CREATE FUNCTION run_command_on_coordinator_and_workers(p_sql text) CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$
BEGIN BEGIN
EXECUTE p_sql; EXECUTE p_sql;
@ -143,7 +158,7 @@ END;$$;
-- 1. Marks the given procedure as colocated with the given table. -- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure. -- 2. Marks the argument index with which we route the procedure.
CREATE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int) CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void LANGUAGE plpgsql AS $$ RETURNS void LANGUAGE plpgsql AS $$
BEGIN BEGIN
update citus.pg_dist_object update citus.pg_dist_object

View File

@ -62,6 +62,28 @@ BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
ROLLBACK; ROLLBACK;
BEGIN;
WITH t1 AS (
SELECT random() r, a FROM local_table
) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5;
END;
BEGIN;
WITH t1 AS (
SELECT random() r, a FROM numbers
) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5;
END;
BEGIN;
SELECT count(*) FROM local_table
WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a);
END;
BEGIN;
SELECT count(*) FROM numbers
WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a);
END;
DROP SCHEMA s1 CASCADE; DROP SCHEMA s1 CASCADE;
-- shouldn't plan locally if modifications happen in CTEs, ... -- shouldn't plan locally if modifications happen in CTEs, ...
@ -83,6 +105,64 @@ WITH t AS (SELECT *, random() x FROM dist) SELECT * FROM numbers, local_table
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
--
-- Joins between reference tables and views shouldn't be planned locally.
--
CREATE VIEW numbers_v AS SELECT * FROM numbers WHERE a=1;
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
$Q$);
CREATE VIEW local_table_v AS SELECT * FROM local_table WHERE a BETWEEN 1 AND 10;
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a;
$Q$);
DROP VIEW numbers_v, local_table_v;
--
-- Joins between reference tables and materialized views are allowed to
-- be planned locally.
--
CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
REFRESH MATERIALIZED VIEW numbers_v;
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
$Q$);
BEGIN;
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
END;
--
-- Joins between reference tables, local tables, and function calls shouldn't
-- be planned locally.
--
SELECT count(*)
FROM local_table a, numbers b, generate_series(1, 10) c
WHERE a.a = b.a AND a.a = c;
-- but it should be okay if the function call is not a data source
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
$Q$);
SELECT public.plan_is_distributed($Q$
EXPLAIN (COSTS FALSE)
SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
$Q$);
-- verify that we can drop columns from reference tables replicated to the coordinator
-- see https://github.com/citusdata/citus/issues/3279
ALTER TABLE squares DROP COLUMN b;
-- clean-up -- clean-up
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA replicate_ref_to_coordinator CASCADE; DROP SCHEMA replicate_ref_to_coordinator CASCADE;