From e5fbcf37dde751eeab29231c4067d49d11c70fe0 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 15 Aug 2017 13:02:28 -0400 Subject: [PATCH] Add Savepoint Support (#1539) This change adds support for SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT. When transaction connections are not established yet, savepoints are kept in a stack and sent to the worker when the connection is later established. After establishing connections, savepoint commands are sent as they arrive. This change fixes #1493 . --- .../transaction/remote_transaction.c | 316 +++++++++++++++++- .../transaction/transaction_management.c | 101 +++++- src/include/distributed/remote_transaction.h | 18 + .../distributed/transaction_management.h | 4 + .../expected/multi_modifying_xacts.out | 13 +- .../expected/multi_mx_modifying_xacts.out | 11 +- .../expected/multi_subtransactions.out | 277 +++++++++++++++ .../input/multi_alter_table_statements.source | 3 +- src/test/regress/multi_schedule | 1 + .../multi_alter_table_statements.source | 11 +- .../regress/sql/multi_modifying_xacts.sql | 4 +- .../regress/sql/multi_mx_modifying_xacts.sql | 2 +- .../regress/sql/multi_subtransactions.sql | 215 ++++++++++++ 13 files changed, 935 insertions(+), 41 deletions(-) create mode 100644 src/test/regress/expected/multi_subtransactions.out create mode 100644 src/test/regress/sql/multi_subtransactions.sql diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index f89efdf1b..fc131622b 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -27,6 +27,19 @@ #include "utils/hsearch.h" +static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, + SubTransactionId subId); +static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection, + SubTransactionId subId); +static void StartRemoteTransactionSavepointRelease(MultiConnection *connection, + SubTransactionId subId); +static void FinishRemoteTransactionSavepointRelease(MultiConnection *connection, + SubTransactionId subId); +static void StartRemoteTransactionSavepointRollback(MultiConnection *connection, + SubTransactionId subId); +static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection, + SubTransactionId subId); + static void CheckTransactionHealth(void); static void Assign2PCIdentifier(MultiConnection *connection); static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit); @@ -44,6 +57,8 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) RemoteTransaction *transaction = &connection->remoteTransaction; StringInfo beginAndSetDistributedTransactionId = makeStringInfo(); DistributedTransactionId *distributedTransactionId = NULL; + ListCell *subIdCell = NULL; + List *activeSubXacts = NIL; Assert(transaction->transactionState == REMOTE_TRANS_INVALID); @@ -67,11 +82,22 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) */ distributedTransactionId = GetCurrentDistributedTransactionId(); appendStringInfo(beginAndSetDistributedTransactionId, - "SELECT assign_distributed_transaction_id(%d, %ld, '%s')", + "SELECT assign_distributed_transaction_id(%d, %ld, '%s');", distributedTransactionId->initiatorNodeIdentifier, distributedTransactionId->transactionNumber, timestamptz_to_str(distributedTransactionId->timestamp)); + /* append in-progress savepoints for this transaction */ + activeSubXacts = ActiveSubXacts(); + transaction->lastSuccessfulSubXact = TopSubTransactionId; + transaction->lastQueuedSubXact = TopSubTransactionId; + foreach(subIdCell, activeSubXacts) + { + SubTransactionId subId = lfirst_int(subIdCell); + appendStringInfo(beginAndSetDistributedTransactionId, + "SAVEPOINT savepoint_%u;", subId); + transaction->lastQueuedSubXact = subId; + } if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) { @@ -103,6 +129,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection) else { transaction->transactionState = REMOTE_TRANS_STARTED; + transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact; } PQclear(result); @@ -861,6 +888,293 @@ CoordinatedRemoteTransactionsAbort(void) } +/* + * CoordinatedRemoteTransactionsSavepointBegin sends the SAVEPOINT command for + * the given sub-transaction id to all connections participating in the current + * transaction. + */ +void +CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId) +{ + dlist_iter iter; + const bool raiseInterrupts = true; + List *connectionList = NIL; + + /* asynchronously send SAVEPOINT */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + if (transaction->transactionFailed) + { + continue; + } + + StartRemoteTransactionSavepointBegin(connection, subId); + connectionList = lappend(connectionList, connection); + } + + WaitForAllConnections(connectionList, raiseInterrupts); + + /* and wait for the results */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + if (transaction->transactionFailed) + { + continue; + } + + FinishRemoteTransactionSavepointBegin(connection, subId); + + if (!transaction->transactionFailed) + { + transaction->lastSuccessfulSubXact = subId; + } + } +} + + +/* + * CoordinatedRemoteTransactionsSavepointRelease sends the RELEASE SAVEPOINT + * command for the given sub-transaction id to all connections participating in + * the current transaction. + */ +void +CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId) +{ + dlist_iter iter; + const bool raiseInterrupts = true; + List *connectionList = NIL; + + /* asynchronously send RELEASE SAVEPOINT */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + if (transaction->transactionFailed) + { + continue; + } + + StartRemoteTransactionSavepointRelease(connection, subId); + connectionList = lappend(connectionList, connection); + } + + WaitForAllConnections(connectionList, raiseInterrupts); + + /* and wait for the results */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + if (transaction->transactionFailed) + { + continue; + } + + FinishRemoteTransactionSavepointRelease(connection, subId); + } +} + + +/* + * CoordinatedRemoteTransactionsSavepointRollback sends the ROLLBACK TO SAVEPOINT + * command for the given sub-transaction id to all connections participating in + * the current transaction. + */ +void +CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) +{ + dlist_iter iter; + const bool dontRaiseInterrupts = false; + List *connectionList = NIL; + + /* asynchronously send ROLLBACK TO SAVEPOINT */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + if (transaction->transactionFailed) + { + if (transaction->lastSuccessfulSubXact <= subId) + { + transaction->transactionRecovering = true; + + /* + * Clear the results of the failed query so we can send the ROLLBACK + * TO SAVEPOINT command for a savepoint that can recover the transaction + * from failure. + */ + ForgetResults(connection); + } + else + { + continue; + } + } + StartRemoteTransactionSavepointRollback(connection, subId); + connectionList = lappend(connectionList, connection); + } + + WaitForAllConnections(connectionList, dontRaiseInterrupts); + + /* and wait for the results */ + dlist_foreach(iter, &InProgressTransactions) + { + MultiConnection *connection = dlist_container(MultiConnection, transactionNode, + iter.cur); + RemoteTransaction *transaction = &connection->remoteTransaction; + if (transaction->transactionFailed && !transaction->transactionRecovering) + { + continue; + } + + FinishRemoteTransactionSavepointRollback(connection, subId); + } +} + + +/* + * StartRemoteTransactionSavepointBegin initiates SAVEPOINT command for the given + * subtransaction id in a non-blocking manner. + */ +static void +StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId) +{ + const bool raiseErrors = true; + StringInfo savepointCommand = makeStringInfo(); + appendStringInfo(savepointCommand, "SAVEPOINT savepoint_%u", subId); + + if (!SendRemoteCommand(connection, savepointCommand->data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } +} + + +/* + * FinishRemoteTransactionSavepointBegin finishes the work + * StartRemoteTransactionSavepointBegin initiated. It blocks if necessary (i.e. + * if PQisBusy() would return true). + */ +static void +FinishRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId) +{ + const bool raiseErrors = true; + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + + PQclear(result); + ForgetResults(connection); +} + + +/* + * StartRemoteTransactionSavepointRelease initiates RELEASE SAVEPOINT command for + * the given subtransaction id in a non-blocking manner. + */ +static void +StartRemoteTransactionSavepointRelease(MultiConnection *connection, + SubTransactionId subId) +{ + const bool raiseErrors = true; + StringInfo savepointCommand = makeStringInfo(); + appendStringInfo(savepointCommand, "RELEASE SAVEPOINT savepoint_%u", subId); + + if (!SendRemoteCommand(connection, savepointCommand->data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } +} + + +/* + * FinishRemoteTransactionSavepointRelease finishes the work + * StartRemoteTransactionSavepointRelease initiated. It blocks if necessary (i.e. + * if PQisBusy() would return true). + */ +static void +FinishRemoteTransactionSavepointRelease(MultiConnection *connection, + SubTransactionId subId) +{ + const bool raiseErrors = true; + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + + PQclear(result); + ForgetResults(connection); +} + + +/* + * StartRemoteTransactionSavepointRollback initiates ROLLBACK TO SAVEPOINT command + * for the given subtransaction id in a non-blocking manner. + */ +static void +StartRemoteTransactionSavepointRollback(MultiConnection *connection, + SubTransactionId subId) +{ + const bool dontRaiseErrors = false; + StringInfo savepointCommand = makeStringInfo(); + appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId); + + if (!SendRemoteCommand(connection, savepointCommand->data)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + } +} + + +/* + * FinishRemoteTransactionSavepointRollback finishes the work + * StartRemoteTransactionSavepointRollback initiated. It blocks if necessary (i.e. + * if PQisBusy() would return true). It also recovers the transaction from failure + * if transaction is recovering and the rollback command succeeds. + */ +static void +FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId + subId) +{ + const bool dontRaiseErrors = false; + RemoteTransaction *transaction = &connection->remoteTransaction; + + PGresult *result = GetRemoteCommandResult(connection, dontRaiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, dontRaiseErrors); + } + + /* ROLLBACK TO SAVEPOINT succeeded, check if it recovers the transaction */ + else if (transaction->transactionRecovering) + { + transaction->transactionFailed = false; + transaction->transactionRecovering = false; + } + + PQclear(result); + ForgetResults(connection); +} + + /* * CheckTransactionHealth checks if any of the participating transactions in a * coordinated transaction failed, and what consequence that should have. diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index e3094840d..9d7d0b30c 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -27,6 +27,7 @@ #include "distributed/placement_connection.h" #include "utils/hsearch.h" #include "utils/guc.h" +#include "utils/memutils.h" CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; @@ -41,8 +42,8 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; /* list of connections that are part of the current coordinated transaction */ dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); - -static bool subXactAbortAttempted = false; +/* stack of active sub-transactions */ +static List *activeSubXacts = NIL; /* * Should this coordinated transaction use 2PC? Set by @@ -58,6 +59,8 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction /* remaining functions */ static void AdjustMaxPreparedTransactions(void); +static void PushSubXact(SubTransactionId subId); +static void PopSubXact(SubTransactionId subId); /* @@ -166,7 +169,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) AfterXactConnectionHandling(true); } - Assert(!subXactAbortAttempted); CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); @@ -208,7 +210,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; - subXactAbortAttempted = false; UnSetDistributedTransactionId(); break; } @@ -227,18 +228,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) case XACT_EVENT_PRE_COMMIT: { - if (subXactAbortAttempted) - { - subXactAbortAttempted = false; - - if (XactModificationLevel != XACT_MODIFICATION_NONE) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " - "which modify distributed tables"))); - } - } - /* nothing further to do if there's no managed remote xacts */ if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) { @@ -308,9 +297,43 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, SubTransactionId parentSubid, void *arg) { - if (event == SUBXACT_EVENT_ABORT_SUB) + switch (event) { - subXactAbortAttempted = true; + case SUBXACT_EVENT_START_SUB: + { + PushSubXact(subId); + if (InCoordinatedTransaction()) + { + CoordinatedRemoteTransactionsSavepointBegin(subId); + } + break; + } + + case SUBXACT_EVENT_COMMIT_SUB: + { + PopSubXact(subId); + if (InCoordinatedTransaction()) + { + CoordinatedRemoteTransactionsSavepointRelease(subId); + } + break; + } + + case SUBXACT_EVENT_ABORT_SUB: + { + PopSubXact(subId); + if (InCoordinatedTransaction()) + { + CoordinatedRemoteTransactionsSavepointRollback(subId); + } + break; + } + + case SUBXACT_EVENT_PRE_COMMIT_SUB: + { + /* nothing to do */ + break; + } } } @@ -344,3 +367,45 @@ AdjustMaxPreparedTransactions(void) newvalue))); } } + + +/* PushSubXact pushes subId to the stack of active sub-transactions. */ +static void +PushSubXact(SubTransactionId subId) +{ + MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext); + activeSubXacts = lcons_int(subId, activeSubXacts); + MemoryContextSwitchTo(old_context); +} + + +/* PopSubXact pops subId from the stack of active sub-transactions. */ +static void +PopSubXact(SubTransactionId subId) +{ + MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext); + Assert(linitial_int(activeSubXacts) == subId); + activeSubXacts = list_delete_first(activeSubXacts); + MemoryContextSwitchTo(old_context); +} + + +/* ActiveSubXacts returns list of active sub-transactions in temporal order. */ +List * +ActiveSubXacts(void) +{ + ListCell *subIdCell = NULL; + List *activeSubXactsReversed = NIL; + + /* + * activeSubXacts is in reversed temporal order, so we reverse it to get it + * in temporal order. + */ + foreach(subIdCell, activeSubXacts) + { + SubTransactionId subId = lfirst_int(subIdCell); + activeSubXactsReversed = lcons_int(subId, activeSubXactsReversed); + } + + return activeSubXactsReversed; +} diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index 45743c9b8..30ab3bb92 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -61,6 +61,19 @@ typedef struct RemoteTransaction /* failed in current transaction */ bool transactionFailed; + /* + * Id of last savepoint that successfully began before transaction failure. + * Since savepoint ids are assigned incrementally, rolling back to any savepoint + * with id equal to or less than this id recovers the transaction from failures. + */ + SubTransactionId lastSuccessfulSubXact; + + /* Id of last savepoint queued before first query of transaction */ + SubTransactionId lastQueuedSubXact; + + /* waiting for the result of a recovering ROLLBACK TO SAVEPOINT command */ + bool transactionRecovering; + /* 2PC transaction name currently associated with connection */ char preparedName[NAMEDATALEN]; } RemoteTransaction; @@ -107,4 +120,9 @@ extern void CoordinatedRemoteTransactionsPrepare(void); extern void CoordinatedRemoteTransactionsCommit(void); extern void CoordinatedRemoteTransactionsAbort(void); +/* remote savepoint commands */ +extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId); +extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId); +extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId); + #endif /* REMOTE_TRANSACTION_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index dd0c691ed..731befdfc 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -10,6 +10,7 @@ #define TRANSACTION_MANAGMENT_H #include "lib/ilist.h" +#include "nodes/pg_list.h" /* describes what kind of modifications have occurred in the current transaction */ typedef enum @@ -77,5 +78,8 @@ extern void CoordinatedTransactionUse2PC(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void); +/* other functions */ +extern List * ActiveSubXacts(void); + #endif /* TRANSACTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 986007d02..2465c7499 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -108,18 +108,18 @@ EXCEPTION RAISE NOTICE 'caught not_null_violation'; END $$; COMMIT; --- but rollback should not +-- rollback should also work BEGIN; INSERT INTO researchers VALUES (7, 4, 'Jim Gray'); SAVEPOINT hire_engelbart; INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart'); ROLLBACK TO hire_engelbart; COMMIT; -ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables SELECT name FROM researchers WHERE lab_id = 4; - name ------- -(0 rows) + name +---------- + Jim Gray +(1 row) BEGIN; DO $$ @@ -132,7 +132,6 @@ EXCEPTION END $$; NOTICE: caught not_null_violation COMMIT; -ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables \set VERBOSITY default -- should be valid to edit labs after researchers... BEGIN; @@ -1522,4 +1521,4 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; (1 row) END; -DROP TABLE items, users, itemgroups, usergroups; +DROP TABLE items, users, itemgroups, usergroups, researchers, labs; diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index 40d879eb6..0efee2b2a 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -100,18 +100,18 @@ EXCEPTION RAISE NOTICE 'caught not_null_violation'; END $$; COMMIT; --- but rollback should not +-- rollback should also work BEGIN; INSERT INTO researchers_mx VALUES (7, 4, 'Jim Gray'); SAVEPOINT hire_engelbart; INSERT INTO researchers_mx VALUES (8, 4, 'Douglas Engelbart'); ROLLBACK TO hire_engelbart; COMMIT; -ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables SELECT name FROM researchers_mx WHERE lab_id = 4; - name ------- -(0 rows) + name +---------- + Jim Gray +(1 row) BEGIN; DO $$ @@ -123,7 +123,6 @@ EXCEPTION END $$; NOTICE: caught not_null_violation COMMIT; -ERROR: could not make changes to shard 1220100 on any node \set VERBOSITY default -- should be valid to edit labs_mx after researchers_mx... BEGIN; diff --git a/src/test/regress/expected/multi_subtransactions.out b/src/test/regress/expected/multi_subtransactions.out new file mode 100644 index 000000000..8a34a7a6d --- /dev/null +++ b/src/test/regress/expected/multi_subtransactions.out @@ -0,0 +1,277 @@ +CREATE TABLE artists ( + id bigint NOT NULL, + name text NOT NULL +); +SELECT create_distributed_table('artists', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- add some data +INSERT INTO artists VALUES (1, 'Pablo Picasso'); +INSERT INTO artists VALUES (2, 'Vincent van Gogh'); +INSERT INTO artists VALUES (3, 'Claude Monet'); +INSERT INTO artists VALUES (4, 'William Kurelek'); +-- RELEASE SAVEPOINT +BEGIN; +INSERT INTO artists VALUES (5, 'Asher Lev'); +SAVEPOINT s1; +DELETE FROM artists WHERE id=5; +RELEASE SAVEPOINT s1; +COMMIT; +SELECT * FROM artists WHERE id=5; + id | name +----+------ +(0 rows) + +-- ROLLBACK TO SAVEPOINT +BEGIN; +INSERT INTO artists VALUES (5, 'Asher Lev'); +SAVEPOINT s1; +DELETE FROM artists WHERE id=5; +ROLLBACK TO SAVEPOINT s1; +COMMIT; +SELECT * FROM artists WHERE id=5; + id | name +----+----------- + 5 | Asher Lev +(1 row) + +-- Serial sub-transaction releases +BEGIN; +SAVEPOINT s1; +DELETE FROM artists WHERE id=5; +RELEASE SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (5, 'Jacob Kahn'); +RELEASE SAVEPOINT s2; +COMMIT; +SELECT * FROM artists WHERE id=5; + id | name +----+------------ + 5 | Jacob Kahn +(1 row) + +-- Serial sub-transaction rollbacks +BEGIN; +SAVEPOINT s1; +UPDATE artists SET name='A' WHERE id=5; +ROLLBACK TO SAVEPOINT s1; +SAVEPOINT s2; +DELETE FROM artists WHERE id=5; +ROLLBACK TO SAVEPOINT s2; +COMMIT; +SELECT * FROM artists WHERE id=5; + id | name +----+------------ + 5 | Jacob Kahn +(1 row) + +-- Multiple sub-transaction activity before first query +BEGIN; +SAVEPOINT s0; +SAVEPOINT s1; +SAVEPOINT s2; +SAVEPOINT s3; +ROLLBACK TO SAVEPOINT s2; +RELEASE SAVEPOINT s1; +INSERT INTO artists VALUES (6, 'John J. Audubon'); +ROLLBACK TO SAVEPOINT s0; +INSERT INTO artists VALUES (6, 'Emily Carr'); +COMMIT; +SELECT * FROM artists WHERE id=6; + id | name +----+------------ + 6 | Emily Carr +(1 row) + +-- Release after rollback +BEGIN; +SAVEPOINT s1; +ROLLBACK TO s1; +RELEASE SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (7, 'John J. Audubon'); +ROLLBACK TO s2; +RELEASE SAVEPOINT s2; +COMMIT; +SELECT * FROM artists WHERE id=7; + id | name +----+------ +(0 rows) + +-- Recover from errors +\set VERBOSITY terse +BEGIN; +SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (7, NULL); +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO SAVEPOINT s1; +COMMIT; +-- Don't recover from errors +BEGIN; +SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (7, NULL); +ERROR: null value in column "name" violates not-null constraint +SAVEPOINT s3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK TO SAVEPOINT s3; +ERROR: no such savepoint +COMMIT; +-- =================================================================== +-- Tests for replication factor > 1 +-- =================================================================== +CREATE TABLE researchers ( + id bigint NOT NULL, + lab_id int NOT NULL, + name text NOT NULL +); +SELECT master_create_distributed_table('researchers', 'lab_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('researchers', 2, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +-- Basic rollback and release +BEGIN; +INSERT INTO researchers VALUES (7, 4, 'Jan Plaza'); +SAVEPOINT s1; +INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); +ROLLBACK TO s1; +RELEASE SAVEPOINT s1; +COMMIT; +SELECT * FROM researchers WHERE id in (7, 8); + id | lab_id | name +----+--------+----------- + 7 | 4 | Jan Plaza +(1 row) + +-- Recover from failure on one of nodes +BEGIN; +SAVEPOINT s1; +INSERT INTO researchers VALUES (11, 11, 'Dana Scott'); +INSERT INTO researchers VALUES (NULL, 10, 'Stephen Kleene'); +ERROR: null value in column "id" violates not-null constraint +ROLLBACK TO SAVEPOINT s1; +INSERT INTO researchers VALUES (12, 10, 'Stephen Kleene'); +COMMIT; +SELECT * FROM researchers WHERE lab_id=10; + id | lab_id | name +----+--------+---------------- + 12 | 10 | Stephen Kleene +(1 row) + +-- Don't recover, but rollback +BEGIN; +SAVEPOINT s1; +INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +ERROR: null value in column "id" violates not-null constraint +RELEASE SAVEPOINT s1; +ERROR: current transaction is aborted, commands ignored until end of transaction block +SAVEPOINT s2; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +SELECT * FROM researchers WHERE lab_id=10; + id | lab_id | name +----+--------+---------------- + 12 | 10 | Stephen Kleene +(1 row) + +-- Don't recover, and commit +BEGIN; +SAVEPOINT s1; +INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +ERROR: null value in column "id" violates not-null constraint +RELEASE SAVEPOINT s1; +ERROR: current transaction is aborted, commands ignored until end of transaction block +SAVEPOINT s2; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +SELECT * FROM researchers WHERE lab_id=10; + id | lab_id | name +----+--------+---------------- + 12 | 10 | Stephen Kleene +(1 row) + +-- Implicit savepoints via pl/pgsql exceptions +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +EXCEPTION + WHEN not_null_violation THEN + RAISE NOTICE 'caught not_null_violation'; +END $$; +NOTICE: caught not_null_violation +COMMIT; +SELECT * FROM researchers WHERE lab_id=10; + id | lab_id | name +----+--------+---------------- + 12 | 10 | Stephen Kleene +(1 row) + +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + RAISE EXCEPTION plpgsql_error; +EXCEPTION + WHEN plpgsql_error THEN + RAISE NOTICE 'caught manual plpgsql_error'; +END $$; +NOTICE: caught manual plpgsql_error +COMMIT; +SELECT * FROM researchers WHERE lab_id=10; + id | lab_id | name +----+--------+---------------- + 12 | 10 | Stephen Kleene +(1 row) + +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +EXCEPTION + WHEN not_null_violation THEN + RAISE EXCEPTION not_null_violation; -- rethrow it +END $$; +ERROR: not_null_violation +COMMIT; +SELECT * FROM researchers WHERE lab_id=10; + id | lab_id | name +----+--------+---------------- + 12 | 10 | Stephen Kleene +(1 row) + +-- Insert something after catching error. +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +EXCEPTION + WHEN not_null_violation THEN + INSERT INTO researchers VALUES (32, 10, 'Raymond Smullyan'); +END $$; +COMMIT; +SELECT * FROM researchers WHERE lab_id=10; + id | lab_id | name +----+--------+------------------ + 12 | 10 | Stephen Kleene + 32 | 10 | Raymond Smullyan +(2 rows) + +-- Clean-up +DROP TABLE artists; +DROP TABLE researchers; diff --git a/src/test/regress/input/multi_alter_table_statements.source b/src/test/regress/input/multi_alter_table_statements.source index 246adcc49..c7b4fe06b 100644 --- a/src/test/regress/input/multi_alter_table_statements.source +++ b/src/test/regress/input/multi_alter_table_statements.source @@ -214,7 +214,7 @@ SAVEPOINT my_savepoint; CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); ROLLBACK; --- but that actually rolling back to it is not +-- and also rolling back to it is also allowed BEGIN; CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); SAVEPOINT my_savepoint; @@ -223,6 +223,7 @@ ROLLBACK TO my_savepoint; COMMIT; SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; +DROP INDEX temp_index_2; -- Add column on only one worker... \c - - - :worker_2_port diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index b860d0e71..1b42d6602 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -159,6 +159,7 @@ test: multi_repartition_udt test: multi_repartitioned_subquery_udf test: multi_modifying_xacts test: multi_transaction_recovery +test: multi_subtransactions # --------- # multi_copy creates hash and range-partitioned tables and performs COPY diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 561093627..573e4ff95 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -505,19 +505,20 @@ CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); SAVEPOINT my_savepoint; CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); ROLLBACK; --- but that actually rolling back to it is not +-- and also rolling back to it is also allowed BEGIN; CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); SAVEPOINT my_savepoint; CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); ROLLBACK TO my_savepoint; COMMIT; -ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; - indexname | tablename ------------+----------- -(0 rows) + indexname | tablename +--------------+---------------- + temp_index_2 | lineitem_alter +(1 row) +DROP INDEX temp_index_2; -- Add column on only one worker... \c - - - :worker_2_port ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 1b4d84b9b..34cac1151 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -82,7 +82,7 @@ EXCEPTION END $$; COMMIT; --- but rollback should not +-- rollback should also work BEGIN; INSERT INTO researchers VALUES (7, 4, 'Jim Gray'); SAVEPOINT hire_engelbart; @@ -1108,4 +1108,4 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2; SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4; END; -DROP TABLE items, users, itemgroups, usergroups; +DROP TABLE items, users, itemgroups, usergroups, researchers, labs; diff --git a/src/test/regress/sql/multi_mx_modifying_xacts.sql b/src/test/regress/sql/multi_mx_modifying_xacts.sql index 5182ca3fb..0e74d8fbc 100644 --- a/src/test/regress/sql/multi_mx_modifying_xacts.sql +++ b/src/test/regress/sql/multi_mx_modifying_xacts.sql @@ -88,7 +88,7 @@ EXCEPTION END $$; COMMIT; --- but rollback should not +-- rollback should also work BEGIN; INSERT INTO researchers_mx VALUES (7, 4, 'Jim Gray'); SAVEPOINT hire_engelbart; diff --git a/src/test/regress/sql/multi_subtransactions.sql b/src/test/regress/sql/multi_subtransactions.sql new file mode 100644 index 000000000..0998cb014 --- /dev/null +++ b/src/test/regress/sql/multi_subtransactions.sql @@ -0,0 +1,215 @@ + +CREATE TABLE artists ( + id bigint NOT NULL, + name text NOT NULL +); +SELECT create_distributed_table('artists', 'id'); + +-- add some data +INSERT INTO artists VALUES (1, 'Pablo Picasso'); +INSERT INTO artists VALUES (2, 'Vincent van Gogh'); +INSERT INTO artists VALUES (3, 'Claude Monet'); +INSERT INTO artists VALUES (4, 'William Kurelek'); + +-- RELEASE SAVEPOINT +BEGIN; +INSERT INTO artists VALUES (5, 'Asher Lev'); +SAVEPOINT s1; +DELETE FROM artists WHERE id=5; +RELEASE SAVEPOINT s1; +COMMIT; + +SELECT * FROM artists WHERE id=5; + +-- ROLLBACK TO SAVEPOINT +BEGIN; +INSERT INTO artists VALUES (5, 'Asher Lev'); +SAVEPOINT s1; +DELETE FROM artists WHERE id=5; +ROLLBACK TO SAVEPOINT s1; +COMMIT; + +SELECT * FROM artists WHERE id=5; + +-- Serial sub-transaction releases +BEGIN; +SAVEPOINT s1; +DELETE FROM artists WHERE id=5; +RELEASE SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (5, 'Jacob Kahn'); +RELEASE SAVEPOINT s2; +COMMIT; + +SELECT * FROM artists WHERE id=5; + +-- Serial sub-transaction rollbacks +BEGIN; +SAVEPOINT s1; +UPDATE artists SET name='A' WHERE id=5; +ROLLBACK TO SAVEPOINT s1; +SAVEPOINT s2; +DELETE FROM artists WHERE id=5; +ROLLBACK TO SAVEPOINT s2; +COMMIT; + +SELECT * FROM artists WHERE id=5; + +-- Multiple sub-transaction activity before first query +BEGIN; +SAVEPOINT s0; +SAVEPOINT s1; +SAVEPOINT s2; +SAVEPOINT s3; +ROLLBACK TO SAVEPOINT s2; +RELEASE SAVEPOINT s1; +INSERT INTO artists VALUES (6, 'John J. Audubon'); +ROLLBACK TO SAVEPOINT s0; +INSERT INTO artists VALUES (6, 'Emily Carr'); +COMMIT; + +SELECT * FROM artists WHERE id=6; + +-- Release after rollback +BEGIN; +SAVEPOINT s1; +ROLLBACK TO s1; +RELEASE SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (7, 'John J. Audubon'); +ROLLBACK TO s2; +RELEASE SAVEPOINT s2; +COMMIT; + +SELECT * FROM artists WHERE id=7; + +-- Recover from errors +\set VERBOSITY terse +BEGIN; +SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (7, NULL); +ROLLBACK TO SAVEPOINT s1; +COMMIT; + +-- Don't recover from errors +BEGIN; +SAVEPOINT s1; +SAVEPOINT s2; +INSERT INTO artists VALUES (7, NULL); +SAVEPOINT s3; +ROLLBACK TO SAVEPOINT s3; +COMMIT; + +-- =================================================================== +-- Tests for replication factor > 1 +-- =================================================================== + +CREATE TABLE researchers ( + id bigint NOT NULL, + lab_id int NOT NULL, + name text NOT NULL +); + +SELECT master_create_distributed_table('researchers', 'lab_id', 'hash'); +SELECT master_create_worker_shards('researchers', 2, 2); + +-- Basic rollback and release +BEGIN; +INSERT INTO researchers VALUES (7, 4, 'Jan Plaza'); +SAVEPOINT s1; +INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); +ROLLBACK TO s1; +RELEASE SAVEPOINT s1; +COMMIT; + +SELECT * FROM researchers WHERE id in (7, 8); + +-- Recover from failure on one of nodes +BEGIN; +SAVEPOINT s1; +INSERT INTO researchers VALUES (11, 11, 'Dana Scott'); +INSERT INTO researchers VALUES (NULL, 10, 'Stephen Kleene'); +ROLLBACK TO SAVEPOINT s1; +INSERT INTO researchers VALUES (12, 10, 'Stephen Kleene'); +COMMIT; + +SELECT * FROM researchers WHERE lab_id=10; + +-- Don't recover, but rollback +BEGIN; +SAVEPOINT s1; +INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +RELEASE SAVEPOINT s1; +SAVEPOINT s2; +ROLLBACK; + +SELECT * FROM researchers WHERE lab_id=10; + +-- Don't recover, and commit +BEGIN; +SAVEPOINT s1; +INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +RELEASE SAVEPOINT s1; +SAVEPOINT s2; +COMMIT; + +SELECT * FROM researchers WHERE lab_id=10; + +-- Implicit savepoints via pl/pgsql exceptions +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +EXCEPTION + WHEN not_null_violation THEN + RAISE NOTICE 'caught not_null_violation'; +END $$; +COMMIT; + +SELECT * FROM researchers WHERE lab_id=10; + +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + RAISE EXCEPTION plpgsql_error; +EXCEPTION + WHEN plpgsql_error THEN + RAISE NOTICE 'caught manual plpgsql_error'; +END $$; +COMMIT; + +SELECT * FROM researchers WHERE lab_id=10; + +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +EXCEPTION + WHEN not_null_violation THEN + RAISE EXCEPTION not_null_violation; -- rethrow it +END $$; +COMMIT; + +SELECT * FROM researchers WHERE lab_id=10; + +-- Insert something after catching error. +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting'); + INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan'); +EXCEPTION + WHEN not_null_violation THEN + INSERT INTO researchers VALUES (32, 10, 'Raymond Smullyan'); +END $$; +COMMIT; + +SELECT * FROM researchers WHERE lab_id=10; + +-- Clean-up +DROP TABLE artists; +DROP TABLE researchers;