mirror of https://github.com/citusdata/citus.git
Fix insert query with CTEs/sublinks/subqueries etc (#4700)
* Fix insert query with CTE
* Add more cases with deferred pruning but false fast path
* Add more tests
* Better readability with if statements
(cherry picked from commit dbb88f6f8b
)
pull/5009/head
parent
495470d291
commit
fc08ec203f
|
@ -342,9 +342,12 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
|
|||
/*
|
||||
* At this point, we're about to do the shard pruning for fast-path queries.
|
||||
* Given that pruning is deferred always for INSERTs, we get here
|
||||
* !EnableFastPathRouterPlanner as well.
|
||||
* !EnableFastPathRouterPlanner as well. Given that INSERT statements with
|
||||
* CTEs/sublinks etc are not eligible for fast-path router plan, we get here
|
||||
* jobQuery->commandType == CMD_INSERT as well.
|
||||
*/
|
||||
Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);
|
||||
Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner ||
|
||||
jobQuery->commandType == CMD_INSERT);
|
||||
|
||||
/*
|
||||
* We can only now decide which shard to use, so we need to build a new task
|
||||
|
|
|
@ -555,6 +555,14 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
|||
{
|
||||
ListCell *cteCell = NULL;
|
||||
|
||||
/* CTEs still not supported for INSERTs. */
|
||||
if (queryTree->commandType == CMD_INSERT)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Router planner doesn't support common table expressions with INSERT queries.",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
foreach(cteCell, queryTree->cteList)
|
||||
{
|
||||
CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
|
||||
|
@ -562,31 +570,22 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
|
|||
|
||||
if (cteQuery->commandType != CMD_SELECT)
|
||||
{
|
||||
/* Modifying CTEs still not supported for INSERTs & multi shard queries. */
|
||||
if (queryTree->commandType == CMD_INSERT)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Router planner doesn't support non-select common table expressions with non-select queries.",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
/* Modifying CTEs still not supported for multi shard queries. */
|
||||
if (multiShardQuery)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Router planner doesn't support non-select common table expressions with multi shard queries.",
|
||||
NULL, NULL);
|
||||
}
|
||||
/* Modifying CTEs exclude both INSERT CTEs & INSERT queries. */
|
||||
else if (cteQuery->commandType == CMD_INSERT)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Router planner doesn't support INSERT common table expressions.",
|
||||
NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* Modifying CTEs exclude both INSERT CTEs & INSERT queries. */
|
||||
if (cteQuery->commandType == CMD_INSERT)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Router planner doesn't support INSERT common table expressions.",
|
||||
NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
if (cteQuery->hasForUpdate &&
|
||||
FindNodeMatchingCheckFunctionInRangeTableList(cteQuery->rtable,
|
||||
IsReferenceTableRTE))
|
||||
|
|
|
@ -41,7 +41,7 @@ SELECT create_distributed_table('articles_hash', 'author_id');
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE authors_reference ( name varchar(20), id bigint );
|
||||
CREATE TABLE authors_reference (id int, name text);
|
||||
SELECT create_reference_table('authors_reference');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
@ -2111,6 +2111,26 @@ DEBUG: query has a single distribution column value: 4
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- test INSERT using values from generate_series() and repeat() functions
|
||||
INSERT INTO authors_reference (id, name) VALUES (generate_series(1, 10), repeat('Migjeni', 3));
|
||||
DEBUG: Creating router plan
|
||||
SELECT * FROM authors_reference ORDER BY 1, 2;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
id | name
|
||||
---------------------------------------------------------------------
|
||||
1 | MigjeniMigjeniMigjeni
|
||||
2 | MigjeniMigjeniMigjeni
|
||||
3 | MigjeniMigjeniMigjeni
|
||||
4 | MigjeniMigjeniMigjeni
|
||||
5 | MigjeniMigjeniMigjeni
|
||||
6 | MigjeniMigjeniMigjeni
|
||||
7 | MigjeniMigjeniMigjeni
|
||||
8 | MigjeniMigjeniMigjeni
|
||||
9 | MigjeniMigjeniMigjeni
|
||||
10 | MigjeniMigjeniMigjeni
|
||||
(10 rows)
|
||||
|
||||
SET client_min_messages to 'NOTICE';
|
||||
DROP FUNCTION author_articles_max_id();
|
||||
DROP FUNCTION author_articles_id_word_count();
|
||||
|
|
|
@ -694,6 +694,76 @@ SELECT * FROM collections_list, collections_list_0 WHERE collections_list.key=co
|
|||
100 | 0 | 10000 | 100 | 0 | 10000
|
||||
(1 row)
|
||||
|
||||
-- test hash distribution using INSERT with generate_series() function
|
||||
CREATE OR REPLACE FUNCTION part_hashint4_noop(value int4, seed int8)
|
||||
RETURNS int8 AS $$
|
||||
SELECT value + seed;
|
||||
$$ LANGUAGE SQL IMMUTABLE;
|
||||
CREATE OPERATOR CLASS part_test_int4_ops
|
||||
FOR TYPE int4
|
||||
USING HASH AS
|
||||
operator 1 =,
|
||||
function 2 part_hashint4_noop(int4, int8);
|
||||
CREATE TABLE hash_parted (
|
||||
a int,
|
||||
b int
|
||||
) PARTITION BY HASH (a part_test_int4_ops);
|
||||
CREATE TABLE hpart0 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 0);
|
||||
CREATE TABLE hpart1 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 1);
|
||||
CREATE TABLE hpart2 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 2);
|
||||
CREATE TABLE hpart3 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 3);
|
||||
SELECT create_distributed_table('hash_parted ', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO hash_parted VALUES (1, generate_series(1, 10));
|
||||
SELECT * FROM hash_parted ORDER BY 1, 2;
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 1
|
||||
1 | 2
|
||||
1 | 3
|
||||
1 | 4
|
||||
1 | 5
|
||||
1 | 6
|
||||
1 | 7
|
||||
1 | 8
|
||||
1 | 9
|
||||
1 | 10
|
||||
(10 rows)
|
||||
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart0;
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart1;
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart2;
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart3;
|
||||
-- test range partition without creating partitions and inserting with generate_series()
|
||||
-- should error out even in plain PG since no partition of relation "parent_tab" is found for row
|
||||
-- in Citus it errors out because it fails to evaluate partition key in insert
|
||||
CREATE TABLE parent_tab (id int) PARTITION BY RANGE (id);
|
||||
SELECT create_distributed_table('parent_tab', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO parent_tab VALUES (generate_series(0, 3));
|
||||
ERROR: failed to evaluate partition key in insert
|
||||
HINT: try using constant values for partition column
|
||||
-- now it should work
|
||||
CREATE TABLE parent_tab_1_2 PARTITION OF parent_tab FOR VALUES FROM (1) to (2);
|
||||
ALTER TABLE parent_tab ADD COLUMN b int;
|
||||
INSERT INTO parent_tab VALUES (1, generate_series(0, 3));
|
||||
SELECT * FROM parent_tab ORDER BY 1, 2;
|
||||
id | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 0
|
||||
1 | 1
|
||||
1 | 2
|
||||
1 | 3
|
||||
(4 rows)
|
||||
|
||||
-- make sure that parallel accesses are good
|
||||
SET citus.force_max_query_parallelization TO ON;
|
||||
SELECT * FROM test_2 ORDER BY 1 DESC;
|
||||
|
|
|
@ -822,6 +822,69 @@ EXECUTE olu(1,ARRAY[1,2],ARRAY[1,2]);
|
|||
{1} | {1} | {NULL}
|
||||
(1 row)
|
||||
|
||||
-- test insert query with insert CTE
|
||||
WITH insert_cte AS
|
||||
(INSERT INTO with_modifying.modify_table VALUES (23, 7))
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1998);
|
||||
SELECT * FROM with_modifying.modify_table WHERE id = 23 AND val = 7;
|
||||
id | val
|
||||
---------------------------------------------------------------------
|
||||
23 | 7
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id = 1998;
|
||||
id
|
||||
---------------------------------------------------------------------
|
||||
1998
|
||||
(1 row)
|
||||
|
||||
-- test insert query with multiple CTEs
|
||||
WITH select_cte AS (SELECT * FROM with_modifying.anchor_table),
|
||||
modifying_cte AS (INSERT INTO with_modifying.anchor_table SELECT * FROM select_cte)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1995);
|
||||
SELECT * FROM with_modifying.anchor_table ORDER BY 1;
|
||||
id
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
2
|
||||
2
|
||||
1995
|
||||
1998
|
||||
1998
|
||||
(7 rows)
|
||||
|
||||
-- test with returning
|
||||
WITH returning_cte AS (INSERT INTO with_modifying.anchor_table values (1997) RETURNING *)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1996);
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id IN (1996, 1997) ORDER BY 1;
|
||||
id
|
||||
---------------------------------------------------------------------
|
||||
1996
|
||||
1997
|
||||
(2 rows)
|
||||
|
||||
-- test insert query with select CTE
|
||||
WITH select_cte AS
|
||||
(SELECT * FROM with_modifying.modify_table)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1990);
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id = 1990;
|
||||
id
|
||||
---------------------------------------------------------------------
|
||||
1990
|
||||
(1 row)
|
||||
|
||||
-- even if we do multi-row insert, it is not fast path router due to cte
|
||||
WITH select_cte AS (SELECT 1 AS col)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1991), (1992);
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id IN (1991, 1992) ORDER BY 1;
|
||||
id
|
||||
---------------------------------------------------------------------
|
||||
1991
|
||||
1992
|
||||
(2 rows)
|
||||
|
||||
DELETE FROM with_modifying.anchor_table WHERE id IN (1990, 1991, 1992, 1995, 1996, 1997, 1998);
|
||||
-- Test with replication factor 2
|
||||
SET citus.shard_replication_factor to 2;
|
||||
DROP TABLE modify_table;
|
||||
|
|
|
@ -49,7 +49,7 @@ SET citus.shard_replication_factor TO 1;
|
|||
SET citus.shard_count TO 2;
|
||||
SELECT create_distributed_table('articles_hash', 'author_id');
|
||||
|
||||
CREATE TABLE authors_reference ( name varchar(20), id bigint );
|
||||
CREATE TABLE authors_reference (id int, name text);
|
||||
SELECT create_reference_table('authors_reference');
|
||||
|
||||
-- create a bunch of test data
|
||||
|
@ -850,6 +850,10 @@ SELECT count(*) FILTER (where value = 15) FROM collections_list WHERE key = 4;
|
|||
SELECT count(*) FILTER (where value = 15) FROM collections_list_1 WHERE key = 4;
|
||||
SELECT count(*) FILTER (where value = 15) FROM collections_list_2 WHERE key = 4;
|
||||
|
||||
-- test INSERT using values from generate_series() and repeat() functions
|
||||
INSERT INTO authors_reference (id, name) VALUES (generate_series(1, 10), repeat('Migjeni', 3));
|
||||
SELECT * FROM authors_reference ORDER BY 1, 2;
|
||||
|
||||
SET client_min_messages to 'NOTICE';
|
||||
|
||||
DROP FUNCTION author_articles_max_id();
|
||||
|
|
|
@ -409,6 +409,50 @@ SELECT count(*) FROM collections_list_1 WHERE key = 11;
|
|||
ALTER TABLE collections_list DROP COLUMN ts;
|
||||
SELECT * FROM collections_list, collections_list_0 WHERE collections_list.key=collections_list_0.key ORDER BY 1 DESC,2 DESC,3 DESC,4 DESC LIMIT 1;
|
||||
|
||||
-- test hash distribution using INSERT with generate_series() function
|
||||
CREATE OR REPLACE FUNCTION part_hashint4_noop(value int4, seed int8)
|
||||
RETURNS int8 AS $$
|
||||
SELECT value + seed;
|
||||
$$ LANGUAGE SQL IMMUTABLE;
|
||||
|
||||
CREATE OPERATOR CLASS part_test_int4_ops
|
||||
FOR TYPE int4
|
||||
USING HASH AS
|
||||
operator 1 =,
|
||||
function 2 part_hashint4_noop(int4, int8);
|
||||
|
||||
CREATE TABLE hash_parted (
|
||||
a int,
|
||||
b int
|
||||
) PARTITION BY HASH (a part_test_int4_ops);
|
||||
CREATE TABLE hpart0 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 0);
|
||||
CREATE TABLE hpart1 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 1);
|
||||
CREATE TABLE hpart2 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 2);
|
||||
CREATE TABLE hpart3 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 3);
|
||||
|
||||
SELECT create_distributed_table('hash_parted ', 'a');
|
||||
|
||||
INSERT INTO hash_parted VALUES (1, generate_series(1, 10));
|
||||
|
||||
SELECT * FROM hash_parted ORDER BY 1, 2;
|
||||
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart0;
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart1;
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart2;
|
||||
ALTER TABLE hash_parted DETACH PARTITION hpart3;
|
||||
|
||||
-- test range partition without creating partitions and inserting with generate_series()
|
||||
-- should error out even in plain PG since no partition of relation "parent_tab" is found for row
|
||||
-- in Citus it errors out because it fails to evaluate partition key in insert
|
||||
CREATE TABLE parent_tab (id int) PARTITION BY RANGE (id);
|
||||
SELECT create_distributed_table('parent_tab', 'id');
|
||||
INSERT INTO parent_tab VALUES (generate_series(0, 3));
|
||||
-- now it should work
|
||||
CREATE TABLE parent_tab_1_2 PARTITION OF parent_tab FOR VALUES FROM (1) to (2);
|
||||
ALTER TABLE parent_tab ADD COLUMN b int;
|
||||
INSERT INTO parent_tab VALUES (1, generate_series(0, 3));
|
||||
SELECT * FROM parent_tab ORDER BY 1, 2;
|
||||
|
||||
-- make sure that parallel accesses are good
|
||||
SET citus.force_max_query_parallelization TO ON;
|
||||
SELECT * FROM test_2 ORDER BY 1 DESC;
|
||||
|
|
|
@ -491,6 +491,37 @@ EXECUTE olu(1,ARRAY[1,2],ARRAY[1,2]);
|
|||
EXECUTE olu(1,ARRAY[1,2],ARRAY[1,2]);
|
||||
EXECUTE olu(1,ARRAY[1,2],ARRAY[1,2]);
|
||||
|
||||
-- test insert query with insert CTE
|
||||
WITH insert_cte AS
|
||||
(INSERT INTO with_modifying.modify_table VALUES (23, 7))
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1998);
|
||||
SELECT * FROM with_modifying.modify_table WHERE id = 23 AND val = 7;
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id = 1998;
|
||||
|
||||
-- test insert query with multiple CTEs
|
||||
WITH select_cte AS (SELECT * FROM with_modifying.anchor_table),
|
||||
modifying_cte AS (INSERT INTO with_modifying.anchor_table SELECT * FROM select_cte)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1995);
|
||||
SELECT * FROM with_modifying.anchor_table ORDER BY 1;
|
||||
|
||||
-- test with returning
|
||||
WITH returning_cte AS (INSERT INTO with_modifying.anchor_table values (1997) RETURNING *)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1996);
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id IN (1996, 1997) ORDER BY 1;
|
||||
|
||||
-- test insert query with select CTE
|
||||
WITH select_cte AS
|
||||
(SELECT * FROM with_modifying.modify_table)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1990);
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id = 1990;
|
||||
|
||||
-- even if we do multi-row insert, it is not fast path router due to cte
|
||||
WITH select_cte AS (SELECT 1 AS col)
|
||||
INSERT INTO with_modifying.anchor_table VALUES (1991), (1992);
|
||||
SELECT * FROM with_modifying.anchor_table WHERE id IN (1991, 1992) ORDER BY 1;
|
||||
|
||||
DELETE FROM with_modifying.anchor_table WHERE id IN (1990, 1991, 1992, 1995, 1996, 1997, 1998);
|
||||
|
||||
-- Test with replication factor 2
|
||||
SET citus.shard_replication_factor to 2;
|
||||
|
||||
|
|
Loading…
Reference in New Issue