Fix savepoint rollback after multi-shard update failure.

pull/2673/head
Hadi Moshayedi 2019-04-17 10:56:40 -07:00
parent 5b98f26984
commit b69a762e0b
11 changed files with 123 additions and 29 deletions

View File

@ -464,7 +464,7 @@ ShutdownConnection(MultiConnection *connection)
if (PQstatus(connection->pgConn) == CONNECTION_OK && if (PQstatus(connection->pgConn) == CONNECTION_OK &&
PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE) PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE)
{ {
char errorMessage[256] = { 0 }; char errorMessage[ERROR_BUFFER_SIZE] = { 0 };
PGcancel *cancel = PQgetCancel(connection->pgConn); PGcancel *cancel = PQgetCancel(connection->pgConn);
if (!PQcancel(cancel, errorMessage, sizeof(errorMessage))) if (!PQcancel(cancel, errorMessage, sizeof(errorMessage)))

View File

@ -1008,3 +1008,26 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
return waitEventSet; return waitEventSet;
} }
/*
 * SendCancelationRequest sends a cancelation request on the given connection.
 * Return value indicates whether the cancelation request was sent successfully.
 *
 * A failure to send the request is reported as a WARNING that carries the
 * client-side error text; this function itself does not raise an ERROR.
 */
bool
SendCancelationRequest(MultiConnection *connection)
{
	char errorBuffer[ERROR_BUFFER_SIZE] = { 0 };

	/*
	 * PQgetCancel may hand back NULL (e.g. when the connection is not open).
	 * PQcancel then fails and describes the reason in errorBuffer, so that
	 * case surfaces through the WARNING below rather than needing a separate
	 * check here.
	 */
	PGcancel *cancelObject = PQgetCancel(connection->pgConn);

	/* PQcancel returns an int status; compare explicitly instead of narrowing to bool */
	bool cancelSent = (PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)) != 0);

	if (!cancelSent)
	{
		ereport(WARNING, (errmsg("could not issue cancel request"),
						  errdetail("Client error: %s", errorBuffer)));
	}

	/* release the cancel object on both the success and the failure path */
	PQfreeCancel(cancelObject);

	return cancelSent;
}

View File

@ -443,27 +443,13 @@ bool
MultiClientCancel(int32 connectionId) MultiClientCancel(int32 connectionId)
{ {
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
PGcancel *cancelObject = NULL;
int cancelSent = 0;
bool canceled = true; bool canceled = true;
char errorBuffer[STRING_BUFFER_SIZE];
Assert(connectionId != INVALID_CONNECTION_ID); Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId]; connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
cancelObject = PQgetCancel(connection->pgConn); canceled = SendCancelationRequest(connection);
cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer));
if (cancelSent == 0)
{
ereport(WARNING, (errmsg("could not issue cancel request"),
errdetail("Client error: %s", errorBuffer)));
canceled = false;
}
PQfreeCancel(cancelObject);
return canceled; return canceled;
} }

View File

@ -1574,6 +1574,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK) if (!queryOK)
{ {
UnclaimAllShardConnections(shardConnectionHash);
ReportConnectionError(connection, ERROR); ReportConnectionError(connection, ERROR);
} }
} }
@ -1616,28 +1617,38 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
SetCitusNoticeLevel(INFO); SetCitusNoticeLevel(INFO);
} }
/* PG_TRY();
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
* discarded if we run there (because it's a modification query).
*/
if (placementIndex == 0 && expectResults)
{ {
Assert(scanState != NULL); /*
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
* discarded if we run there (because it's a modification query).
*/
if (placementIndex == 0 && expectResults)
{
Assert(scanState != NULL);
queryOK = StoreQueryResult(scanState, connection, queryOK = StoreQueryResult(scanState, connection,
alwaysThrowErrorOnFailure, alwaysThrowErrorOnFailure,
&currentAffectedTupleCount, NULL); &currentAffectedTupleCount, NULL);
}
else
{
queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
&currentAffectedTupleCount);
}
} }
else PG_CATCH();
{ {
queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, UnclaimAllShardConnections(shardConnectionHash);
&currentAffectedTupleCount); PG_RE_THROW();
} }
PG_END_TRY();
/* We error out if the worker fails to return a result for the query. */ /* We error out if the worker fails to return a result for the query. */
if (!queryOK) if (!queryOK)
{ {
UnclaimAllShardConnections(shardConnectionHash);
ReportConnectionError(connection, ERROR); ReportConnectionError(connection, ERROR);
} }

View File

@ -1068,6 +1068,12 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
MultiConnection *connection = dlist_container(MultiConnection, transactionNode, MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur); iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
/* cancel any ongoing queries before issuing rollback */
ClearResultsIfReady(connection);
SendCancelationRequest(connection);
ForgetResults(connection);
if (transaction->transactionFailed) if (transaction->transactionFailed)
{ {
if (transaction->lastSuccessfulSubXact <= subId) if (transaction->lastSuccessfulSubXact <= subId)

View File

@ -21,6 +21,9 @@
/* maximum (textual) lengths of hostname and port */ /* maximum (textual) lengths of hostname and port */
#define MAX_NODE_LENGTH 255 /* includes 0 byte */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */
/* used for libpq commands that get an error buffer. Postgres docs recommend 256. */
#define ERROR_BUFFER_SIZE 256
/* default notice level */ /* default notice level */
#define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1 #define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1

View File

@ -57,5 +57,6 @@ extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
/* waiting for multiple command results */ /* waiting for multiple command results */
extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts); extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts);
extern bool SendCancelationRequest(MultiConnection *connection);
#endif /* REMOTE_COMMAND_H */ #endif /* REMOTE_COMMAND_H */

View File

@ -38,6 +38,10 @@ CONTEXT: while executing command on localhost:9060
WARNING: connection not open WARNING: connection not open
CONTEXT: while executing command on localhost:9060 CONTEXT: while executing command on localhost:9060
DELETE FROM artists WHERE id=4; DELETE FROM artists WHERE id=4;
WARNING: could not issue cancel request
DETAIL: Client error: PQcancel() -- no cancel object supplied
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection not open WARNING: connection not open
CONTEXT: while executing command on localhost:9060 CONTEXT: while executing command on localhost:9060
WARNING: connection error: localhost:9060 WARNING: connection error: localhost:9060
@ -195,6 +199,8 @@ INSERT INTO artists VALUES (7, 'Emily Carr');
ROLLBACK TO SAVEPOINT s1; ROLLBACK TO SAVEPOINT s1;
WARNING: connection not open WARNING: connection not open
WARNING: connection not open WARNING: connection not open
WARNING: could not issue cancel request
WARNING: connection not open
COMMIT; COMMIT;
ERROR: could not make changes to shard 100950 on any node ERROR: could not make changes to shard 100950 on any node
SELECT * FROM artists WHERE id=6; SELECT * FROM artists WHERE id=6;
@ -230,6 +236,8 @@ WARNING: connection not open
WARNING: connection not open WARNING: connection not open
INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); INSERT INTO researchers VALUES (8, 4, 'Alonzo Church');
ROLLBACK TO s1; ROLLBACK TO s1;
WARNING: could not issue cancel request
WARNING: connection not open
WARNING: connection not open WARNING: connection not open
WARNING: connection error: localhost:9060 WARNING: connection error: localhost:9060
WARNING: connection not open WARNING: connection not open

View File

@ -129,6 +129,28 @@ ERROR: current transaction is aborted, commands ignored until end of transactio
ROLLBACK TO SAVEPOINT s3; ROLLBACK TO SAVEPOINT s3;
ERROR: savepoint "s3" does not exist ERROR: savepoint "s3" does not exist
COMMIT; COMMIT;
-- Recover from multi-shard modify errors
BEGIN;
INSERT INTO artists VALUES (8, 'Uncle Yaakov');
SAVEPOINT s1;
UPDATE artists SET name = NULL;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Anna Schaeffer');
COMMIT;
SELECT * FROM artists ORDER BY id;
id | name
----+------------------
1 | Pablo Picasso
2 | Vincent van Gogh
3 | Claude Monet
4 | William Kurelek
5 | Jacob Kahn
6 | Emily Carr
8 | Uncle Yaakov
9 | Anna Schaeffer
(8 rows)
-- =================================================================== -- ===================================================================
-- Tests for replication factor > 1 -- Tests for replication factor > 1
-- =================================================================== -- ===================================================================

View File

@ -129,6 +129,28 @@ ERROR: current transaction is aborted, commands ignored until end of transactio
ROLLBACK TO SAVEPOINT s3; ROLLBACK TO SAVEPOINT s3;
ERROR: no such savepoint ERROR: no such savepoint
COMMIT; COMMIT;
-- Recover from multi-shard modify errors
BEGIN;
INSERT INTO artists VALUES (8, 'Uncle Yaakov');
SAVEPOINT s1;
UPDATE artists SET name = NULL;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Anna Schaeffer');
COMMIT;
SELECT * FROM artists ORDER BY id;
id | name
----+------------------
1 | Pablo Picasso
2 | Vincent van Gogh
3 | Claude Monet
4 | William Kurelek
5 | Jacob Kahn
6 | Emily Carr
8 | Uncle Yaakov
9 | Anna Schaeffer
(8 rows)
-- =================================================================== -- ===================================================================
-- Tests for replication factor > 1 -- Tests for replication factor > 1
-- =================================================================== -- ===================================================================

View File

@ -105,6 +105,17 @@ SAVEPOINT s3;
ROLLBACK TO SAVEPOINT s3; ROLLBACK TO SAVEPOINT s3;
COMMIT; COMMIT;
-- Recover from multi-shard modify errors
BEGIN;
INSERT INTO artists VALUES (8, 'Uncle Yaakov');
SAVEPOINT s1;
UPDATE artists SET name = NULL;
ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Anna Schaeffer');
COMMIT;
SELECT * FROM artists ORDER BY id;
-- =================================================================== -- ===================================================================
-- Tests for replication factor > 1 -- Tests for replication factor > 1
-- =================================================================== -- ===================================================================
@ -216,3 +227,4 @@ SELECT * FROM researchers WHERE lab_id=10;
-- Clean-up -- Clean-up
DROP TABLE artists; DROP TABLE artists;
DROP TABLE researchers; DROP TABLE researchers;