mirror of https://github.com/citusdata/citus.git
Merge pull request #2379 from citusdata/fix_procedure_rollback
Fixes a bug preventing rollback in stored procedurepull/2276/merge
commit
9215c00ee2
|
@ -80,6 +80,9 @@ bool AllModificationsCommutative = false;
|
||||||
/* we've deprecated this flag, keeping here for some time not to break existing users */
|
/* we've deprecated this flag, keeping here for some time not to break existing users */
|
||||||
bool EnableDeadlockPrevention = true;
|
bool EnableDeadlockPrevention = true;
|
||||||
|
|
||||||
|
/* number of nested stored procedure call levels we are currently in */
|
||||||
|
int StoredProcedureLevel = 0;
|
||||||
|
|
||||||
/* functions needed during run phase */
|
/* functions needed during run phase */
|
||||||
static void AcquireMetadataLocks(List *taskList);
|
static void AcquireMetadataLocks(List *taskList);
|
||||||
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
|
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
|
||||||
|
@ -613,7 +616,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
|
||||||
* customers already use functions that touch multiple shards from within
|
* customers already use functions that touch multiple shards from within
|
||||||
* a function, so we'll ignore functions for now.
|
* a function, so we'll ignore functions for now.
|
||||||
*/
|
*/
|
||||||
if (IsTransactionBlock() || multipleTasks || taskListRequires2PC)
|
if (IsTransactionBlock() || multipleTasks || taskListRequires2PC ||
|
||||||
|
StoredProcedureLevel > 0)
|
||||||
{
|
{
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
|
|
@ -296,6 +296,37 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 110000)
|
||||||
|
if (IsA(parsetree, CallStmt))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Stored procedures are a bit strange in the sense that some statements
|
||||||
|
* are not in a transaction block, but can be rolled back. We need to
|
||||||
|
* make sure we send all statements in a transaction block. The
|
||||||
|
* StoredProcedureLevel variable signals this to the router executor
|
||||||
|
* and indicates how deep in the call stack we are in case of nested
|
||||||
|
* stored procedures.
|
||||||
|
*/
|
||||||
|
StoredProcedureLevel += 1;
|
||||||
|
|
||||||
|
PG_TRY();
|
||||||
|
{
|
||||||
|
standard_ProcessUtility(pstmt, queryString, context,
|
||||||
|
params, queryEnv, dest, completionTag);
|
||||||
|
|
||||||
|
StoredProcedureLevel -= 1;
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
|
{
|
||||||
|
StoredProcedureLevel -= 1;
|
||||||
|
PG_RE_THROW();
|
||||||
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TRANSMIT used to be separate command, but to avoid patching the grammar
|
* TRANSMIT used to be separate command, but to avoid patching the grammar
|
||||||
* it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the
|
* it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the
|
||||||
|
|
|
@ -36,6 +36,10 @@ typedef struct XactShardConnSet
|
||||||
extern bool AllModificationsCommutative;
|
extern bool AllModificationsCommutative;
|
||||||
extern bool EnableDeadlockPrevention;
|
extern bool EnableDeadlockPrevention;
|
||||||
|
|
||||||
|
/* number of nested stored procedure call levels we are currently in */
|
||||||
|
extern int StoredProcedureLevel;
|
||||||
|
|
||||||
|
|
||||||
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
|
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
|
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
|
||||||
extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
|
extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
|
||||||
|
|
|
@ -128,7 +128,6 @@ SELECT * FROM test_table ORDER BY 1, 2;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- delete is commited but insert is rolled back
|
-- delete is commited but insert is rolled back
|
||||||
-- there is a bug #2371 on that preventing rollback
|
|
||||||
CREATE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
CREATE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
BEGIN
|
BEGIN
|
||||||
DELETE FROM test_table;
|
DELETE FROM test_table;
|
||||||
|
@ -141,8 +140,7 @@ CALL test_procedure_rollback(2,5);
|
||||||
SELECT * FROM test_table ORDER BY 1, 2;
|
SELECT * FROM test_table ORDER BY 1, 2;
|
||||||
id | org_id
|
id | org_id
|
||||||
----+--------
|
----+--------
|
||||||
2 | 5
|
(0 rows)
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- rollback is successfull when insert is on multiple rows
|
-- rollback is successfull when insert is on multiple rows
|
||||||
CREATE PROCEDURE test_procedure_rollback_2(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
CREATE PROCEDURE test_procedure_rollback_2(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
@ -177,13 +175,48 @@ SELECT * FROM test_table ORDER BY 1, 2;
|
||||||
2 | 15
|
2 | 15
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
TRUNCATE test_table;
|
||||||
|
-- nested procedure calls should roll back normally
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO test_table VALUES (tt_id+12, tt_org_id+12);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure_rollback_2(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO test_table VALUES (tt_id+2, tt_org_id+1);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
CALL test_procedure_rollback(tt_id, tt_org_id);
|
||||||
|
CALL test_procedure_rollback_2(tt_id, tt_org_id);
|
||||||
|
INSERT INTO test_table VALUES (tt_id+100, tt_org_id+100);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
SELECT * from test_table;
|
||||||
|
id | org_id
|
||||||
|
----+--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
call test_procedure(1,1);
|
||||||
|
call test_procedure(20, 20);
|
||||||
|
SELECT * from test_table;
|
||||||
|
id | org_id
|
||||||
|
----+--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
DROP SCHEMA procedure_schema CASCADE;
|
DROP SCHEMA procedure_schema CASCADE;
|
||||||
NOTICE: drop cascades to 7 other objects
|
NOTICE: drop cascades to 8 other objects
|
||||||
DETAIL: drop cascades to table test_table
|
DETAIL: drop cascades to table test_table
|
||||||
drop cascades to function test_procedure_delete_insert(integer,integer)
|
drop cascades to function test_procedure_delete_insert(integer,integer)
|
||||||
drop cascades to function test_procedure_modify_insert(integer,integer)
|
drop cascades to function test_procedure_modify_insert(integer,integer)
|
||||||
drop cascades to function test_procedure_modify_insert_commit(integer,integer)
|
drop cascades to function test_procedure_modify_insert_commit(integer,integer)
|
||||||
|
drop cascades to function test_procedure_rollback_3(integer,integer)
|
||||||
drop cascades to function test_procedure_rollback(integer,integer)
|
drop cascades to function test_procedure_rollback(integer,integer)
|
||||||
drop cascades to function test_procedure_rollback_2(integer,integer)
|
drop cascades to function test_procedure_rollback_2(integer,integer)
|
||||||
drop cascades to function test_procedure_rollback_3(integer,integer)
|
drop cascades to function test_procedure(integer,integer)
|
||||||
RESET SEARCH_PATH;
|
RESET SEARCH_PATH;
|
||||||
|
|
|
@ -159,7 +159,6 @@ SELECT * FROM test_table ORDER BY 1, 2;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- delete is commited but insert is rolled back
|
-- delete is commited but insert is rolled back
|
||||||
-- there is a bug #2371 on that preventing rollback
|
|
||||||
CREATE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
CREATE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
BEGIN
|
BEGIN
|
||||||
DELETE FROM test_table;
|
DELETE FROM test_table;
|
||||||
|
@ -229,6 +228,55 @@ SELECT * FROM test_table ORDER BY 1, 2;
|
||||||
1 | 1
|
1 | 1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
TRUNCATE test_table;
|
||||||
|
-- nested procedure calls should roll back normally
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO test_table VALUES (tt_id+12, tt_org_id+12);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
ERROR: syntax error at or near "PROCEDURE"
|
||||||
|
LINE 1: CREATE OR REPLACE PROCEDURE test_procedure_rollback(tt_id in...
|
||||||
|
^
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure_rollback_2(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO test_table VALUES (tt_id+2, tt_org_id+1);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
ERROR: syntax error at or near "PROCEDURE"
|
||||||
|
LINE 1: CREATE OR REPLACE PROCEDURE test_procedure_rollback_2(tt_id ...
|
||||||
|
^
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
CALL test_procedure_rollback(tt_id, tt_org_id);
|
||||||
|
CALL test_procedure_rollback_2(tt_id, tt_org_id);
|
||||||
|
INSERT INTO test_table VALUES (tt_id+100, tt_org_id+100);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
ERROR: syntax error at or near "PROCEDURE"
|
||||||
|
LINE 1: CREATE OR REPLACE PROCEDURE test_procedure(tt_id int, tt_org...
|
||||||
|
^
|
||||||
|
SELECT * from test_table;
|
||||||
|
id | org_id
|
||||||
|
----+--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
call test_procedure(1,1);
|
||||||
|
ERROR: syntax error at or near "call"
|
||||||
|
LINE 1: call test_procedure(1,1);
|
||||||
|
^
|
||||||
|
call test_procedure(20, 20);
|
||||||
|
ERROR: syntax error at or near "call"
|
||||||
|
LINE 1: call test_procedure(20, 20);
|
||||||
|
^
|
||||||
|
SELECT * from test_table;
|
||||||
|
id | org_id
|
||||||
|
----+--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
DROP SCHEMA procedure_schema CASCADE;
|
DROP SCHEMA procedure_schema CASCADE;
|
||||||
NOTICE: drop cascades to table test_table
|
NOTICE: drop cascades to table test_table
|
||||||
RESET SEARCH_PATH;
|
RESET SEARCH_PATH;
|
||||||
|
|
|
@ -96,7 +96,6 @@ CALL test_procedure_modify_insert_commit(2,30);
|
||||||
SELECT * FROM test_table ORDER BY 1, 2;
|
SELECT * FROM test_table ORDER BY 1, 2;
|
||||||
|
|
||||||
-- delete is commited but insert is rolled back
|
-- delete is commited but insert is rolled back
|
||||||
-- there is a bug #2371 on that preventing rollback
|
|
||||||
CREATE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
CREATE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
BEGIN
|
BEGIN
|
||||||
DELETE FROM test_table;
|
DELETE FROM test_table;
|
||||||
|
@ -136,6 +135,37 @@ INSERT INTO test_table VALUES (1, 1), (2, 2);
|
||||||
CALL test_procedure_rollback_3(2,15);
|
CALL test_procedure_rollback_3(2,15);
|
||||||
SELECT * FROM test_table ORDER BY 1, 2;
|
SELECT * FROM test_table ORDER BY 1, 2;
|
||||||
|
|
||||||
|
TRUNCATE test_table;
|
||||||
|
|
||||||
|
-- nested procedure calls should roll back normally
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure_rollback(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO test_table VALUES (tt_id+12, tt_org_id+12);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure_rollback_2(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO test_table VALUES (tt_id+2, tt_org_id+1);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE PROCEDURE test_procedure(tt_id int, tt_org_id int) LANGUAGE PLPGSQL AS $$
|
||||||
|
BEGIN
|
||||||
|
CALL test_procedure_rollback(tt_id, tt_org_id);
|
||||||
|
CALL test_procedure_rollback_2(tt_id, tt_org_id);
|
||||||
|
INSERT INTO test_table VALUES (tt_id+100, tt_org_id+100);
|
||||||
|
ROLLBACK;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
SELECT * from test_table;
|
||||||
|
call test_procedure(1,1);
|
||||||
|
call test_procedure(20, 20);
|
||||||
|
SELECT * from test_table;
|
||||||
|
|
||||||
DROP SCHEMA procedure_schema CASCADE;
|
DROP SCHEMA procedure_schema CASCADE;
|
||||||
|
|
||||||
RESET SEARCH_PATH;
|
RESET SEARCH_PATH;
|
||||||
|
|
Loading…
Reference in New Issue