mirror of https://github.com/citusdata/citus.git
Fix relkind checks in planner for relkinds other than RELKIND_RELATION (#4294)
We were qualifying relations with relkind != RELKIND_RELATION as non-relations due to the strict checks around RangeTblEntry->relkind in planner.pull/4299/head
parent
25de5b1290
commit
cc8be422ce
|
@ -2296,11 +2296,26 @@ GetRTEListProperties(List *rangeTableList)
|
|||
RangeTblEntry *rangeTableEntry = NULL;
|
||||
foreach_ptr(rangeTableEntry, rangeTableList)
|
||||
{
|
||||
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||
if (rangeTableEntry->rtekind != RTE_RELATION)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
else if (rangeTableEntry->relkind == RELKIND_VIEW)
|
||||
{
|
||||
/*
|
||||
* Skip over views, distributed tables within (regular) views are
|
||||
* already in rangeTableList.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
else if (rangeTableEntry->relkind == RELKIND_MATVIEW)
|
||||
{
|
||||
/*
|
||||
* Skip over materialized views, here we should not consider
|
||||
* materialized views as local tables.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
Oid relationId = rangeTableEntry->relid;
|
||||
CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
|
||||
|
|
|
@ -312,8 +312,7 @@ NodeTryGetRteRelid(Node *node)
|
|||
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
|
||||
|
||||
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind == RELKIND_RELATION))
|
||||
if (rangeTableEntry->rtekind != RTE_RELATION)
|
||||
{
|
||||
return InvalidOid;
|
||||
}
|
||||
|
|
|
@ -518,14 +518,13 @@ static DeferredErrorMessage *
|
|||
ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
||||
Oid *distributedTableIdOutput)
|
||||
{
|
||||
DeferredErrorMessage *deferredError =
|
||||
DeferErrorIfUnsupportedModifyQueryWithLocalTable(queryTree);
|
||||
DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree);
|
||||
if (deferredError != NULL)
|
||||
{
|
||||
return deferredError;
|
||||
}
|
||||
|
||||
deferredError = DeferErrorIfModifyView(queryTree);
|
||||
deferredError = DeferErrorIfUnsupportedModifyQueryWithLocalTable(queryTree);
|
||||
if (deferredError != NULL)
|
||||
{
|
||||
return deferredError;
|
||||
|
|
|
@ -0,0 +1,707 @@
|
|||
\SET VERBOSITY terse
|
||||
invalid command \SET
|
||||
SET citus.next_shard_id TO 1513000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE SCHEMA mixed_relkind_tests;
|
||||
SET search_path TO mixed_relkind_tests;
|
||||
-- ensure that coordinator is added to pg_dist_node
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
-- make results consistent
|
||||
SET citus.enable_cte_inlining TO OFF;
|
||||
-- create test tables
|
||||
CREATE TABLE postgres_local_table (a int);
|
||||
CREATE TABLE partitioned_postgres_local_table(a int) PARTITION BY RANGE(a);
|
||||
CREATE TABLE partitioned_postgres_local_table_1 PARTITION OF partitioned_postgres_local_table FOR VALUES FROM (0) TO (3);
|
||||
CREATE TABLE partitioned_postgres_local_table_2 PARTITION OF partitioned_postgres_local_table FOR VALUES FROM (3) TO (1000);
|
||||
CREATE TABLE reference_table(a int);
|
||||
SELECT create_reference_table('reference_table');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE VIEW view_on_ref AS SELECT * FROM reference_table;
|
||||
CREATE TABLE citus_local_table(a int);
|
||||
SELECT create_citus_local_table('citus_local_table');
|
||||
create_citus_local_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE VIEW view_on_citus_local AS SELECT * FROM citus_local_table;
|
||||
CREATE UNLOGGED TABLE unlogged_distributed_table(a int, b int);
|
||||
SELECT create_distributed_table('unlogged_distributed_table', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE distributed_table(a int);
|
||||
SELECT create_distributed_table('distributed_table', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE VIEW view_on_dist AS SELECT * FROM distributed_table;
|
||||
CREATE MATERIALIZED VIEW mat_view_on_dist AS SELECT * FROM distributed_table;
|
||||
CREATE TABLE partitioned_distributed_table(a int, b int) PARTITION BY RANGE(a);
|
||||
CREATE TABLE partitioned_distributed_table_1 PARTITION OF partitioned_distributed_table FOR VALUES FROM (0) TO (3);
|
||||
CREATE TABLE partitioned_distributed_table_2 PARTITION OF partitioned_distributed_table FOR VALUES FROM (3) TO (1000);
|
||||
SELECT create_distributed_table('partitioned_distributed_table', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
|
||||
CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
|
||||
CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server;
|
||||
SELECT create_distributed_table('foreign_distributed_table', 'a');
|
||||
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- and insert some data
|
||||
INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO reference_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO citus_local_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO unlogged_distributed_table SELECT a,a+1 FROM generate_series(0, 5) AS a;
|
||||
INSERT INTO distributed_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO partitioned_distributed_table SELECT a,a+1 FROM generate_series(0, 5) AS a;
|
||||
-- should work
|
||||
SELECT * FROM partitioned_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 1
|
||||
1 | 2
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
5 | 6
|
||||
(7 rows)
|
||||
|
||||
SELECT * FROM partitioned_distributed_table UNION SELECT 1, * FROM postgres_local_table ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 0
|
||||
1 | 1
|
||||
1 | 2
|
||||
1 | 3
|
||||
1 | 4
|
||||
1 | 5
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
5 | 6
|
||||
(11 rows)
|
||||
|
||||
SELECT * FROM partitioned_distributed_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 2
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
5 | 6
|
||||
(6 rows)
|
||||
|
||||
SELECT *, 1 FROM postgres_local_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2;
|
||||
a | ?column?
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 1
|
||||
1 | 2
|
||||
2 | 1
|
||||
2 | 3
|
||||
3 | 1
|
||||
3 | 4
|
||||
4 | 1
|
||||
4 | 5
|
||||
5 | 1
|
||||
5 | 6
|
||||
(11 rows)
|
||||
|
||||
SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 1
|
||||
1 | 2
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
5 | 6
|
||||
(7 rows)
|
||||
|
||||
SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 1
|
||||
(1 row)
|
||||
|
||||
SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(6 rows)
|
||||
|
||||
SELECT * FROM view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 1
|
||||
1 | 2
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
5 | 6
|
||||
(7 rows)
|
||||
|
||||
SELECT * FROM mat_view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 1
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM view_on_citus_local UNION SELECT 1 ORDER BY 1;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(6 rows)
|
||||
|
||||
SELECT * FROM view_on_dist UNION SELECT 1 ORDER BY 1;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(6 rows)
|
||||
|
||||
SELECT * FROM mat_view_on_dist UNION SELECT 1 ORDER BY 1;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
-- can push down the union in subquery
|
||||
SELECT * FROM (SELECT * FROM partitioned_distributed_table UNION SELECT * FROM partitioned_distributed_table) AS foo;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2
|
||||
5 | 6
|
||||
4 | 5
|
||||
3 | 4
|
||||
0 | 1
|
||||
2 | 3
|
||||
(6 rows)
|
||||
|
||||
-- cannot push down the subquery, should evaluate subquery by creating a subplan
|
||||
SELECT COUNT(*) FROM (SELECT b, random() FROM partitioned_distributed_table GROUP BY b) AS foo;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT b, random() AS random FROM mixed_relkind_tests.partitioned_distributed_table GROUP BY b
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.b, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer, random double precision)) foo
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM partitioned_distributed_table WHERE b IN (SELECT a FROM postgres_local_table) ORDER BY 1,2;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.postgres_local_table
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table WHERE (b OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer))) ORDER BY a, b
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 2
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
(5 rows)
|
||||
|
||||
-- can push down the subquery
|
||||
SELECT * FROM partitioned_distributed_table WHERE a IN (SELECT a FROM distributed_table) ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 2
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
5 | 6
|
||||
(6 rows)
|
||||
|
||||
SELECT * FROM partitioned_distributed_table WHERE a IN (SELECT a FROM view_on_part_dist) ORDER BY 1,2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
0 | 1
|
||||
1 | 2
|
||||
2 | 3
|
||||
3 | 4
|
||||
4 | 5
|
||||
5 | 6
|
||||
(6 rows)
|
||||
|
||||
SELECT * FROM distributed_table WHERE a IN (SELECT a FROM view_on_part_dist) ORDER BY 1;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(6 rows)
|
||||
|
||||
SELECT * FROM view_on_dist WHERE a IN (SELECT a FROM view_on_part_dist) ORDER BY 1;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(6 rows)
|
||||
|
||||
SELECT * FROM view_on_citus_local WHERE a IN (SELECT a FROM reference_table) ORDER BY 1;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(6 rows)
|
||||
|
||||
SELECT COUNT(*) FROM (SELECT a, random() FROM partitioned_distributed_table GROUP BY a) AS foo;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- should add (a IS NOT NULL) filters similar to regular distributed tables
|
||||
RESET client_min_messages;
|
||||
SELECT public.explain_has_is_not_null(
|
||||
$$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT * FROM partitioned_distributed_table;
|
||||
$$);
|
||||
explain_has_is_not_null
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
-- should fail
|
||||
SELECT * FROM partitioned_postgres_local_table JOIN distributed_table ON (true);
|
||||
ERROR: direct joins between distributed and local tables are not supported
|
||||
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||
SELECT * FROM partitioned_postgres_local_table JOIN partitioned_distributed_table ON (true);
|
||||
ERROR: direct joins between distributed and local tables are not supported
|
||||
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||
SELECT * FROM distributed_table JOIN partitioned_postgres_local_table ON (true);
|
||||
ERROR: direct joins between distributed and local tables are not supported
|
||||
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||
SELECT * FROM reference_table LEFT JOIN partitioned_distributed_table ON true;
|
||||
ERROR: cannot pushdown the subquery
|
||||
DETAIL: There exist a reference table in the outer part of the outer join
|
||||
INSERT INTO partitioned_distributed_table SELECT foo.* FROM partitioned_distributed_table AS foo JOIN citus_local_table ON (true);
|
||||
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
|
||||
ERROR: direct joins between distributed and local tables are not supported
|
||||
INSERT INTO partitioned_distributed_table SELECT foo.* FROM distributed_table AS foo JOIN citus_local_table ON (true);
|
||||
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
|
||||
ERROR: direct joins between distributed and local tables are not supported
|
||||
INSERT INTO distributed_table SELECT foo.a FROM partitioned_distributed_table AS foo JOIN citus_local_table ON (true);
|
||||
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
|
||||
ERROR: direct joins between distributed and local tables are not supported
|
||||
-- non-colocated subquery should work
|
||||
SELECT COUNT(*) FROM
|
||||
(SELECT *, random() FROM partitioned_distributed_table) AS foo,
|
||||
(SELECT *, random() FROM partitioned_distributed_table) AS bar
|
||||
WHERE foo.a = bar.b;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT a, b, random() AS random FROM mixed_relkind_tests.partitioned_distributed_table
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT partitioned_distributed_table.a, partitioned_distributed_table.b, random() AS random FROM mixed_relkind_tests.partitioned_distributed_table) foo, (SELECT intermediate_result.a, intermediate_result.b, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer, random double precision)) bar WHERE (foo.a OPERATOR(pg_catalog.=) bar.b)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*) FROM
|
||||
(SELECT *, random() FROM unlogged_distributed_table) AS foo,
|
||||
(SELECT *, random() FROM foreign_distributed_table) AS bar
|
||||
WHERE foo.a = bar.b;
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT a, b, random() AS random FROM mixed_relkind_tests.foreign_distributed_table
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT unlogged_distributed_table.a, unlogged_distributed_table.b, random() AS random FROM mixed_relkind_tests.unlogged_distributed_table) foo, (SELECT intermediate_result.a, intermediate_result.b, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer, random double precision)) bar WHERE (foo.a OPERATOR(pg_catalog.=) bar.b)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- should fail
|
||||
UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo;
|
||||
ERROR: cannot plan modifications with citus local tables and distributed tables
|
||||
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||
UPDATE partitioned_distributed_table SET b = foo.a FROM postgres_local_table AS foo;
|
||||
ERROR: cannot plan modifications with local tables involving citus tables
|
||||
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM postgres_local_table AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
ERROR: cannot plan modifications with local tables involving citus tables
|
||||
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM citus_local_table AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
ERROR: cannot plan modifications with citus local tables and distributed tables
|
||||
HINT: Use CTE's or subqueries to select from local tables and use them in joins
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM mat_view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
ERROR: materialized views in modify queries are not supported
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
|
||||
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
|
||||
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
|
||||
-- should work
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_dist AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_ref AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
-- JOINs on the distribution key
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN partitioned_distributed_table p2 USING (a);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*) FROM unlogged_distributed_table u1 JOIN partitioned_distributed_table p2 USING (a);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 LEFT JOIN partitioned_distributed_table p2 USING (a);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- lateral JOIN
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN LATERAL (SELECT * FROM partitioned_distributed_table p2 WHERE p1.a = p2.a) AS foo ON (true);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- router query
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN partitioned_distributed_table p2 USING (a) WHERE a = 1;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- repartition query
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN partitioned_distributed_table p2 USING (b) WHERE b = 1;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*) FROM unlogged_distributed_table u1 JOIN partitioned_distributed_table p2 USING (b) WHERE b = 1;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
RESET citus.enable_repartition_joins;
|
||||
-- joins with cte's
|
||||
WITH cte_1 AS (SELECT * FROM partitioned_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1;
|
||||
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 AS (SELECT * FROM partitioned_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (a);
|
||||
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN mixed_relkind_tests.partitioned_distributed_table USING (a))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 AS (SELECT * FROM foreign_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a);
|
||||
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.foreign_distributed_table
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN mixed_relkind_tests.foreign_distributed_table USING (a))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 AS (SELECT * FROM partitioned_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b);
|
||||
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN mixed_relkind_tests.partitioned_distributed_table USING (b))
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- multi shard colocated update
|
||||
UPDATE partitioned_distributed_table dt
|
||||
SET b = sub1.a + sub2.a
|
||||
FROM (SELECT * FROM partitioned_distributed_table WHERE b = 1) AS sub1,
|
||||
(SELECT * FROM partitioned_distributed_table WHERE b = 2) AS sub2
|
||||
WHERE sub1.a = sub2.a AND sub1.a = dt.a AND dt.a > 1;
|
||||
UPDATE unlogged_distributed_table dt
|
||||
SET b = sub1.a + sub2.a
|
||||
FROM (SELECT * FROM unlogged_distributed_table WHERE b = 1) AS sub1,
|
||||
(SELECT * FROM unlogged_distributed_table WHERE b = 2) AS sub2
|
||||
WHERE sub1.a = sub2.a AND sub1.a = dt.a AND dt.a > 1;
|
||||
-- multi shard non-colocated update
|
||||
WITH cte1 AS (SELECT * FROM partitioned_distributed_table WHERE b = 1),
|
||||
cte2 AS (SELECT * FROM partitioned_distributed_table WHERE b = 2)
|
||||
UPDATE partitioned_distributed_table dt SET b = cte1.a + cte2.a
|
||||
FROM cte1, cte2 WHERE cte1.a != cte2.a AND cte1.a = dt.a AND dt.a > 1;
|
||||
DEBUG: generating subplan XXX_1 for CTE cte1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table WHERE (b OPERATOR(pg_catalog.=) 1)
|
||||
DEBUG: generating subplan XXX_2 for CTE cte2: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table WHERE (b OPERATOR(pg_catalog.=) 2)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE mixed_relkind_tests.partitioned_distributed_table dt SET b = (cte1.a OPERATOR(pg_catalog.+) cte2.a) FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte1, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte2 WHERE ((cte1.a OPERATOR(pg_catalog.<>) cte2.a) AND (cte1.a OPERATOR(pg_catalog.=) dt.a) AND (dt.a OPERATOR(pg_catalog.>) 1))
|
||||
-- router update with CTE
|
||||
UPDATE partitioned_distributed_table dt
|
||||
SET b = sub1.a + sub2.a
|
||||
FROM (SELECT * FROM partitioned_distributed_table WHERE b = 1) AS sub1,
|
||||
(SELECT * FROM partitioned_distributed_table WHERE b = 2) AS sub2
|
||||
WHERE sub1.a = sub2.a AND sub1.a = dt.a AND dt.a = 1;
|
||||
-- INSERT .. SELECT via coordinator
|
||||
RESET client_min_messages;
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT * FROM partitioned_distributed_table ORDER BY 1,2 LIMIT 5;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> Limit
|
||||
-> Sort
|
||||
Sort Key: remote_scan.a, remote_scan.b
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(7 rows)
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO unlogged_distributed_table SELECT * FROM partitioned_distributed_table ORDER BY 1,2 LIMIT 5;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> Limit
|
||||
-> Sort
|
||||
Sort Key: remote_scan.a, remote_scan.b
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(7 rows)
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT * FROM distributed_table ORDER BY 1 LIMIT 5;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: pull to coordinator
|
||||
-> Limit
|
||||
-> Sort
|
||||
Sort Key: remote_scan.a
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(7 rows)
|
||||
|
||||
-- INSERT .. SELECT via repartition
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT a + 1 FROM partitioned_distributed_table;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: repartition
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(4 rows)
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO unlogged_distributed_table SELECT a + 1 FROM partitioned_distributed_table;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: repartition
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(4 rows)
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT a + 1 FROM distributed_table;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: repartition
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(4 rows)
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT a + 1 FROM unlogged_distributed_table;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus INSERT ... SELECT)
|
||||
INSERT/SELECT method: repartition
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(4 rows)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
-- some aggregate queries
|
||||
SELECT sum(a) FROM partitioned_distributed_table;
|
||||
sum
|
||||
---------------------------------------------------------------------
|
||||
15
|
||||
(1 row)
|
||||
|
||||
SELECT ceil(regr_syy(a, b)) FROM partitioned_distributed_table;
|
||||
ceil
|
||||
---------------------------------------------------------------------
|
||||
18
|
||||
(1 row)
|
||||
|
||||
SELECT ceil(regr_syy(a, b)) FROM unlogged_distributed_table;
|
||||
ceil
|
||||
---------------------------------------------------------------------
|
||||
18
|
||||
(1 row)
|
||||
|
||||
-- pushdown WINDOW
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a) FROM partitioned_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Sort
|
||||
Sort Key: remote_scan.a, remote_scan.count
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(4 rows)
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Sort
|
||||
Sort Key: remote_scan.a, remote_scan.count
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(4 rows)
|
||||
|
||||
-- pull to coordinator WINDOW
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM partitioned_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Sort
|
||||
Sort Key: remote_scan.a, (count(*) OVER (?))
|
||||
-> WindowAgg
|
||||
-> Sort
|
||||
Sort Key: remote_scan.worker_column_2
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(7 rows)
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
coordinator_plan
|
||||
---------------------------------------------------------------------
|
||||
Sort
|
||||
Sort Key: remote_scan.a, (count(*) OVER (?))
|
||||
-> WindowAgg
|
||||
-> Sort
|
||||
Sort Key: remote_scan.worker_column_2
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
(7 rows)
|
||||
|
||||
-- FOR UPDATE
|
||||
SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM unlogged_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2
|
||||
(1 row)
|
||||
|
||||
VACUUM partitioned_distributed_table;
|
||||
TRUNCATE partitioned_distributed_table;
|
||||
SET client_min_messages TO ERROR;
|
||||
-- drop column followed by SELECT in transaction block
|
||||
BEGIN;
|
||||
ALTER TABLE partitioned_distributed_table DROP COLUMN b CASCADE;
|
||||
SELECT * FROM partitioned_distributed_table;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE;
|
||||
SELECT * FROM foreign_distributed_table;
|
||||
a
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
COMMIT;
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA mixed_relkind_tests CASCADE;
|
|
@ -29,6 +29,20 @@ BEGIN
|
|||
END LOOP;
|
||||
RETURN;
|
||||
END; $$ language plpgsql;
|
||||
-- helper function that returns true if output of given explain has "is not null" (case in-sensitive)
|
||||
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_commmand text)
|
||||
RETURNS BOOLEAN AS $$
|
||||
DECLARE
|
||||
query_plan text;
|
||||
BEGIN
|
||||
FOR query_plan IN EXECUTE explain_commmand LOOP
|
||||
IF query_plan ILIKE '%is not null%'
|
||||
THEN
|
||||
RETURN true;
|
||||
END IF;
|
||||
END LOOP;
|
||||
RETURN false;
|
||||
END; $$ language plpgsql;
|
||||
-- helper function to quickly run SQL on the whole cluster
|
||||
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
|
|
|
@ -310,7 +310,7 @@ test: replicate_reference_tables_to_coordinator
|
|||
test: coordinator_shouldhaveshards
|
||||
test: local_shard_utility_command_execution
|
||||
test: citus_local_tables
|
||||
test: multi_row_router_insert
|
||||
test: multi_row_router_insert mixed_relkind_tests
|
||||
|
||||
test: remove_coordinator
|
||||
|
||||
|
|
|
@ -0,0 +1,284 @@
|
|||
\SET VERBOSITY terse
|
||||
|
||||
SET citus.next_shard_id TO 1513000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE SCHEMA mixed_relkind_tests;
|
||||
SET search_path TO mixed_relkind_tests;
|
||||
|
||||
-- ensure that coordinator is added to pg_dist_node
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
|
||||
RESET client_min_messages;
|
||||
|
||||
-- make results consistent
|
||||
SET citus.enable_cte_inlining TO OFF;
|
||||
|
||||
-- create test tables
|
||||
CREATE TABLE postgres_local_table (a int);
|
||||
|
||||
CREATE TABLE partitioned_postgres_local_table(a int) PARTITION BY RANGE(a);
|
||||
CREATE TABLE partitioned_postgres_local_table_1 PARTITION OF partitioned_postgres_local_table FOR VALUES FROM (0) TO (3);
|
||||
CREATE TABLE partitioned_postgres_local_table_2 PARTITION OF partitioned_postgres_local_table FOR VALUES FROM (3) TO (1000);
|
||||
|
||||
CREATE TABLE reference_table(a int);
|
||||
SELECT create_reference_table('reference_table');
|
||||
|
||||
CREATE VIEW view_on_ref AS SELECT * FROM reference_table;
|
||||
|
||||
CREATE TABLE citus_local_table(a int);
|
||||
SELECT create_citus_local_table('citus_local_table');
|
||||
|
||||
CREATE VIEW view_on_citus_local AS SELECT * FROM citus_local_table;
|
||||
|
||||
CREATE UNLOGGED TABLE unlogged_distributed_table(a int, b int);
|
||||
SELECT create_distributed_table('unlogged_distributed_table', 'a');
|
||||
|
||||
CREATE TABLE distributed_table(a int);
|
||||
SELECT create_distributed_table('distributed_table', 'a');
|
||||
|
||||
CREATE VIEW view_on_dist AS SELECT * FROM distributed_table;
|
||||
CREATE MATERIALIZED VIEW mat_view_on_dist AS SELECT * FROM distributed_table;
|
||||
|
||||
CREATE TABLE partitioned_distributed_table(a int, b int) PARTITION BY RANGE(a);
|
||||
CREATE TABLE partitioned_distributed_table_1 PARTITION OF partitioned_distributed_table FOR VALUES FROM (0) TO (3);
|
||||
CREATE TABLE partitioned_distributed_table_2 PARTITION OF partitioned_distributed_table FOR VALUES FROM (3) TO (1000);
|
||||
SELECT create_distributed_table('partitioned_distributed_table', 'a');
|
||||
|
||||
CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
|
||||
CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
|
||||
|
||||
CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server;
|
||||
SELECT create_distributed_table('foreign_distributed_table', 'a');
|
||||
|
||||
-- and insert some data
|
||||
INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO reference_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO citus_local_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO unlogged_distributed_table SELECT a,a+1 FROM generate_series(0, 5) AS a;
|
||||
INSERT INTO distributed_table SELECT * FROM generate_series(0, 5);
|
||||
INSERT INTO partitioned_distributed_table SELECT a,a+1 FROM generate_series(0, 5) AS a;
|
||||
|
||||
-- should work
|
||||
SELECT * FROM partitioned_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
|
||||
SELECT * FROM partitioned_distributed_table UNION SELECT 1, * FROM postgres_local_table ORDER BY 1,2;
|
||||
SELECT * FROM partitioned_distributed_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2;
|
||||
SELECT *, 1 FROM postgres_local_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2;
|
||||
SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
|
||||
SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
|
||||
SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1;
|
||||
|
||||
SELECT * FROM view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2;
|
||||
SELECT * FROM mat_view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2;
|
||||
SELECT * FROM view_on_citus_local UNION SELECT 1 ORDER BY 1;
|
||||
SELECT * FROM view_on_dist UNION SELECT 1 ORDER BY 1;
|
||||
SELECT * FROM mat_view_on_dist UNION SELECT 1 ORDER BY 1;
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
-- can push down the union in subquery
|
||||
SELECT * FROM (SELECT * FROM partitioned_distributed_table UNION SELECT * FROM partitioned_distributed_table) AS foo;
|
||||
|
||||
-- cannot push down the subquery, should evaluate subquery by creating a subplan
|
||||
SELECT COUNT(*) FROM (SELECT b, random() FROM partitioned_distributed_table GROUP BY b) AS foo;
|
||||
SELECT * FROM partitioned_distributed_table WHERE b IN (SELECT a FROM postgres_local_table) ORDER BY 1,2;
|
||||
|
||||
-- can push down the subquery
|
||||
SELECT * FROM partitioned_distributed_table WHERE a IN (SELECT a FROM distributed_table) ORDER BY 1,2;
|
||||
SELECT * FROM partitioned_distributed_table WHERE a IN (SELECT a FROM view_on_part_dist) ORDER BY 1,2;
|
||||
SELECT * FROM distributed_table WHERE a IN (SELECT a FROM view_on_part_dist) ORDER BY 1;
|
||||
SELECT * FROM view_on_dist WHERE a IN (SELECT a FROM view_on_part_dist) ORDER BY 1;
|
||||
SELECT * FROM view_on_citus_local WHERE a IN (SELECT a FROM reference_table) ORDER BY 1;
|
||||
SELECT COUNT(*) FROM (SELECT a, random() FROM partitioned_distributed_table GROUP BY a) AS foo;
|
||||
|
||||
-- should add (a IS NOT NULL) filters similar to regular distributed tables
|
||||
RESET client_min_messages;
|
||||
SELECT public.explain_has_is_not_null(
|
||||
$$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT * FROM partitioned_distributed_table;
|
||||
$$);
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
-- should fail
|
||||
SELECT * FROM partitioned_postgres_local_table JOIN distributed_table ON (true);
|
||||
SELECT * FROM partitioned_postgres_local_table JOIN partitioned_distributed_table ON (true);
|
||||
SELECT * FROM distributed_table JOIN partitioned_postgres_local_table ON (true);
|
||||
SELECT * FROM reference_table LEFT JOIN partitioned_distributed_table ON true;
|
||||
INSERT INTO partitioned_distributed_table SELECT foo.* FROM partitioned_distributed_table AS foo JOIN citus_local_table ON (true);
|
||||
INSERT INTO partitioned_distributed_table SELECT foo.* FROM distributed_table AS foo JOIN citus_local_table ON (true);
|
||||
INSERT INTO distributed_table SELECT foo.a FROM partitioned_distributed_table AS foo JOIN citus_local_table ON (true);
|
||||
|
||||
-- non-colocated subquery should work
|
||||
SELECT COUNT(*) FROM
|
||||
(SELECT *, random() FROM partitioned_distributed_table) AS foo,
|
||||
(SELECT *, random() FROM partitioned_distributed_table) AS bar
|
||||
WHERE foo.a = bar.b;
|
||||
|
||||
SELECT COUNT(*) FROM
|
||||
(SELECT *, random() FROM unlogged_distributed_table) AS foo,
|
||||
(SELECT *, random() FROM foreign_distributed_table) AS bar
|
||||
WHERE foo.a = bar.b;
|
||||
|
||||
-- should fail
|
||||
UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo;
|
||||
UPDATE partitioned_distributed_table SET b = foo.a FROM postgres_local_table AS foo;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM postgres_local_table AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM citus_local_table AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM mat_view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM distributed_table AS foo WHERE foo.a < partitioned_distributed_table.a;
|
||||
|
||||
-- should work
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM partitioned_distributed_table AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_part_dist AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_dist AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
UPDATE partitioned_distributed_table SET a = foo.a FROM view_on_ref AS foo WHERE foo.a = partitioned_distributed_table.a;
|
||||
|
||||
-- JOINs on the distribution key
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN partitioned_distributed_table p2 USING (a);
|
||||
SELECT COUNT(*) FROM unlogged_distributed_table u1 JOIN partitioned_distributed_table p2 USING (a);
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 LEFT JOIN partitioned_distributed_table p2 USING (a);
|
||||
|
||||
-- lateral JOIN
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN LATERAL (SELECT * FROM partitioned_distributed_table p2 WHERE p1.a = p2.a) AS foo ON (true);
|
||||
|
||||
-- router query
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN partitioned_distributed_table p2 USING (a) WHERE a = 1;
|
||||
|
||||
-- repartition query
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SELECT COUNT(*) FROM partitioned_distributed_table p1 JOIN partitioned_distributed_table p2 USING (b) WHERE b = 1;
|
||||
SELECT COUNT(*) FROM unlogged_distributed_table u1 JOIN partitioned_distributed_table p2 USING (b) WHERE b = 1;
|
||||
RESET citus.enable_repartition_joins;
|
||||
|
||||
-- joins with cte's
|
||||
WITH cte_1 AS (SELECT * FROM partitioned_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1;
|
||||
|
||||
WITH cte_1 AS (SELECT * FROM partitioned_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (a);
|
||||
|
||||
WITH cte_1 AS (SELECT * FROM foreign_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a);
|
||||
|
||||
WITH cte_1 AS (SELECT * FROM partitioned_distributed_table)
|
||||
SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b);
|
||||
|
||||
-- multi shard colocated update
|
||||
UPDATE partitioned_distributed_table dt
|
||||
SET b = sub1.a + sub2.a
|
||||
FROM (SELECT * FROM partitioned_distributed_table WHERE b = 1) AS sub1,
|
||||
(SELECT * FROM partitioned_distributed_table WHERE b = 2) AS sub2
|
||||
WHERE sub1.a = sub2.a AND sub1.a = dt.a AND dt.a > 1;
|
||||
|
||||
UPDATE unlogged_distributed_table dt
|
||||
SET b = sub1.a + sub2.a
|
||||
FROM (SELECT * FROM unlogged_distributed_table WHERE b = 1) AS sub1,
|
||||
(SELECT * FROM unlogged_distributed_table WHERE b = 2) AS sub2
|
||||
WHERE sub1.a = sub2.a AND sub1.a = dt.a AND dt.a > 1;
|
||||
|
||||
-- multi shard non-colocated update
|
||||
WITH cte1 AS (SELECT * FROM partitioned_distributed_table WHERE b = 1),
|
||||
cte2 AS (SELECT * FROM partitioned_distributed_table WHERE b = 2)
|
||||
UPDATE partitioned_distributed_table dt SET b = cte1.a + cte2.a
|
||||
FROM cte1, cte2 WHERE cte1.a != cte2.a AND cte1.a = dt.a AND dt.a > 1;
|
||||
|
||||
-- router update with CTE
|
||||
UPDATE partitioned_distributed_table dt
|
||||
SET b = sub1.a + sub2.a
|
||||
FROM (SELECT * FROM partitioned_distributed_table WHERE b = 1) AS sub1,
|
||||
(SELECT * FROM partitioned_distributed_table WHERE b = 2) AS sub2
|
||||
WHERE sub1.a = sub2.a AND sub1.a = dt.a AND dt.a = 1;
|
||||
|
||||
-- INSERT .. SELECT via coordinator
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT * FROM partitioned_distributed_table ORDER BY 1,2 LIMIT 5;
|
||||
$Q$);
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO unlogged_distributed_table SELECT * FROM partitioned_distributed_table ORDER BY 1,2 LIMIT 5;
|
||||
$Q$);
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT * FROM distributed_table ORDER BY 1 LIMIT 5;
|
||||
$Q$);
|
||||
|
||||
-- INSERT .. SELECT via repartition
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT a + 1 FROM partitioned_distributed_table;
|
||||
$Q$);
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO unlogged_distributed_table SELECT a + 1 FROM partitioned_distributed_table;
|
||||
$Q$);
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT a + 1 FROM distributed_table;
|
||||
$Q$);
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
INSERT INTO partitioned_distributed_table SELECT a + 1 FROM unlogged_distributed_table;
|
||||
$Q$);
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
-- some aggregate queries
|
||||
SELECT sum(a) FROM partitioned_distributed_table;
|
||||
SELECT ceil(regr_syy(a, b)) FROM partitioned_distributed_table;
|
||||
SELECT ceil(regr_syy(a, b)) FROM unlogged_distributed_table;
|
||||
|
||||
-- pushdown WINDOW
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a) FROM partitioned_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
|
||||
-- pull to coordinator WINDOW
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM partitioned_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
|
||||
SELECT public.coordinator_plan($Q$
|
||||
EXPLAIN (COSTS OFF)
|
||||
SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2;
|
||||
$Q$);
|
||||
|
||||
-- FOR UPDATE
|
||||
SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE;
|
||||
SELECT * FROM unlogged_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE;
|
||||
|
||||
VACUUM partitioned_distributed_table;
|
||||
TRUNCATE partitioned_distributed_table;
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
|
||||
-- drop column followed by SELECT in transaction block
|
||||
BEGIN;
|
||||
ALTER TABLE partitioned_distributed_table DROP COLUMN b CASCADE;
|
||||
SELECT * FROM partitioned_distributed_table;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE;
|
||||
SELECT * FROM foreign_distributed_table;
|
||||
COMMIT;
|
||||
|
||||
-- cleanup at exit
|
||||
DROP SCHEMA mixed_relkind_tests CASCADE;
|
|
@ -33,6 +33,21 @@ BEGIN
|
|||
RETURN;
|
||||
END; $$ language plpgsql;
|
||||
|
||||
-- helper function that returns true if output of given explain has "is not null" (case in-sensitive)
|
||||
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_commmand text)
|
||||
RETURNS BOOLEAN AS $$
|
||||
DECLARE
|
||||
query_plan text;
|
||||
BEGIN
|
||||
FOR query_plan IN EXECUTE explain_commmand LOOP
|
||||
IF query_plan ILIKE '%is not null%'
|
||||
THEN
|
||||
RETURN true;
|
||||
END IF;
|
||||
END LOOP;
|
||||
RETURN false;
|
||||
END; $$ language plpgsql;
|
||||
|
||||
-- helper function to quickly run SQL on the whole cluster
|
||||
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
|
|
Loading…
Reference in New Issue