From 625922e2004b887520279aec193c72f095c70e39 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 6 Dec 2019 14:55:59 +0100 Subject: [PATCH] Fix inserts into local tables with distributed subqueries --- .../planner/multi_router_planner.c | 13 +++++- src/test/regress/expected/dml_recursive.out | 2 +- .../regress/expected/multi_modifications.out | 6 +++ .../expected/multi_shard_update_delete.out | 46 +++++++++---------- src/test/regress/sql/multi_modifications.sql | 6 +++ 5 files changed, 47 insertions(+), 26 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 14f732bdb..98250559e 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -546,14 +546,23 @@ DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuery, PlannerRestrictionContext *plannerRestrictionContext) { - Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); uint32 rangeTableId = 1; - Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); List *rangeTableList = NIL; ListCell *rangeTableCell = NULL; uint32 queryTableCount = 0; CmdType commandType = queryTree->commandType; + Oid distributedTableId = ModifyQueryResultRelationId(queryTree); + if (!IsDistributedTable(distributedTableId)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot plan modifications of local tables involving " + "distributed tables", + NULL, NULL); + } + + Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree); if (deferredError != NULL) { diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out index 64690f3a7..d096e78ea 100644 --- a/src/test/regress/expected/dml_recursive.out +++ b/src/test/regress/expected/dml_recursive.out @@ -305,7 +305,7 @@ FROM distributed_table WHERE distributed_table.tenant_id = local_table.id; -ERROR: relation local_table is not distributed +ERROR: cannot plan modifications of local tables involving distributed tables RESET client_min_messages; DROP SCHEMA recursive_dml_queries CASCADE; NOTICE: drop cascades to 5 other objects diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index f1a9813d9..8188853f8 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -1,6 +1,7 @@ SET citus.shard_count TO 32; SET citus.next_shard_id TO 750000; SET citus.next_placement_id TO 750000; +CREATE SCHEMA multi_modifications; -- some failure messages that comes from the worker nodes -- might change due to parallel executions, so suppress those -- using \set VERBOSITY terse @@ -1304,7 +1305,12 @@ ERROR: relation pg_namespace is not distributed DELETE FROM summary_table WHERE id < ( SELECT 0 FROM pg_dist_node ); +CREATE TABLE multi_modifications.local (a int default 1, b int); +INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table)); +ERROR: cannot plan modifications of local tables involving distributed tables DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE reference_raw_table; DROP TABLE reference_summary_table; +DROP SCHEMA multi_modifications CASCADE; +NOTICE: drop cascades to table multi_modifications.local diff --git a/src/test/regress/expected/multi_shard_update_delete.out b/src/test/regress/expected/multi_shard_update_delete.out index 851ac88e0..48c4d3ee5 100644 --- a/src/test/regress/expected/multi_shard_update_delete.out +++ b/src/test/regress/expected/multi_shard_update_delete.out @@ -163,7 +163,7 @@ SELECT SUM(value_1), SUM(value_3) FROM users_test_table; 0 | 0 (1 row) --- Test on append table (set executor mode to sequential, since with the append +-- Test on append table (set executor mode to sequential, since with the append -- distributed tables parallel executor may create tons of connections) SET citus.multi_shard_modify_mode to sequential; CREATE TABLE append_stage_table(id int, col_2 int); @@ -250,7 +250,7 @@ INSERT INTO tt1 VALUES(7,7); INSERT INTO tt1 VALUES(9,8); BEGIN; -- Update rows from partititon tt1_1120 -UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; -- Update rows from partititon tt1_510 UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; COMMIT; @@ -267,7 +267,7 @@ SELECT * FROM tt1 ORDER BY id; -- Modify main table and partition table within same transaction BEGIN; -UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; +UPDATE tt1 SET col_2 = 12 WHERE col_2 > 10 and col_2 < 20; UPDATE tt1 SET col_2 = 7 WHERE col_2 < 10 and col_2 > 5; DELETE FROM tt1_510; DELETE FROM tt1_1120; @@ -394,7 +394,7 @@ WHERE user_id IN (SELECT user_id FROM users_test_table UNION SELECT user_id - FROM events_test_table) returning value_3; + FROM events_test_table) returning value_3; value_3 --------- 0 @@ -434,33 +434,33 @@ WHERE user_id IN (SELECT user_id UPDATE users_test_table SET value_1 = 5 -WHERE - value_2 > - (SELECT - max(value_2) - FROM - events_test_table - WHERE +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE users_test_table.user_id = events_test_table.user_id GROUP BY user_id ); UPDATE users_test_table SET value_3 = 1 -WHERE - value_2 > - (SELECT - max(value_2) - FROM - events_test_table - WHERE - users_test_table.user_id = events_test_table.user_id AND +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_test_table + WHERE + users_test_table.user_id = events_test_table.user_id AND users_test_table.value_2 > events_test_table.value_2 GROUP BY user_id ); UPDATE users_test_table -SET value_2 = 4 +SET value_2 = 4 WHERE value_1 > 1 AND value_1 < 3 AND value_2 >= 1 @@ -541,7 +541,7 @@ WHERE users_reference_copy_table.user_id = events_test_table.value_1; -- Both reference tables and hash distributed tables can be used in subquery UPDATE events_test_table as ett SET value_2 = 6 -WHERE ett.value_3 IN (SELECT utt.value_3 +WHERE ett.value_3 IN (SELECT utt.value_3 FROM users_test_table as utt, users_reference_copy_table as uct WHERE utt.user_id = uct.user_id AND utt.user_id = ett.user_id); -- We don't need equality check with constant values in sub-select @@ -730,7 +730,7 @@ UPDATE events_test_table_local SET value_2 = 5 FROM users_test_table WHERE events_test_table_local.user_id = users_test_table.user_id; -ERROR: relation events_test_table_local is not distributed +ERROR: cannot plan modifications of local tables involving distributed tables -- Local tables in a subquery are supported through recursive planning UPDATE users_test_table SET value_2 = 5 @@ -742,7 +742,7 @@ FROM events_test_table_2 WHERE users_test_table.user_id = events_test_table_2.user_id; ERROR: cannot push down this subquery DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning --- Should error out due to multiple row return from subquery, but we can not get this information within +-- Should error out due to multiple row return from subquery, but we can not get this information within -- subquery pushdown planner. This query will be sent to worker with recursive planner. \set VERBOSITY terse DELETE FROM users_test_table diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 4b1010fbc..912198f69 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -2,6 +2,8 @@ SET citus.shard_count TO 32; SET citus.next_shard_id TO 750000; SET citus.next_placement_id TO 750000; +CREATE SCHEMA multi_modifications; + -- some failure messages that comes from the worker nodes -- might change due to parallel executions, so suppress those -- using \set VERBOSITY terse @@ -880,7 +882,11 @@ DELETE FROM summary_table WHERE id < ( SELECT 0 FROM pg_dist_node ); +CREATE TABLE multi_modifications.local (a int default 1, b int); +INSERT INTO multi_modifications.local VALUES (default, (SELECT min(id) FROM summary_table)); + DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE reference_raw_table; DROP TABLE reference_summary_table; +DROP SCHEMA multi_modifications CASCADE;