mirror of https://github.com/citusdata/citus.git
Repartitioned INSERT/SELECT: Test rollback behaviour
parent
43218eebf6
commit
8635396cea
|
@ -154,7 +154,12 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
||||||
"... SELECT commands via the coordinator")));
|
"... SELECT commands via the coordinator")));
|
||||||
}
|
}
|
||||||
|
|
||||||
PlannedStmt *selectPlan = pg_plan_query(query, cursorOptions, params);
|
/*
|
||||||
|
* Make a copy of the query, since pg_plan_query may scribble on it and later
|
||||||
|
* stages of EXPLAIN require it.
|
||||||
|
*/
|
||||||
|
Query *queryCopy = copyObject(query);
|
||||||
|
PlannedStmt *selectPlan = pg_plan_query(queryCopy, cursorOptions, params);
|
||||||
if (IsRedistributablePlan(selectPlan->planTree) &&
|
if (IsRedistributablePlan(selectPlan->planTree) &&
|
||||||
IsSupportedRedistributionTarget(targetRelationId))
|
IsSupportedRedistributionTarget(targetRelationId))
|
||||||
{
|
{
|
||||||
|
|
|
@ -629,6 +629,133 @@ SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY
|
||||||
4 | 6 | 1
|
4 | 6 | 1
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
|
DROP TABLE source_table, target_table;
|
||||||
|
--
|
||||||
|
-- Constraint failure and rollback
|
||||||
|
--
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE source_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i;
|
||||||
|
UPDATE source_table SET b = NULL where b IN (9, 4);
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE target_table(a int, b int not null);
|
||||||
|
SELECT create_distributed_table('target_table', 'a', 'range');
|
||||||
|
NOTICE: using statement-based replication
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CALL public.create_range_partitioned_shards('target_table', '{0,3,6,9}','{2,5,8,50}');
|
||||||
|
INSERT INTO target_table VALUES (11,9), (22,4);
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_table_4213613 source_table
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_table_4213613 source_table
|
||||||
|
Filter: (b IS NOT NULL)
|
||||||
|
(9 rows)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SAVEPOINT s1;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
ERROR: null value in column "b" violates not-null constraint
|
||||||
|
ROLLBACK TO SAVEPOINT s1;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL;
|
||||||
|
END;
|
||||||
|
SELECT * FROM target_table ORDER BY b;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
22 | 4
|
||||||
|
11 | 9
|
||||||
|
4 | 16
|
||||||
|
5 | 25
|
||||||
|
6 | 36
|
||||||
|
7 | 49
|
||||||
|
8 | 64
|
||||||
|
9 | 81
|
||||||
|
10 | 100
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
-- verify that values have been replicated to both replicas
|
||||||
|
SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport;
|
||||||
|
nodename | nodeport | shardid | success | result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
localhost | 57637 | 4213617 | t | 1
|
||||||
|
localhost | 57638 | 4213617 | t | 1
|
||||||
|
localhost | 57637 | 4213618 | t | 2
|
||||||
|
localhost | 57638 | 4213618 | t | 2
|
||||||
|
localhost | 57637 | 4213619 | t | 3
|
||||||
|
localhost | 57638 | 4213619 | t | 3
|
||||||
|
localhost | 57637 | 4213620 | t | 4
|
||||||
|
localhost | 57638 | 4213620 | t | 4
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Multiple casts in the SELECT query
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT 1.12, b::bigint FROM source_table WHERE b IS NOT NULL;
|
||||||
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213617 AS citus_table_alias (a, b) SELECT auto_coerced_by_citus_0, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213613_to_0,repartitioned_results_from_4213614_to_0,repartitioned_results_from_4213615_to_0,repartitioned_results_from_4213616_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 integer)
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT * FROM target_table ORDER BY a, b;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
1 | 16
|
||||||
|
1 | 25
|
||||||
|
1 | 36
|
||||||
|
1 | 49
|
||||||
|
1 | 64
|
||||||
|
1 | 81
|
||||||
|
1 | 100
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- ROLLBACK after out of range error
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT a * 10, b FROM source_table WHERE b IS NOT NULL;
|
||||||
|
ERROR: could not find shard for partition column value
|
||||||
|
END;
|
||||||
|
SELECT max(result) FROM run_command_on_placements('target_table', 'select count(*) from %s');
|
||||||
|
max
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -299,6 +299,66 @@ RESET client_min_messages;
|
||||||
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
|
||||||
|
|
||||||
|
|
||||||
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Constraint failure and rollback
|
||||||
|
--
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
CREATE TABLE source_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i;
|
||||||
|
UPDATE source_table SET b = NULL where b IN (9, 4);
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE target_table(a int, b int not null);
|
||||||
|
SELECT create_distributed_table('target_table', 'a', 'range');
|
||||||
|
CALL public.create_range_partitioned_shards('target_table', '{0,3,6,9}','{2,5,8,50}');
|
||||||
|
|
||||||
|
INSERT INTO target_table VALUES (11,9), (22,4);
|
||||||
|
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SAVEPOINT s1;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
ROLLBACK TO SAVEPOINT s1;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL;
|
||||||
|
END;
|
||||||
|
|
||||||
|
SELECT * FROM target_table ORDER BY b;
|
||||||
|
|
||||||
|
-- verify that values have been replicated to both replicas
|
||||||
|
SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Multiple casts in the SELECT query
|
||||||
|
--
|
||||||
|
|
||||||
|
TRUNCATE target_table;
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT 1.12, b::bigint FROM source_table WHERE b IS NOT NULL;
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT * FROM target_table ORDER BY a, b;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- ROLLBACK after out of range error
|
||||||
|
--
|
||||||
|
|
||||||
|
TRUNCATE target_table;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT a * 10, b FROM source_table WHERE b IS NOT NULL;
|
||||||
|
END;
|
||||||
|
|
||||||
|
SELECT max(result) FROM run_command_on_placements('target_table', 'select count(*) from %s');
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
|
|
Loading…
Reference in New Issue