Merge pull request #2673 from citusdata/fix_multishard_transactions

Fix savepoint rollback after multi-shard modify/copy failure.
pull/2695/head
Hadi Moshayedi 2019-05-01 08:38:25 -08:00 committed by GitHub
commit 5205bc4be9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 522 additions and 42 deletions

View File

@ -154,6 +154,8 @@ static void CopySendInt32(CopyOutState outputState, int32 val);
static void CopySendInt16(CopyOutState outputState, int16 val);
static void CopyAttributeOutText(CopyOutState outputState, char *string);
static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer);
static bool CitusSendTupleToPlacements(TupleTableSlot *slot,
CitusCopyDestReceiver *copyDest);
/* CitusCopyDestReceiver functions */
static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation,
@ -2251,8 +2253,37 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
static bool
CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
bool result = false;
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
PG_TRY();
{
result = CitusSendTupleToPlacements(slot, copyDest);
}
PG_CATCH();
{
/*
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
* so unclaim the connections before throwing errors.
*/
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
UnclaimAllShardConnections(shardConnectionHash);
PG_RE_THROW();
}
PG_END_TRY();
return result;
}
/*
* CitusSendTupleToPlacements sends the given TupleTableSlot to the appropriate
* shard placement(s).
*/
static bool
CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest)
{
int partitionColumnIndex = copyDest->partitionColumnIndex;
TupleDesc tupleDescriptor = copyDest->tupleDescriptor;
CopyStmt *copyStatement = copyDest->copyStatement;
@ -2409,21 +2440,37 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
Relation distributedRelation = copyDest->distributedRelation;
shardConnectionsList = ShardConnectionList(shardConnectionHash);
foreach(shardConnectionsCell, shardConnectionsList)
PG_TRY();
{
ShardConnections *shardConnections = (ShardConnections *) lfirst(
shardConnectionsCell);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
foreach(shardConnectionsCell, shardConnectionsList)
{
SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
shardConnections->connectionList);
}
ShardConnections *shardConnections = (ShardConnections *) lfirst(
shardConnectionsCell);
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
shardConnections->connectionList);
}
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList,
true);
}
}
PG_CATCH();
{
/*
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
* so unclaim the connections before throwing errors.
*/
UnclaimAllShardConnections(shardConnectionHash);
PG_RE_THROW();
}
PG_END_TRY();
heap_close(distributedRelation, NoLock);
}

View File

@ -464,7 +464,7 @@ ShutdownConnection(MultiConnection *connection)
if (PQstatus(connection->pgConn) == CONNECTION_OK &&
PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE)
{
char errorMessage[256] = { 0 };
char errorMessage[ERROR_BUFFER_SIZE] = { 0 };
PGcancel *cancel = PQgetCancel(connection->pgConn);
if (!PQcancel(cancel, errorMessage, sizeof(errorMessage)))

View File

@ -29,6 +29,8 @@
bool LogRemoteCommands = false;
static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors,
bool discardWarnings);
static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections,
int totalConnectionCount,
@ -71,7 +73,7 @@ ForgetResults(MultiConnection *connection)
/*
* ClearResults clears a connection from pending activity,
* ClearResultsInternal clears a connection from pending activity,
* returns true if all pending commands return success. It raises
* error if raiseErrors flag is set, any command fails and transaction
* is marked critical.
@ -81,6 +83,27 @@ ForgetResults(MultiConnection *connection)
*/
bool
ClearResults(MultiConnection *connection, bool raiseErrors)
{
return ClearResultsInternal(connection, raiseErrors, false);
}
/*
* ClearResultsDiscardWarnings does the same thing as ClearResults, but doesn't
* emit warnings.
*/
bool
ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors)
{
return ClearResultsInternal(connection, raiseErrors, true);
}
/*
* ClearResultsInternal is used by ClearResults and ClearResultsDiscardWarnings.
*/
static bool
ClearResultsInternal(MultiConnection *connection, bool raiseErrors, bool discardWarnings)
{
bool success = true;
@ -103,7 +126,11 @@ ClearResults(MultiConnection *connection, bool raiseErrors)
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
if (!discardWarnings)
{
ReportResultError(connection, result, WARNING);
}
MarkRemoteTransactionFailed(connection, raiseErrors);
success = false;
@ -1008,3 +1035,26 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
return waitEventSet;
}
/*
* SendCancelationRequest sends a cancelation request on the given connection.
* Return value indicates whether the cancelation request was sent successfully.
*/
bool
SendCancelationRequest(MultiConnection *connection)
{
char errorBuffer[ERROR_BUFFER_SIZE] = { 0 };
PGcancel *cancelObject = PQgetCancel(connection->pgConn);
bool cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer));
if (!cancelSent)
{
ereport(WARNING, (errmsg("could not issue cancel request"),
errdetail("Client error: %s", errorBuffer)));
}
PQfreeCancel(cancelObject);
return cancelSent;
}

View File

@ -443,27 +443,13 @@ bool
MultiClientCancel(int32 connectionId)
{
MultiConnection *connection = NULL;
PGcancel *cancelObject = NULL;
int cancelSent = 0;
bool canceled = true;
char errorBuffer[STRING_BUFFER_SIZE];
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
cancelObject = PQgetCancel(connection->pgConn);
cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer));
if (cancelSent == 0)
{
ereport(WARNING, (errmsg("could not issue cancel request"),
errdetail("Client error: %s", errorBuffer)));
canceled = false;
}
PQfreeCancel(cancelObject);
canceled = SendCancelationRequest(connection);
return canceled;
}

View File

@ -1574,6 +1574,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
{
UnclaimAllShardConnections(shardConnectionHash);
ReportConnectionError(connection, ERROR);
}
}
@ -1616,28 +1617,42 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
SetCitusNoticeLevel(INFO);
}
/*
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
* discarded if we run there (because it's a modification query).
*/
if (placementIndex == 0 && expectResults)
PG_TRY();
{
Assert(scanState != NULL);
/*
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
* discarded if we run there (because it's a modification query).
*/
if (placementIndex == 0 && expectResults)
{
Assert(scanState != NULL);
queryOK = StoreQueryResult(scanState, connection,
alwaysThrowErrorOnFailure,
&currentAffectedTupleCount, NULL);
queryOK = StoreQueryResult(scanState, connection,
alwaysThrowErrorOnFailure,
&currentAffectedTupleCount, NULL);
}
else
{
queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
&currentAffectedTupleCount);
}
}
else
PG_CATCH();
{
queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
&currentAffectedTupleCount);
/*
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
* so unclaim the connections before throwing errors.
*/
UnclaimAllShardConnections(shardConnectionHash);
PG_RE_THROW();
}
PG_END_TRY();
/* We error out if the worker fails to return a result for the query. */
if (!queryOK)
{
UnclaimAllShardConnections(shardConnectionHash);
ReportConnectionError(connection, ERROR);
}

View File

@ -1068,6 +1068,13 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
iter.cur);
RemoteTransaction *transaction = &connection->remoteTransaction;
/* cancel any ongoing queries before issuing rollback */
SendCancelationRequest(connection);
/* clear results, but don't show cancelation warning messages from workers. */
ClearResultsDiscardWarnings(connection, raiseInterrupts);
if (transaction->transactionFailed)
{
if (transaction->lastSuccessfulSubXact <= subId)

View File

@ -21,6 +21,9 @@
/* maximum (textual) lengths of hostname and port */
#define MAX_NODE_LENGTH 255 /* includes 0 byte */
/* used for libpq commands that get an error buffer. Postgres docs recommend 256. */
#define ERROR_BUFFER_SIZE 256
/* default notice level */
#define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1

View File

@ -27,6 +27,7 @@ extern bool LogRemoteCommands;
extern bool IsResponseOK(struct pg_result *result);
extern void ForgetResults(MultiConnection *connection);
extern bool ClearResults(MultiConnection *connection, bool raiseErrors);
extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors);
extern bool ClearResultsIfReady(MultiConnection *connection);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
@ -57,5 +58,6 @@ extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
/* waiting for multiple command results */
extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts);
extern bool SendCancelationRequest(MultiConnection *connection);
#endif /* REMOTE_COMMAND_H */

View File

@ -38,6 +38,8 @@ CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
DELETE FROM artists WHERE id=4;
WARNING: could not issue cancel request
DETAIL: Client error: PQcancel() -- no cancel object supplied
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection error: localhost:9060
@ -195,6 +197,7 @@ INSERT INTO artists VALUES (7, 'Emily Carr');
ROLLBACK TO SAVEPOINT s1;
WARNING: connection not open
WARNING: connection not open
WARNING: could not issue cancel request
COMMIT;
ERROR: could not make changes to shard 100950 on any node
SELECT * FROM artists WHERE id=6;
@ -230,6 +233,7 @@ WARNING: connection not open
WARNING: connection not open
INSERT INTO researchers VALUES (8, 4, 'Alonzo Church');
ROLLBACK TO s1;
WARNING: could not issue cancel request
WARNING: connection not open
WARNING: connection error: localhost:9060
WARNING: connection not open

View File

@ -129,6 +129,140 @@ ERROR: current transaction is aborted, commands ignored until end of transactio
ROLLBACK TO SAVEPOINT s3;
ERROR: savepoint "s3" does not exist
COMMIT;
-- Recover from multi-shard modify errors
BEGIN;
INSERT INTO artists VALUES (8, 'Sogand');
SAVEPOINT s1;
UPDATE artists SET name = NULL;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Mohsen Namjoo');
COMMIT;
SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id;
id | name
----+---------------
8 | Sogand
9 | Mohsen Namjoo
(2 rows)
-- Recover from multi-shard copy shutdown failure.
-- Constraint check for non-partition columns happen only at copy shutdown.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (10, 'Mahmoud Farshchian');
COMMIT;
SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id;
id | name
----+--------------------
10 | Mahmoud Farshchian
(1 row)
-- Recover from multi-shard copy send failure.
-- Constraint check for partition column happens at copy send.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists cannot be NULL
ROLLBACK TO s1;
INSERT INTO artists VALUES (11, 'Egon Schiele');
COMMIT;
SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id;
id | name
----+--------------
11 | Egon Schiele
(1 row)
-- Recover from multi-shard copy startup failure.
-- Check for existence of a value for partition columnn happens at copy startup.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists should have a value
ROLLBACK TO s1;
INSERT INTO artists VALUES (12, 'Marc Chagall');
COMMIT;
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
id | name
----+--------------
12 | Marc Chagall
(1 row)
-- Recover from multi-shard CTE modify failures
create table t1(a int, b int);
create table t2(a int, b int CHECK(b > 0));
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
select create_distributed_table('t1', 'a'),
create_distributed_table('t2', 'a');
create_distributed_table | create_distributed_table
--------------------------+--------------------------
|
(1 row)
begin;
insert into t2 select i, i+1 from generate_series(1, 3) i;
with r AS (
update t2 set b = b + 1
returning *
) insert into t1 select * from r;
savepoint s1;
with r AS (
update t1 set b = b - 10
returning *
) insert into t2 select * from r;
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
rollback to savepoint s1;
savepoint s2;
with r AS (
update t2 set b = b - 10
returning *
) insert into t1 select * from r;
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
rollback to savepoint s2;
savepoint s3;
with r AS (
insert into t2 select i, i+1 from generate_series(-10,-5) i
returning *
) insert into t1 select * from r;
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
rollback to savepoint s3;
savepoint s4;
with r AS (
insert into t1 select i, i+1 from generate_series(-10,-5) i
returning *
) insert into t2 select * from r;
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
rollback to savepoint s4;
with r AS (
update t2 set b = b + 1
returning *
) insert into t1 select * from r;
commit;
select * from t2 order by a, b;
a | b
---+---
1 | 4
2 | 5
3 | 6
(3 rows)
select * from t1 order by a, b;
a | b
---+---
1 | 3
1 | 4
2 | 4
2 | 5
3 | 5
3 | 6
(6 rows)
drop table t1, t2;
-- ===================================================================
-- Tests for replication factor > 1
-- ===================================================================

View File

@ -129,6 +129,140 @@ ERROR: current transaction is aborted, commands ignored until end of transactio
ROLLBACK TO SAVEPOINT s3;
ERROR: no such savepoint
COMMIT;
-- Recover from multi-shard modify errors
BEGIN;
INSERT INTO artists VALUES (8, 'Sogand');
SAVEPOINT s1;
UPDATE artists SET name = NULL;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Mohsen Namjoo');
COMMIT;
SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id;
id | name
----+---------------
8 | Sogand
9 | Mohsen Namjoo
(2 rows)
-- Recover from multi-shard copy shutdown failure.
-- Constraint check for non-partition columns happen only at copy shutdown.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (10, 'Mahmoud Farshchian');
COMMIT;
SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id;
id | name
----+--------------------
10 | Mahmoud Farshchian
(1 row)
-- Recover from multi-shard copy send failure.
-- Constraint check for partition column happens at copy send.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists cannot be NULL
ROLLBACK TO s1;
INSERT INTO artists VALUES (11, 'Egon Schiele');
COMMIT;
SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id;
id | name
----+--------------
11 | Egon Schiele
(1 row)
-- Recover from multi-shard copy startup failure.
-- Check for existence of a value for partition columnn happens at copy startup.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists should have a value
ROLLBACK TO s1;
INSERT INTO artists VALUES (12, 'Marc Chagall');
COMMIT;
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
id | name
----+--------------
12 | Marc Chagall
(1 row)
-- Recover from multi-shard CTE modify failures
create table t1(a int, b int);
create table t2(a int, b int CHECK(b > 0));
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
select create_distributed_table('t1', 'a'),
create_distributed_table('t2', 'a');
create_distributed_table | create_distributed_table
--------------------------+--------------------------
|
(1 row)
begin;
insert into t2 select i, i+1 from generate_series(1, 3) i;
with r AS (
update t2 set b = b + 1
returning *
) insert into t1 select * from r;
savepoint s1;
with r AS (
update t1 set b = b - 10
returning *
) insert into t2 select * from r;
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
rollback to savepoint s1;
savepoint s2;
with r AS (
update t2 set b = b - 10
returning *
) insert into t1 select * from r;
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
rollback to savepoint s2;
savepoint s3;
with r AS (
insert into t2 select i, i+1 from generate_series(-10,-5) i
returning *
) insert into t1 select * from r;
ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check"
rollback to savepoint s3;
savepoint s4;
with r AS (
insert into t1 select i, i+1 from generate_series(-10,-5) i
returning *
) insert into t2 select * from r;
ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check"
rollback to savepoint s4;
with r AS (
update t2 set b = b + 1
returning *
) insert into t1 select * from r;
commit;
select * from t2 order by a, b;
a | b
---+---
1 | 4
2 | 5
3 | 6
(3 rows)
select * from t1 order by a, b;
a | b
---+---
1 | 3
1 | 4
2 | 4
2 | 5
3 | 5
3 | 6
(6 rows)
drop table t1, t2;
-- ===================================================================
-- Tests for replication factor > 1
-- ===================================================================

View File

@ -105,6 +105,103 @@ SAVEPOINT s3;
ROLLBACK TO SAVEPOINT s3;
COMMIT;
-- Recover from multi-shard modify errors
BEGIN;
INSERT INTO artists VALUES (8, 'Sogand');
SAVEPOINT s1;
UPDATE artists SET name = NULL;
ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Mohsen Namjoo');
COMMIT;
SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id;
-- Recover from multi-shard copy shutdown failure.
-- Constraint check for non-partition columns happen only at copy shutdown.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i;
ROLLBACK TO s1;
INSERT INTO artists VALUES (10, 'Mahmoud Farshchian');
COMMIT;
SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id;
-- Recover from multi-shard copy send failure.
-- Constraint check for partition column happens at copy send.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i;
ROLLBACK TO s1;
INSERT INTO artists VALUES (11, 'Egon Schiele');
COMMIT;
SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id;
-- Recover from multi-shard copy startup failure.
-- Check for existence of a value for partition columnn happens at copy startup.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i;
ROLLBACK TO s1;
INSERT INTO artists VALUES (12, 'Marc Chagall');
COMMIT;
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
-- Recover from multi-shard CTE modify failures
create table t1(a int, b int);
create table t2(a int, b int CHECK(b > 0));
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
select create_distributed_table('t1', 'a'),
create_distributed_table('t2', 'a');
begin;
insert into t2 select i, i+1 from generate_series(1, 3) i;
with r AS (
update t2 set b = b + 1
returning *
) insert into t1 select * from r;
savepoint s1;
with r AS (
update t1 set b = b - 10
returning *
) insert into t2 select * from r;
rollback to savepoint s1;
savepoint s2;
with r AS (
update t2 set b = b - 10
returning *
) insert into t1 select * from r;
rollback to savepoint s2;
savepoint s3;
with r AS (
insert into t2 select i, i+1 from generate_series(-10,-5) i
returning *
) insert into t1 select * from r;
rollback to savepoint s3;
savepoint s4;
with r AS (
insert into t1 select i, i+1 from generate_series(-10,-5) i
returning *
) insert into t2 select * from r;
rollback to savepoint s4;
with r AS (
update t2 set b = b + 1
returning *
) insert into t1 select * from r;
commit;
select * from t2 order by a, b;
select * from t1 order by a, b;
drop table t1, t2;
-- ===================================================================
-- Tests for replication factor > 1
-- ===================================================================
@ -216,3 +313,4 @@ SELECT * FROM researchers WHERE lab_id=10;
-- Clean-up
DROP TABLE artists;
DROP TABLE researchers;