diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index ab86b4aa2..11ba065ac 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -154,7 +154,12 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, "... SELECT commands via the coordinator"))); } - PlannedStmt *selectPlan = pg_plan_query(query, cursorOptions, params); + /* + * Make a copy of the query, since pg_plan_query may scribble on it and later + * stages of EXPLAIN require it. + */ + Query *queryCopy = copyObject(query); + PlannedStmt *selectPlan = pg_plan_query(queryCopy, cursorOptions, params); if (IsRedistributablePlan(selectPlan->planTree) && IsSupportedRedistributionTarget(targetRelationId)) { diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 93b978009..0c246286e 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -629,6 +629,133 @@ SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY 4 | 6 | 1 (5 rows) +DROP TABLE source_table, target_table; +-- +-- Constraint failure and rollback +-- +SET citus.shard_count TO 4; +CREATE TABLE source_table(a int, b int); +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; +UPDATE source_table SET b = NULL where b IN (9, 4); +SET citus.shard_replication_factor TO 2; +CREATE TABLE target_table(a int, b int not null); +SELECT create_distributed_table('target_table', 'a', 'range'); +NOTICE: using statement-based replication + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('target_table', '{0,3,6,9}','{2,5,8,50}'); +INSERT INTO target_table VALUES (11,9), (22,4); +EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_table_4213613 source_table +(8 rows) + +EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_table_4213613 source_table + Filter: (b IS NOT NULL) +(9 rows) + +BEGIN; +SAVEPOINT s1; +INSERT INTO target_table SELECT * FROM source_table; +ERROR: null value in column "b" violates not-null constraint +ROLLBACK TO SAVEPOINT s1; +INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; +END; +SELECT * FROM target_table ORDER BY b; + a | b +--------------------------------------------------------------------- + 1 | 1 + 22 | 4 + 11 | 9 + 4 | 16 + 5 | 25 + 6 | 36 + 7 | 49 + 8 | 64 + 9 | 81 + 10 | 100 +(10 rows) + +-- verify that values have been replicated to both replicas +SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport; + nodename | nodeport | shardid | success | result +--------------------------------------------------------------------- + localhost | 57637 | 4213617 | t | 1 + localhost | 57638 | 4213617 | t | 1 + localhost | 57637 | 4213618 | t | 2 + localhost | 57638 | 4213618 | t | 2 + localhost | 57637 | 4213619 | t | 3 + localhost | 57638 | 4213619 | t | 3 + localhost | 57637 | 4213620 | t | 4 + localhost | 57638 | 4213620 | t | 4 +(8 rows) + +-- +-- Multiple casts in the SELECT query +-- +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT 1.12, b::bigint FROM source_table WHERE b IS NOT NULL; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213617 AS citus_table_alias (a, b) SELECT auto_coerced_by_citus_0, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213613_to_0,repartitioned_results_from_4213614_to_0,repartitioned_results_from_4213615_to_0,repartitioned_results_from_4213616_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 integer) +RESET client_min_messages; +SELECT * FROM target_table ORDER BY a, b; + a | b +--------------------------------------------------------------------- + 1 | 1 + 1 | 16 + 1 | 25 + 1 | 36 + 1 | 49 + 1 | 64 + 1 | 81 + 1 | 100 +(8 rows) + +-- +-- ROLLBACK after out of range error +-- +TRUNCATE target_table; +BEGIN; +INSERT INTO target_table SELECT a * 10, b FROM source_table WHERE b IS NOT NULL; +ERROR: could not find shard for partition column value +END; +SELECT max(result) FROM run_command_on_placements('target_table', 'select count(*) from %s'); + max +--------------------------------------------------------------------- + 0 +(1 row) + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 7c02c4f47..342d3c07f 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -299,6 +299,66 @@ RESET client_min_messages; SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; +DROP TABLE source_table, target_table; + +-- +-- Constraint failure and rollback +-- + +SET citus.shard_count TO 4; +CREATE TABLE source_table(a int, b int); +SELECT create_distributed_table('source_table', 'a'); +INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; +UPDATE source_table SET b = NULL where b IN (9, 4); + +SET citus.shard_replication_factor TO 2; +CREATE TABLE target_table(a int, b int not null); +SELECT create_distributed_table('target_table', 'a', 'range'); +CALL public.create_range_partitioned_shards('target_table', '{0,3,6,9}','{2,5,8,50}'); + +INSERT INTO target_table VALUES (11,9), (22,4); + +EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table; +EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; + +BEGIN; +SAVEPOINT s1; +INSERT INTO target_table SELECT * FROM source_table; +ROLLBACK TO SAVEPOINT s1; +INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; +END; + +SELECT * FROM target_table ORDER BY b; + +-- verify that values have been replicated to both replicas +SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport; + +-- +-- Multiple casts in the SELECT query +-- + +TRUNCATE target_table; + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT 1.12, b::bigint FROM source_table WHERE b IS NOT NULL; +RESET client_min_messages; + +SELECT * FROM target_table ORDER BY a, b; + +-- +-- ROLLBACK after out of range error +-- + +TRUNCATE target_table; + +BEGIN; +INSERT INTO target_table SELECT a * 10, b FROM source_table WHERE b IS NOT NULL; +END; + +SELECT max(result) FROM run_command_on_placements('target_table', 'select count(*) from %s'); + + + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING;