mirror of https://github.com/citusdata/citus.git
Test ROLLBACK TO SAVEPOINT with multi-shard CTE failures
parent
aafd22dffa
commit
32ecb6884c
|
@ -2262,6 +2262,10 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
|
||||||
|
* so unclaim the connections before throwing errors.
|
||||||
|
*/
|
||||||
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
|
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
|
||||||
UnclaimAllShardConnections(shardConnectionHash);
|
UnclaimAllShardConnections(shardConnectionHash);
|
||||||
|
|
||||||
|
|
|
@ -1038,8 +1038,8 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MultiClientCancel sends a cancelation request on the given connection. Return
|
* SendCancelationRequest sends a cancelation request on the given connection.
|
||||||
* value indicates whether the cancelation request was sent successfully.
|
* Return value indicates whether the cancelation request was sent successfully.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
SendCancelationRequest(MultiConnection *connection)
|
SendCancelationRequest(MultiConnection *connection)
|
||||||
|
|
|
@ -1640,6 +1640,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
|
||||||
|
* so unclaim the connections before throwing errors.
|
||||||
|
*/
|
||||||
UnclaimAllShardConnections(shardConnectionHash);
|
UnclaimAllShardConnections(shardConnectionHash);
|
||||||
PG_RE_THROW();
|
PG_RE_THROW();
|
||||||
}
|
}
|
||||||
|
|
|
@ -193,6 +193,76 @@ SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
|
||||||
12 | Marc Chagall
|
12 | Marc Chagall
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Recover from multi-shard CTE modify failures
|
||||||
|
create table t1(a int, b int);
|
||||||
|
create table t2(a int, b int CHECK(b > 0));
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
|
||||||
|
select create_distributed_table('t1', 'a'),
|
||||||
|
create_distributed_table('t2', 'a');
|
||||||
|
create_distributed_table | create_distributed_table
|
||||||
|
--------------------------+--------------------------
|
||||||
|
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
begin;
|
||||||
|
insert into t2 select i, i+1 from generate_series(1, 3) i;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b + 1
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
savepoint s1;
|
||||||
|
with r AS (
|
||||||
|
update t1 set b = b - 10
|
||||||
|
returning *
|
||||||
|
) insert into t2 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s1;
|
||||||
|
savepoint s2;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b - 10
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s2;
|
||||||
|
savepoint s3;
|
||||||
|
with r AS (
|
||||||
|
insert into t2 select i, i+1 from generate_series(-10,-5) i
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s3;
|
||||||
|
savepoint s4;
|
||||||
|
with r AS (
|
||||||
|
insert into t1 select i, i+1 from generate_series(-10,-5) i
|
||||||
|
returning *
|
||||||
|
) insert into t2 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s4;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b + 1
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
commit;
|
||||||
|
select * from t2 order by a, b;
|
||||||
|
a | b
|
||||||
|
---+---
|
||||||
|
1 | 4
|
||||||
|
2 | 5
|
||||||
|
3 | 6
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
select * from t1 order by a, b;
|
||||||
|
a | b
|
||||||
|
---+---
|
||||||
|
1 | 3
|
||||||
|
1 | 4
|
||||||
|
2 | 4
|
||||||
|
2 | 5
|
||||||
|
3 | 5
|
||||||
|
3 | 6
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
drop table t1, t2;
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
-- Tests for replication factor > 1
|
-- Tests for replication factor > 1
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
|
|
|
@ -193,6 +193,76 @@ SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
|
||||||
12 | Marc Chagall
|
12 | Marc Chagall
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Recover from multi-shard CTE modify failures
|
||||||
|
create table t1(a int, b int);
|
||||||
|
create table t2(a int, b int CHECK(b > 0));
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
|
||||||
|
select create_distributed_table('t1', 'a'),
|
||||||
|
create_distributed_table('t2', 'a');
|
||||||
|
create_distributed_table | create_distributed_table
|
||||||
|
--------------------------+--------------------------
|
||||||
|
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
begin;
|
||||||
|
insert into t2 select i, i+1 from generate_series(1, 3) i;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b + 1
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
savepoint s1;
|
||||||
|
with r AS (
|
||||||
|
update t1 set b = b - 10
|
||||||
|
returning *
|
||||||
|
) insert into t2 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s1;
|
||||||
|
savepoint s2;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b - 10
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s2;
|
||||||
|
savepoint s3;
|
||||||
|
with r AS (
|
||||||
|
insert into t2 select i, i+1 from generate_series(-10,-5) i
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s3;
|
||||||
|
savepoint s4;
|
||||||
|
with r AS (
|
||||||
|
insert into t1 select i, i+1 from generate_series(-10,-5) i
|
||||||
|
returning *
|
||||||
|
) insert into t2 select * from r;
|
||||||
|
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
|
||||||
|
rollback to savepoint s4;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b + 1
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
commit;
|
||||||
|
select * from t2 order by a, b;
|
||||||
|
a | b
|
||||||
|
---+---
|
||||||
|
1 | 4
|
||||||
|
2 | 5
|
||||||
|
3 | 6
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
select * from t1 order by a, b;
|
||||||
|
a | b
|
||||||
|
---+---
|
||||||
|
1 | 3
|
||||||
|
1 | 4
|
||||||
|
2 | 4
|
||||||
|
2 | 5
|
||||||
|
3 | 5
|
||||||
|
3 | 6
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
drop table t1, t2;
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
-- Tests for replication factor > 1
|
-- Tests for replication factor > 1
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
|
|
|
@ -152,6 +152,55 @@ COMMIT;
|
||||||
|
|
||||||
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
|
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
|
||||||
|
|
||||||
|
-- Recover from multi-shard CTE modify failures
|
||||||
|
create table t1(a int, b int);
|
||||||
|
create table t2(a int, b int CHECK(b > 0));
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
|
||||||
|
|
||||||
|
select create_distributed_table('t1', 'a'),
|
||||||
|
create_distributed_table('t2', 'a');
|
||||||
|
|
||||||
|
begin;
|
||||||
|
insert into t2 select i, i+1 from generate_series(1, 3) i;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b + 1
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
savepoint s1;
|
||||||
|
with r AS (
|
||||||
|
update t1 set b = b - 10
|
||||||
|
returning *
|
||||||
|
) insert into t2 select * from r;
|
||||||
|
rollback to savepoint s1;
|
||||||
|
savepoint s2;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b - 10
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
rollback to savepoint s2;
|
||||||
|
savepoint s3;
|
||||||
|
with r AS (
|
||||||
|
insert into t2 select i, i+1 from generate_series(-10,-5) i
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
rollback to savepoint s3;
|
||||||
|
savepoint s4;
|
||||||
|
with r AS (
|
||||||
|
insert into t1 select i, i+1 from generate_series(-10,-5) i
|
||||||
|
returning *
|
||||||
|
) insert into t2 select * from r;
|
||||||
|
rollback to savepoint s4;
|
||||||
|
with r AS (
|
||||||
|
update t2 set b = b + 1
|
||||||
|
returning *
|
||||||
|
) insert into t1 select * from r;
|
||||||
|
commit;
|
||||||
|
|
||||||
|
select * from t2 order by a, b;
|
||||||
|
select * from t1 order by a, b;
|
||||||
|
|
||||||
|
drop table t1, t2;
|
||||||
|
|
||||||
-- ===================================================================
|
-- ===================================================================
|
||||||
-- Tests for replication factor > 1
|
-- Tests for replication factor > 1
|
||||||
|
|
Loading…
Reference in New Issue