mirror of https://github.com/citusdata/citus.git
Fix inserts into local tables with distributed subqueries
parent
8e5041885d
commit
486c620a3c
|
@ -547,14 +547,23 @@ DeferredErrorMessage *
|
||||||
ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery,
|
ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery,
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
PlannerRestrictionContext *plannerRestrictionContext)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
|
|
||||||
uint32 rangeTableId = 1;
|
uint32 rangeTableId = 1;
|
||||||
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
|
||||||
List *rangeTableList = NIL;
|
List *rangeTableList = NIL;
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
uint32 queryTableCount = 0;
|
uint32 queryTableCount = 0;
|
||||||
CmdType commandType = queryTree->commandType;
|
CmdType commandType = queryTree->commandType;
|
||||||
|
|
||||||
|
Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
|
||||||
|
if (!IsDistributedTable(distributedTableId))
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot plan modifications of local tables involving "
|
||||||
|
"distributed tables",
|
||||||
|
NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
||||||
|
|
||||||
DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree);
|
DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree);
|
||||||
if (deferredError != NULL)
|
if (deferredError != NULL)
|
||||||
{
|
{
|
||||||
|
|
|
@ -305,7 +305,7 @@ FROM
|
||||||
distributed_table
|
distributed_table
|
||||||
WHERE
|
WHERE
|
||||||
distributed_table.tenant_id = local_table.id;
|
distributed_table.tenant_id = local_table.id;
|
||||||
ERROR: relation local_table is not distributed
|
ERROR: cannot plan modifications of local tables involving distributed tables
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
DROP SCHEMA recursive_dml_queries CASCADE;
|
DROP SCHEMA recursive_dml_queries CASCADE;
|
||||||
NOTICE: drop cascades to 5 other objects
|
NOTICE: drop cascades to 5 other objects
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
SET citus.shard_count TO 32;
|
SET citus.shard_count TO 32;
|
||||||
SET citus.next_shard_id TO 750000;
|
SET citus.next_shard_id TO 750000;
|
||||||
SET citus.next_placement_id TO 750000;
|
SET citus.next_placement_id TO 750000;
|
||||||
|
CREATE SCHEMA multi_modifications;
|
||||||
-- some failure messages that comes from the worker nodes
|
-- some failure messages that comes from the worker nodes
|
||||||
-- might change due to parallel executions, so suppress those
|
-- might change due to parallel executions, so suppress those
|
||||||
-- using \set VERBOSITY terse
|
-- using \set VERBOSITY terse
|
||||||
|
@ -1297,7 +1298,12 @@ ERROR: relation pg_namespace is not distributed
|
||||||
DELETE FROM summary_table WHERE id < (
|
DELETE FROM summary_table WHERE id < (
|
||||||
SELECT 0 FROM pg_dist_node
|
SELECT 0 FROM pg_dist_node
|
||||||
);
|
);
|
||||||
|
CREATE TABLE multi_modifications.local (a int default 1, b int);
|
||||||
|
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
|
||||||
|
ERROR: cannot plan modifications of local tables involving distributed tables
|
||||||
DROP TABLE raw_table;
|
DROP TABLE raw_table;
|
||||||
DROP TABLE summary_table;
|
DROP TABLE summary_table;
|
||||||
DROP TABLE reference_raw_table;
|
DROP TABLE reference_raw_table;
|
||||||
DROP TABLE reference_summary_table;
|
DROP TABLE reference_summary_table;
|
||||||
|
DROP SCHEMA multi_modifications CASCADE;
|
||||||
|
NOTICE: drop cascades to table multi_modifications.local
|
||||||
|
|
|
@ -163,7 +163,7 @@ SELECT SUM(value_1), SUM(value_3) FROM users_test_table;
|
||||||
0 | 0
|
0 | 0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Test on append table (set executor mode to sequential, since with the append
|
-- Test on append table (set executor mode to sequential, since with the append
|
||||||
-- distributed tables parallel executor may create tons of connections)
|
-- distributed tables parallel executor may create tons of connections)
|
||||||
SET citus.multi_shard_modify_mode to sequential;
|
SET citus.multi_shard_modify_mode to sequential;
|
||||||
CREATE TABLE append_stage_table(id int, col_2 int);
|
CREATE TABLE append_stage_table(id int, col_2 int);
|
||||||
|
@ -250,7 +250,7 @@ INSERT INTO tt1 VALUES(7,7);
|
||||||
INSERT INTO tt1 VALUES(9,8);
|
INSERT INTO tt1 VALUES(9,8);
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- Update rows from partititon tt1_1120
|
-- Update rows from partititon tt1_1120
|
||||||
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
|
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
|
||||||
-- Update rows from partititon tt1_510
|
-- Update rows from partititon tt1_510
|
||||||
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
|
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -267,7 +267,7 @@ SELECT * FROM tt1 ORDER BY id;
|
||||||
|
|
||||||
-- Modify main table and partition table within same transaction
|
-- Modify main table and partition table within same transaction
|
||||||
BEGIN;
|
BEGIN;
|
||||||
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
|
UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20;
|
||||||
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
|
UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5;
|
||||||
DELETE FROM tt1_510;
|
DELETE FROM tt1_510;
|
||||||
DELETE FROM tt1_1120;
|
DELETE FROM tt1_1120;
|
||||||
|
@ -394,7 +394,7 @@ WHERE user_id IN (SELECT user_id
|
||||||
FROM users_test_table
|
FROM users_test_table
|
||||||
UNION
|
UNION
|
||||||
SELECT user_id
|
SELECT user_id
|
||||||
FROM events_test_table) returning value_3;
|
FROM events_test_table) returning value_3;
|
||||||
value_3
|
value_3
|
||||||
---------
|
---------
|
||||||
0
|
0
|
||||||
|
@ -434,33 +434,33 @@ WHERE user_id IN (SELECT user_id
|
||||||
|
|
||||||
UPDATE users_test_table
|
UPDATE users_test_table
|
||||||
SET value_1 = 5
|
SET value_1 = 5
|
||||||
WHERE
|
WHERE
|
||||||
value_2 >
|
value_2 >
|
||||||
(SELECT
|
(SELECT
|
||||||
max(value_2)
|
max(value_2)
|
||||||
FROM
|
FROM
|
||||||
events_test_table
|
events_test_table
|
||||||
WHERE
|
WHERE
|
||||||
users_test_table.user_id = events_test_table.user_id
|
users_test_table.user_id = events_test_table.user_id
|
||||||
GROUP BY
|
GROUP BY
|
||||||
user_id
|
user_id
|
||||||
);
|
);
|
||||||
UPDATE users_test_table
|
UPDATE users_test_table
|
||||||
SET value_3 = 1
|
SET value_3 = 1
|
||||||
WHERE
|
WHERE
|
||||||
value_2 >
|
value_2 >
|
||||||
(SELECT
|
(SELECT
|
||||||
max(value_2)
|
max(value_2)
|
||||||
FROM
|
FROM
|
||||||
events_test_table
|
events_test_table
|
||||||
WHERE
|
WHERE
|
||||||
users_test_table.user_id = events_test_table.user_id AND
|
users_test_table.user_id = events_test_table.user_id AND
|
||||||
users_test_table.value_2 > events_test_table.value_2
|
users_test_table.value_2 > events_test_table.value_2
|
||||||
GROUP BY
|
GROUP BY
|
||||||
user_id
|
user_id
|
||||||
);
|
);
|
||||||
UPDATE users_test_table
|
UPDATE users_test_table
|
||||||
SET value_2 = 4
|
SET value_2 = 4
|
||||||
WHERE
|
WHERE
|
||||||
value_1 > 1 AND value_1 < 3
|
value_1 > 1 AND value_1 < 3
|
||||||
AND value_2 >= 1
|
AND value_2 >= 1
|
||||||
|
@ -541,7 +541,7 @@ WHERE users_reference_copy_table.user_id = events_test_table.value_1;
|
||||||
-- Both reference tables and hash distributed tables can be used in subquery
|
-- Both reference tables and hash distributed tables can be used in subquery
|
||||||
UPDATE events_test_table as ett
|
UPDATE events_test_table as ett
|
||||||
SET value_2 = 6
|
SET value_2 = 6
|
||||||
WHERE ett.value_3 IN (SELECT utt.value_3
|
WHERE ett.value_3 IN (SELECT utt.value_3
|
||||||
FROM users_test_table as utt, users_reference_copy_table as uct
|
FROM users_test_table as utt, users_reference_copy_table as uct
|
||||||
WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id);
|
WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id);
|
||||||
-- We don't need equality check with constant values in sub-select
|
-- We don't need equality check with constant values in sub-select
|
||||||
|
@ -730,7 +730,7 @@ UPDATE events_test_table_local
|
||||||
SET value_2 = 5
|
SET value_2 = 5
|
||||||
FROM users_test_table
|
FROM users_test_table
|
||||||
WHERE events_test_table_local.user_id = users_test_table.user_id;
|
WHERE events_test_table_local.user_id = users_test_table.user_id;
|
||||||
ERROR: relation events_test_table_local is not distributed
|
ERROR: cannot plan modifications of local tables involving distributed tables
|
||||||
-- Local tables in a subquery are supported through recursive planning
|
-- Local tables in a subquery are supported through recursive planning
|
||||||
UPDATE users_test_table
|
UPDATE users_test_table
|
||||||
SET value_2 = 5
|
SET value_2 = 5
|
||||||
|
@ -742,7 +742,7 @@ FROM events_test_table_2
|
||||||
WHERE users_test_table.user_id = events_test_table_2.user_id;
|
WHERE users_test_table.user_id = events_test_table_2.user_id;
|
||||||
ERROR: cannot push down this subquery
|
ERROR: cannot push down this subquery
|
||||||
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
|
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
|
||||||
-- Should error out due to multiple row return from subquery, but we can not get this information within
|
-- Should error out due to multiple row return from subquery, but we can not get this information within
|
||||||
-- subquery pushdown planner. This query will be sent to worker with recursive planner.
|
-- subquery pushdown planner. This query will be sent to worker with recursive planner.
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
DELETE FROM users_test_table
|
DELETE FROM users_test_table
|
||||||
|
|
|
@ -2,6 +2,8 @@ SET citus.shard_count TO 32;
|
||||||
SET citus.next_shard_id TO 750000;
|
SET citus.next_shard_id TO 750000;
|
||||||
SET citus.next_placement_id TO 750000;
|
SET citus.next_placement_id TO 750000;
|
||||||
|
|
||||||
|
CREATE SCHEMA multi_modifications;
|
||||||
|
|
||||||
-- some failure messages that comes from the worker nodes
|
-- some failure messages that comes from the worker nodes
|
||||||
-- might change due to parallel executions, so suppress those
|
-- might change due to parallel executions, so suppress those
|
||||||
-- using \set VERBOSITY terse
|
-- using \set VERBOSITY terse
|
||||||
|
@ -871,7 +873,11 @@ DELETE FROM summary_table WHERE id < (
|
||||||
SELECT 0 FROM pg_dist_node
|
SELECT 0 FROM pg_dist_node
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE multi_modifications.local (a int default 1, b int);
|
||||||
|
INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table));
|
||||||
|
|
||||||
DROP TABLE raw_table;
|
DROP TABLE raw_table;
|
||||||
DROP TABLE summary_table;
|
DROP TABLE summary_table;
|
||||||
DROP TABLE reference_raw_table;
|
DROP TABLE reference_raw_table;
|
||||||
DROP TABLE reference_summary_table;
|
DROP TABLE reference_summary_table;
|
||||||
|
DROP SCHEMA multi_modifications CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue