Add Savepoint Support (#1539)

This change adds support for SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT.

When transaction connections are not established yet, savepoints are kept in a stack and sent to the worker when the connection is later established. After establishing connections, savepoint commands are sent as they arrive.

This change fixes #1493 .
pull/1577/head
Hadi Moshayedi 2017-08-15 13:02:28 -04:00 committed by GitHub
parent dcabbc4a8e
commit e5fbcf37dd
13 changed files with 935 additions and 41 deletions

View File

@ -27,6 +27,19 @@
#include "utils/hsearch.h"
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
SubTransactionId subId);
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
SubTransactionId subId);
static void StartRemoteTransactionSavepointRelease(MultiConnection *connection,
SubTransactionId subId);
static void FinishRemoteTransactionSavepointRelease(MultiConnection *connection,
SubTransactionId subId);
static void StartRemoteTransactionSavepointRollback(MultiConnection *connection,
SubTransactionId subId);
static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection,
SubTransactionId subId);
static void CheckTransactionHealth(void);
static void Assign2PCIdentifier(MultiConnection *connection);
static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit);
@ -44,6 +57,8 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
RemoteTransaction *transaction = &connection->remoteTransaction;
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
DistributedTransactionId *distributedTransactionId = NULL;
ListCell *subIdCell = NULL;
List *activeSubXacts = NIL;
Assert(transaction->transactionState == REMOTE_TRANS_INVALID);
@ -67,11 +82,22 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
*/
distributedTransactionId = GetCurrentDistributedTransactionId();
appendStringInfo(beginAndSetDistributedTransactionId,
"SELECT assign_distributed_transaction_id(%d, %ld, '%s')",
"SELECT assign_distributed_transaction_id(%d, %ld, '%s');",
distributedTransactionId->initiatorNodeIdentifier,
distributedTransactionId->transactionNumber,
timestamptz_to_str(distributedTransactionId->timestamp));
/* append in-progress savepoints for this transaction */
activeSubXacts = ActiveSubXacts();
transaction->lastSuccessfulSubXact = TopSubTransactionId;
transaction->lastQueuedSubXact = TopSubTransactionId;
foreach(subIdCell, activeSubXacts)
{
SubTransactionId subId = lfirst_int(subIdCell);
appendStringInfo(beginAndSetDistributedTransactionId,
"SAVEPOINT savepoint_%u;", subId);
transaction->lastQueuedSubXact = subId;
}
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
{
@ -103,6 +129,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection)
else
{
transaction->transactionState = REMOTE_TRANS_STARTED;
transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact;
}
PQclear(result);
@ -861,6 +888,293 @@ CoordinatedRemoteTransactionsAbort(void)
}
/*
* CoordinatedRemoteTransactionsSavepointBegin sends the SAVEPOINT command for
* the given sub-transaction id to all connections participating in the current
* transaction.
*/
void
CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId)
{
dlist_iter iter;
const bool raiseInterrupts = true;
List *connectionList = NIL;
/* asynchronously send SAVEPOINT */
dlist_foreach(iter, &InProgressTransactions)
{
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction;
if (transaction->transactionFailed)
{
continue;
}
StartRemoteTransactionSavepointBegin(connection, subId);
connectionList = lappend(connectionList, connection);
}
WaitForAllConnections(connectionList, raiseInterrupts);
/* and wait for the results */
dlist_foreach(iter, &InProgressTransactions)
{
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction;
if (transaction->transactionFailed)
{
continue;
}
FinishRemoteTransactionSavepointBegin(connection, subId);
if (!transaction->transactionFailed)
{
transaction->lastSuccessfulSubXact = subId;
}
}
}
/*
* CoordinatedRemoteTransactionsSavepointRelease sends the RELEASE SAVEPOINT
* command for the given sub-transaction id to all connections participating in
* the current transaction.
*/
void
CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId)
{
dlist_iter iter;
const bool raiseInterrupts = true;
List *connectionList = NIL;
/* asynchronously send RELEASE SAVEPOINT */
dlist_foreach(iter, &InProgressTransactions)
{
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction;
if (transaction->transactionFailed)
{
continue;
}
StartRemoteTransactionSavepointRelease(connection, subId);
connectionList = lappend(connectionList, connection);
}
WaitForAllConnections(connectionList, raiseInterrupts);
/* and wait for the results */
dlist_foreach(iter, &InProgressTransactions)
{
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction;
if (transaction->transactionFailed)
{
continue;
}
FinishRemoteTransactionSavepointRelease(connection, subId);
}
}
/*
* CoordinatedRemoteTransactionsSavepointRollback sends the ROLLBACK TO SAVEPOINT
* command for the given sub-transaction id to all connections participating in
* the current transaction.
*/
void
CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
{
dlist_iter iter;
const bool dontRaiseInterrupts = false;
List *connectionList = NIL;
/* asynchronously send ROLLBACK TO SAVEPOINT */
dlist_foreach(iter, &InProgressTransactions)
{
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction;
if (transaction->transactionFailed)
{
if (transaction->lastSuccessfulSubXact <= subId)
{
transaction->transactionRecovering = true;
/*
* Clear the results of the failed query so we can send the ROLLBACK
* TO SAVEPOINT command for a savepoint that can recover the transaction
* from failure.
*/
ForgetResults(connection);
}
else
{
continue;
}
}
StartRemoteTransactionSavepointRollback(connection, subId);
connectionList = lappend(connectionList, connection);
}
WaitForAllConnections(connectionList, dontRaiseInterrupts);
/* and wait for the results */
dlist_foreach(iter, &InProgressTransactions)
{
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction;
if (transaction->transactionFailed && !transaction->transactionRecovering)
{
continue;
}
FinishRemoteTransactionSavepointRollback(connection, subId);
}
}
/*
* StartRemoteTransactionSavepointBegin initiates SAVEPOINT command for the given
* subtransaction id in a non-blocking manner.
*/
static void
StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId)
{
const bool raiseErrors = true;
StringInfo savepointCommand = makeStringInfo();
appendStringInfo(savepointCommand, "SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand->data))
{
ReportConnectionError(connection, WARNING);
MarkRemoteTransactionFailed(connection, raiseErrors);
}
}
/*
* FinishRemoteTransactionSavepointBegin finishes the work
* StartRemoteTransactionSavepointBegin initiated. It blocks if necessary (i.e.
* if PQisBusy() would return true).
*/
static void
FinishRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId)
{
const bool raiseErrors = true;
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
MarkRemoteTransactionFailed(connection, raiseErrors);
}
PQclear(result);
ForgetResults(connection);
}
/*
* StartRemoteTransactionSavepointRelease initiates RELEASE SAVEPOINT command for
* the given subtransaction id in a non-blocking manner.
*/
static void
StartRemoteTransactionSavepointRelease(MultiConnection *connection,
SubTransactionId subId)
{
const bool raiseErrors = true;
StringInfo savepointCommand = makeStringInfo();
appendStringInfo(savepointCommand, "RELEASE SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand->data))
{
ReportConnectionError(connection, WARNING);
MarkRemoteTransactionFailed(connection, raiseErrors);
}
}
/*
* FinishRemoteTransactionSavepointRelease finishes the work
* StartRemoteTransactionSavepointRelease initiated. It blocks if necessary (i.e.
* if PQisBusy() would return true).
*/
static void
FinishRemoteTransactionSavepointRelease(MultiConnection *connection,
SubTransactionId subId)
{
const bool raiseErrors = true;
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
MarkRemoteTransactionFailed(connection, raiseErrors);
}
PQclear(result);
ForgetResults(connection);
}
/*
* StartRemoteTransactionSavepointRollback initiates ROLLBACK TO SAVEPOINT command
* for the given subtransaction id in a non-blocking manner.
*/
static void
StartRemoteTransactionSavepointRollback(MultiConnection *connection,
SubTransactionId subId)
{
const bool dontRaiseErrors = false;
StringInfo savepointCommand = makeStringInfo();
appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand->data))
{
ReportConnectionError(connection, WARNING);
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
}
}
/*
* FinishRemoteTransactionSavepointRollback finishes the work
* StartRemoteTransactionSavepointRollback initiated. It blocks if necessary (i.e.
* if PQisBusy() would return true). It also recovers the transaction from failure
* if transaction is recovering and the rollback command succeeds.
*/
static void
FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId
subId)
{
const bool dontRaiseErrors = false;
RemoteTransaction *transaction = &connection->remoteTransaction;
PGresult *result = GetRemoteCommandResult(connection, dontRaiseErrors);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
}
/* ROLLBACK TO SAVEPOINT succeeded, check if it recovers the transaction */
else if (transaction->transactionRecovering)
{
transaction->transactionFailed = false;
transaction->transactionRecovering = false;
}
PQclear(result);
ForgetResults(connection);
}
/*
* CheckTransactionHealth checks if any of the participating transactions in a
* coordinated transaction failed, and what consequence that should have.

View File

@ -27,6 +27,7 @@
#include "distributed/placement_connection.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
#include "utils/memutils.h"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
@ -41,8 +42,8 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
/* list of connections that are part of the current coordinated transaction */
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
static bool subXactAbortAttempted = false;
/* stack of active sub-transactions */
static List *activeSubXacts = NIL;
/*
* Should this coordinated transaction use 2PC? Set by
@ -58,6 +59,8 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
/* remaining functions */
static void AdjustMaxPreparedTransactions(void);
static void PushSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId);
/*
@ -166,7 +169,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
AfterXactConnectionHandling(true);
}
Assert(!subXactAbortAttempted);
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions);
@ -208,7 +210,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
XactModificationLevel = XACT_MODIFICATION_NONE;
dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false;
subXactAbortAttempted = false;
UnSetDistributedTransactionId();
break;
}
@ -227,18 +228,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
case XACT_EVENT_PRE_COMMIT:
{
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
if (XactModificationLevel != XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
"which modify distributed tables")));
}
}
/* nothing further to do if there's no managed remote xacts */
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
{
@ -308,9 +297,43 @@ static void
CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg)
{
if (event == SUBXACT_EVENT_ABORT_SUB)
switch (event)
{
subXactAbortAttempted = true;
case SUBXACT_EVENT_START_SUB:
{
PushSubXact(subId);
if (InCoordinatedTransaction())
{
CoordinatedRemoteTransactionsSavepointBegin(subId);
}
break;
}
case SUBXACT_EVENT_COMMIT_SUB:
{
PopSubXact(subId);
if (InCoordinatedTransaction())
{
CoordinatedRemoteTransactionsSavepointRelease(subId);
}
break;
}
case SUBXACT_EVENT_ABORT_SUB:
{
PopSubXact(subId);
if (InCoordinatedTransaction())
{
CoordinatedRemoteTransactionsSavepointRollback(subId);
}
break;
}
case SUBXACT_EVENT_PRE_COMMIT_SUB:
{
/* nothing to do */
break;
}
}
}
@ -344,3 +367,45 @@ AdjustMaxPreparedTransactions(void)
newvalue)));
}
}
/* PushSubXact pushes subId to the stack of active sub-transactions. */
static void
PushSubXact(SubTransactionId subId)
{
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
activeSubXacts = lcons_int(subId, activeSubXacts);
MemoryContextSwitchTo(old_context);
}
/* PopSubXact pops subId from the stack of active sub-transactions. */
static void
PopSubXact(SubTransactionId subId)
{
MemoryContext old_context = MemoryContextSwitchTo(CurTransactionContext);
Assert(linitial_int(activeSubXacts) == subId);
activeSubXacts = list_delete_first(activeSubXacts);
MemoryContextSwitchTo(old_context);
}
/* ActiveSubXacts returns list of active sub-transactions in temporal order. */
List *
ActiveSubXacts(void)
{
ListCell *subIdCell = NULL;
List *activeSubXactsReversed = NIL;
/*
* activeSubXacts is in reversed temporal order, so we reverse it to get it
* in temporal order.
*/
foreach(subIdCell, activeSubXacts)
{
SubTransactionId subId = lfirst_int(subIdCell);
activeSubXactsReversed = lcons_int(subId, activeSubXactsReversed);
}
return activeSubXactsReversed;
}

View File

@ -61,6 +61,19 @@ typedef struct RemoteTransaction
/* failed in current transaction */
bool transactionFailed;
/*
* Id of last savepoint that successfully began before transaction failure.
* Since savepoint ids are assigned incrementally, rolling back to any savepoint
* with id equal to or less than this id recovers the transaction from failures.
*/
SubTransactionId lastSuccessfulSubXact;
/* Id of last savepoint queued before first query of transaction */
SubTransactionId lastQueuedSubXact;
/* waiting for the result of a recovering ROLLBACK TO SAVEPOINT command */
bool transactionRecovering;
/* 2PC transaction name currently associated with connection */
char preparedName[NAMEDATALEN];
} RemoteTransaction;
@ -107,4 +120,9 @@ extern void CoordinatedRemoteTransactionsPrepare(void);
extern void CoordinatedRemoteTransactionsCommit(void);
extern void CoordinatedRemoteTransactionsAbort(void);
/* remote savepoint commands */
extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId);
extern void CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId);
extern void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId);
#endif /* REMOTE_TRANSACTION_H */

View File

@ -10,6 +10,7 @@
#define TRANSACTION_MANAGMENT_H
#include "lib/ilist.h"
#include "nodes/pg_list.h"
/* describes what kind of modifications have occurred in the current transaction */
typedef enum
@ -77,5 +78,8 @@ extern void CoordinatedTransactionUse2PC(void);
/* initialization function(s) */
extern void InitializeTransactionManagement(void);
/* other functions */
extern List * ActiveSubXacts(void);
#endif /* TRANSACTION_MANAGMENT_H */

View File

@ -108,18 +108,18 @@ EXCEPTION
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
-- but rollback should not
-- rollback should also work
BEGIN;
INSERT INTO researchers VALUES (7, 4, 'Jim Gray');
SAVEPOINT hire_engelbart;
INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart');
ROLLBACK TO hire_engelbart;
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
SELECT name FROM researchers WHERE lab_id = 4;
name
------
(0 rows)
name
----------
Jim Gray
(1 row)
BEGIN;
DO $$
@ -132,7 +132,6 @@ EXCEPTION
END $$;
NOTICE: caught not_null_violation
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
\set VERBOSITY default
-- should be valid to edit labs after researchers...
BEGIN;
@ -1522,4 +1521,4 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
(1 row)
END;
DROP TABLE items, users, itemgroups, usergroups;
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;

View File

@ -100,18 +100,18 @@ EXCEPTION
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
-- but rollback should not
-- rollback should also work
BEGIN;
INSERT INTO researchers_mx VALUES (7, 4, 'Jim Gray');
SAVEPOINT hire_engelbart;
INSERT INTO researchers_mx VALUES (8, 4, 'Douglas Engelbart');
ROLLBACK TO hire_engelbart;
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
SELECT name FROM researchers_mx WHERE lab_id = 4;
name
------
(0 rows)
name
----------
Jim Gray
(1 row)
BEGIN;
DO $$
@ -123,7 +123,6 @@ EXCEPTION
END $$;
NOTICE: caught not_null_violation
COMMIT;
ERROR: could not make changes to shard 1220100 on any node
\set VERBOSITY default
-- should be valid to edit labs_mx after researchers_mx...
BEGIN;

View File

@ -0,0 +1,277 @@
CREATE TABLE artists (
id bigint NOT NULL,
name text NOT NULL
);
SELECT create_distributed_table('artists', 'id');
create_distributed_table
--------------------------
(1 row)
-- add some data
INSERT INTO artists VALUES (1, 'Pablo Picasso');
INSERT INTO artists VALUES (2, 'Vincent van Gogh');
INSERT INTO artists VALUES (3, 'Claude Monet');
INSERT INTO artists VALUES (4, 'William Kurelek');
-- RELEASE SAVEPOINT
BEGIN;
INSERT INTO artists VALUES (5, 'Asher Lev');
SAVEPOINT s1;
DELETE FROM artists WHERE id=5;
RELEASE SAVEPOINT s1;
COMMIT;
SELECT * FROM artists WHERE id=5;
id | name
----+------
(0 rows)
-- ROLLBACK TO SAVEPOINT
BEGIN;
INSERT INTO artists VALUES (5, 'Asher Lev');
SAVEPOINT s1;
DELETE FROM artists WHERE id=5;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM artists WHERE id=5;
id | name
----+-----------
5 | Asher Lev
(1 row)
-- Serial sub-transaction releases
BEGIN;
SAVEPOINT s1;
DELETE FROM artists WHERE id=5;
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (5, 'Jacob Kahn');
RELEASE SAVEPOINT s2;
COMMIT;
SELECT * FROM artists WHERE id=5;
id | name
----+------------
5 | Jacob Kahn
(1 row)
-- Serial sub-transaction rollbacks
BEGIN;
SAVEPOINT s1;
UPDATE artists SET name='A' WHERE id=5;
ROLLBACK TO SAVEPOINT s1;
SAVEPOINT s2;
DELETE FROM artists WHERE id=5;
ROLLBACK TO SAVEPOINT s2;
COMMIT;
SELECT * FROM artists WHERE id=5;
id | name
----+------------
5 | Jacob Kahn
(1 row)
-- Multiple sub-transaction activity before first query
BEGIN;
SAVEPOINT s0;
SAVEPOINT s1;
SAVEPOINT s2;
SAVEPOINT s3;
ROLLBACK TO SAVEPOINT s2;
RELEASE SAVEPOINT s1;
INSERT INTO artists VALUES (6, 'John J. Audubon');
ROLLBACK TO SAVEPOINT s0;
INSERT INTO artists VALUES (6, 'Emily Carr');
COMMIT;
SELECT * FROM artists WHERE id=6;
id | name
----+------------
6 | Emily Carr
(1 row)
-- Release after rollback
BEGIN;
SAVEPOINT s1;
ROLLBACK TO s1;
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (7, 'John J. Audubon');
ROLLBACK TO s2;
RELEASE SAVEPOINT s2;
COMMIT;
SELECT * FROM artists WHERE id=7;
id | name
----+------
(0 rows)
-- Recover from errors
\set VERBOSITY terse
BEGIN;
SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (7, NULL);
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO SAVEPOINT s1;
COMMIT;
-- Don't recover from errors
BEGIN;
SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (7, NULL);
ERROR: null value in column "name" violates not-null constraint
SAVEPOINT s3;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK TO SAVEPOINT s3;
ERROR: no such savepoint
COMMIT;
-- ===================================================================
-- Tests for replication factor > 1
-- ===================================================================
CREATE TABLE researchers (
id bigint NOT NULL,
lab_id int NOT NULL,
name text NOT NULL
);
SELECT master_create_distributed_table('researchers', 'lab_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('researchers', 2, 2);
master_create_worker_shards
-----------------------------
(1 row)
-- Basic rollback and release
BEGIN;
INSERT INTO researchers VALUES (7, 4, 'Jan Plaza');
SAVEPOINT s1;
INSERT INTO researchers VALUES (8, 4, 'Alonzo Church');
ROLLBACK TO s1;
RELEASE SAVEPOINT s1;
COMMIT;
SELECT * FROM researchers WHERE id in (7, 8);
id | lab_id | name
----+--------+-----------
7 | 4 | Jan Plaza
(1 row)
-- Recover from failure on one of nodes
BEGIN;
SAVEPOINT s1;
INSERT INTO researchers VALUES (11, 11, 'Dana Scott');
INSERT INTO researchers VALUES (NULL, 10, 'Stephen Kleene');
ERROR: null value in column "id" violates not-null constraint
ROLLBACK TO SAVEPOINT s1;
INSERT INTO researchers VALUES (12, 10, 'Stephen Kleene');
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
id | lab_id | name
----+--------+----------------
12 | 10 | Stephen Kleene
(1 row)
-- Don't recover, but rollback
BEGIN;
SAVEPOINT s1;
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
ERROR: null value in column "id" violates not-null constraint
RELEASE SAVEPOINT s1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
SAVEPOINT s2;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
SELECT * FROM researchers WHERE lab_id=10;
id | lab_id | name
----+--------+----------------
12 | 10 | Stephen Kleene
(1 row)
-- Don't recover, and commit
BEGIN;
SAVEPOINT s1;
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
ERROR: null value in column "id" violates not-null constraint
RELEASE SAVEPOINT s1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
SAVEPOINT s2;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
id | lab_id | name
----+--------+----------------
12 | 10 | Stephen Kleene
(1 row)
-- Implicit savepoints via pl/pgsql exceptions
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
EXCEPTION
WHEN not_null_violation THEN
RAISE NOTICE 'caught not_null_violation';
END $$;
NOTICE: caught not_null_violation
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
id | lab_id | name
----+--------+----------------
12 | 10 | Stephen Kleene
(1 row)
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
RAISE EXCEPTION plpgsql_error;
EXCEPTION
WHEN plpgsql_error THEN
RAISE NOTICE 'caught manual plpgsql_error';
END $$;
NOTICE: caught manual plpgsql_error
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
id | lab_id | name
----+--------+----------------
12 | 10 | Stephen Kleene
(1 row)
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
EXCEPTION
WHEN not_null_violation THEN
RAISE EXCEPTION not_null_violation; -- rethrow it
END $$;
ERROR: not_null_violation
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
id | lab_id | name
----+--------+----------------
12 | 10 | Stephen Kleene
(1 row)
-- Insert something after catching error.
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
EXCEPTION
WHEN not_null_violation THEN
INSERT INTO researchers VALUES (32, 10, 'Raymond Smullyan');
END $$;
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
id | lab_id | name
----+--------+------------------
12 | 10 | Stephen Kleene
32 | 10 | Raymond Smullyan
(2 rows)
-- Clean-up
DROP TABLE artists;
DROP TABLE researchers;

View File

@ -214,7 +214,7 @@ SAVEPOINT my_savepoint;
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK;
-- but that actually rolling back to it is not
-- and also rolling back to it is also allowed
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SAVEPOINT my_savepoint;
@ -223,6 +223,7 @@ ROLLBACK TO my_savepoint;
COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
DROP INDEX temp_index_2;
-- Add column on only one worker...
\c - - - :worker_2_port

View File

@ -159,6 +159,7 @@ test: multi_repartition_udt
test: multi_repartitioned_subquery_udf
test: multi_modifying_xacts
test: multi_transaction_recovery
test: multi_subtransactions
# ---------
# multi_copy creates hash and range-partitioned tables and performs COPY

View File

@ -505,19 +505,20 @@ CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SAVEPOINT my_savepoint;
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK;
-- but that actually rolling back to it is not
-- and also rolling back to it is also allowed
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SAVEPOINT my_savepoint;
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK TO my_savepoint;
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
-----------+-----------
(0 rows)
indexname | tablename
--------------+----------------
temp_index_2 | lineitem_alter
(1 row)
DROP INDEX temp_index_2;
-- Add column on only one worker...
\c - - - :worker_2_port
ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer;

View File

@ -82,7 +82,7 @@ EXCEPTION
END $$;
COMMIT;
-- but rollback should not
-- rollback should also work
BEGIN;
INSERT INTO researchers VALUES (7, 4, 'Jim Gray');
SAVEPOINT hire_engelbart;
@ -1108,4 +1108,4 @@ SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2;
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
END;
DROP TABLE items, users, itemgroups, usergroups;
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;

View File

@ -88,7 +88,7 @@ EXCEPTION
END $$;
COMMIT;
-- but rollback should not
-- rollback should also work
BEGIN;
INSERT INTO researchers_mx VALUES (7, 4, 'Jim Gray');
SAVEPOINT hire_engelbart;

View File

@ -0,0 +1,215 @@
CREATE TABLE artists (
id bigint NOT NULL,
name text NOT NULL
);
SELECT create_distributed_table('artists', 'id');
-- add some data
INSERT INTO artists VALUES (1, 'Pablo Picasso');
INSERT INTO artists VALUES (2, 'Vincent van Gogh');
INSERT INTO artists VALUES (3, 'Claude Monet');
INSERT INTO artists VALUES (4, 'William Kurelek');
-- RELEASE SAVEPOINT
BEGIN;
INSERT INTO artists VALUES (5, 'Asher Lev');
SAVEPOINT s1;
DELETE FROM artists WHERE id=5;
RELEASE SAVEPOINT s1;
COMMIT;
SELECT * FROM artists WHERE id=5;
-- ROLLBACK TO SAVEPOINT
BEGIN;
INSERT INTO artists VALUES (5, 'Asher Lev');
SAVEPOINT s1;
DELETE FROM artists WHERE id=5;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM artists WHERE id=5;
-- Serial sub-transaction releases
BEGIN;
SAVEPOINT s1;
DELETE FROM artists WHERE id=5;
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (5, 'Jacob Kahn');
RELEASE SAVEPOINT s2;
COMMIT;
SELECT * FROM artists WHERE id=5;
-- Serial sub-transaction rollbacks
BEGIN;
SAVEPOINT s1;
UPDATE artists SET name='A' WHERE id=5;
ROLLBACK TO SAVEPOINT s1;
SAVEPOINT s2;
DELETE FROM artists WHERE id=5;
ROLLBACK TO SAVEPOINT s2;
COMMIT;
SELECT * FROM artists WHERE id=5;
-- Multiple sub-transaction activity before first query
BEGIN;
SAVEPOINT s0;
SAVEPOINT s1;
SAVEPOINT s2;
SAVEPOINT s3;
ROLLBACK TO SAVEPOINT s2;
RELEASE SAVEPOINT s1;
INSERT INTO artists VALUES (6, 'John J. Audubon');
ROLLBACK TO SAVEPOINT s0;
INSERT INTO artists VALUES (6, 'Emily Carr');
COMMIT;
SELECT * FROM artists WHERE id=6;
-- Release after rollback
BEGIN;
SAVEPOINT s1;
ROLLBACK TO s1;
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (7, 'John J. Audubon');
ROLLBACK TO s2;
RELEASE SAVEPOINT s2;
COMMIT;
SELECT * FROM artists WHERE id=7;
-- Recover from errors
\set VERBOSITY terse
BEGIN;
SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (7, NULL);
ROLLBACK TO SAVEPOINT s1;
COMMIT;
-- Don't recover from errors
BEGIN;
SAVEPOINT s1;
SAVEPOINT s2;
INSERT INTO artists VALUES (7, NULL);
SAVEPOINT s3;
ROLLBACK TO SAVEPOINT s3;
COMMIT;
-- ===================================================================
-- Tests for replication factor > 1
-- ===================================================================
CREATE TABLE researchers (
id bigint NOT NULL,
lab_id int NOT NULL,
name text NOT NULL
);
SELECT master_create_distributed_table('researchers', 'lab_id', 'hash');
SELECT master_create_worker_shards('researchers', 2, 2);
-- Basic rollback and release
BEGIN;
INSERT INTO researchers VALUES (7, 4, 'Jan Plaza');
SAVEPOINT s1;
INSERT INTO researchers VALUES (8, 4, 'Alonzo Church');
ROLLBACK TO s1;
RELEASE SAVEPOINT s1;
COMMIT;
SELECT * FROM researchers WHERE id in (7, 8);
-- Recover from failure on one of nodes
BEGIN;
SAVEPOINT s1;
INSERT INTO researchers VALUES (11, 11, 'Dana Scott');
INSERT INTO researchers VALUES (NULL, 10, 'Stephen Kleene');
ROLLBACK TO SAVEPOINT s1;
INSERT INTO researchers VALUES (12, 10, 'Stephen Kleene');
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
-- Don't recover, but rollback
BEGIN;
SAVEPOINT s1;
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
ROLLBACK;
SELECT * FROM researchers WHERE lab_id=10;
-- Don't recover, and commit
BEGIN;
SAVEPOINT s1;
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
RELEASE SAVEPOINT s1;
SAVEPOINT s2;
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
-- Implicit savepoints via pl/pgsql exceptions
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
EXCEPTION
WHEN not_null_violation THEN
RAISE NOTICE 'caught not_null_violation';
END $$;
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
RAISE EXCEPTION plpgsql_error;
EXCEPTION
WHEN plpgsql_error THEN
RAISE NOTICE 'caught manual plpgsql_error';
END $$;
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
EXCEPTION
WHEN not_null_violation THEN
RAISE EXCEPTION not_null_violation; -- rethrow it
END $$;
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
-- Insert something after catching error.
BEGIN;
DO $$
BEGIN
INSERT INTO researchers VALUES (15, 10, 'Melvin Fitting');
INSERT INTO researchers VALUES (NULL, 10, 'Raymond Smullyan');
EXCEPTION
WHEN not_null_violation THEN
INSERT INTO researchers VALUES (32, 10, 'Raymond Smullyan');
END $$;
COMMIT;
SELECT * FROM researchers WHERE lab_id=10;
-- Clean-up
DROP TABLE artists;
DROP TABLE researchers;