From b69a762e0bb91f885ac90080a4bf0a48ec725641 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 17 Apr 2019 10:56:40 -0700 Subject: [PATCH 1/3] Fix savepoint rollback after multi-shard update failure. --- .../connection/connection_management.c | 2 +- .../distributed/connection/remote_commands.c | 23 ++++++++++++ .../executor/multi_client_executor.c | 16 +------- .../executor/multi_router_executor.c | 37 ++++++++++++------- .../transaction/remote_transaction.c | 6 +++ .../distributed/connection_management.h | 3 ++ src/include/distributed/remote_commands.h | 1 + .../regress/expected/failure_savepoints.out | 8 ++++ .../expected/multi_subtransactions.out | 22 +++++++++++ .../expected/multi_subtransactions_0.out | 22 +++++++++++ .../regress/sql/multi_subtransactions.sql | 12 ++++++ 11 files changed, 123 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 7a7043111..a6e23bb5b 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -464,7 +464,7 @@ ShutdownConnection(MultiConnection *connection) if (PQstatus(connection->pgConn) == CONNECTION_OK && PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE) { - char errorMessage[256] = { 0 }; + char errorMessage[ERROR_BUFFER_SIZE] = { 0 }; PGcancel *cancel = PQgetCancel(connection->pgConn); if (!PQcancel(cancel, errorMessage, sizeof(errorMessage))) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6f5132c21..dbce8e070 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1008,3 +1008,26 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, return waitEventSet; } + + +/* + * MultiClientCancel sends a cancelation request on the given connection. Return + * value indicates whether the cancelation request was sent successfully. + */ +bool +SendCancelationRequest(MultiConnection *connection) +{ + char errorBuffer[ERROR_BUFFER_SIZE] = { 0 }; + PGcancel *cancelObject = PQgetCancel(connection->pgConn); + + bool cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); + if (!cancelSent) + { + ereport(WARNING, (errmsg("could not issue cancel request"), + errdetail("Client error: %s", errorBuffer))); + } + + PQfreeCancel(cancelObject); + + return cancelSent; +} diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 907a37a1c..029152319 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -443,27 +443,13 @@ bool MultiClientCancel(int32 connectionId) { MultiConnection *connection = NULL; - PGcancel *cancelObject = NULL; - int cancelSent = 0; bool canceled = true; - char errorBuffer[STRING_BUFFER_SIZE]; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - cancelObject = PQgetCancel(connection->pgConn); - - cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); - if (cancelSent == 0) - { - ereport(WARNING, (errmsg("could not issue cancel request"), - errdetail("Client error: %s", errorBuffer))); - - canceled = false; - } - - PQfreeCancel(cancelObject); + canceled = SendCancelationRequest(connection); return canceled; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 25ea7c921..8bdb8f4f5 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1574,6 +1574,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { + UnclaimAllShardConnections(shardConnectionHash); ReportConnectionError(connection, ERROR); } } @@ -1616,28 +1617,38 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn SetCitusNoticeLevel(INFO); } - /* - * If caller is interested, store query results the first time - * through. The output of the query's execution on other shards is - * discarded if we run there (because it's a modification query). - */ - if (placementIndex == 0 && expectResults) + PG_TRY(); { - Assert(scanState != NULL); + /* + * If caller is interested, store query results the first time + * through. The output of the query's execution on other shards is + * discarded if we run there (because it's a modification query). + */ + if (placementIndex == 0 && expectResults) + { + Assert(scanState != NULL); - queryOK = StoreQueryResult(scanState, connection, - alwaysThrowErrorOnFailure, - ¤tAffectedTupleCount, NULL); + queryOK = StoreQueryResult(scanState, connection, + alwaysThrowErrorOnFailure, + ¤tAffectedTupleCount, NULL); + } + else + { + queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, + ¤tAffectedTupleCount); + } } - else + PG_CATCH(); { - queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, - ¤tAffectedTupleCount); + UnclaimAllShardConnections(shardConnectionHash); + PG_RE_THROW(); } + PG_END_TRY(); /* We error out if the worker fails to return a result for the query. */ if (!queryOK) { + UnclaimAllShardConnections(shardConnectionHash); ReportConnectionError(connection, ERROR); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index f9028c379..e49ba9e6d 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1068,6 +1068,12 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) MultiConnection *connection = dlist_container(MultiConnection, transactionNode, iter.cur); RemoteTransaction *transaction = &connection->remoteTransaction; + + /* cancel any ongoing queries before issuing rollback */ + ClearResultsIfReady(connection); + SendCancelationRequest(connection); + ForgetResults(connection); + if (transaction->transactionFailed) { if (transaction->lastSuccessfulSubXact <= subId) diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index b6b6b0f1e..1cbf8719e 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -21,6 +21,9 @@ /* maximum (textual) lengths of hostname and port */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */ +/* used for libpq commands that get an error buffer. Postgres docs recommend 256. */ +#define ERROR_BUFFER_SIZE 256 + /* default notice level */ #define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1 diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 9a0ad75be..424cb00a7 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -57,5 +57,6 @@ extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg); /* waiting for multiple command results */ extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts); +extern bool SendCancelationRequest(MultiConnection *connection); #endif /* REMOTE_COMMAND_H */ diff --git a/src/test/regress/expected/failure_savepoints.out b/src/test/regress/expected/failure_savepoints.out index ca8a968b1..357ce6118 100644 --- a/src/test/regress/expected/failure_savepoints.out +++ b/src/test/regress/expected/failure_savepoints.out @@ -38,6 +38,10 @@ CONTEXT: while executing command on localhost:9060 WARNING: connection not open CONTEXT: while executing command on localhost:9060 DELETE FROM artists WHERE id=4; +WARNING: could not issue cancel request +DETAIL: Client error: PQcancel() -- no cancel object supplied +WARNING: connection not open +CONTEXT: while executing command on localhost:9060 WARNING: connection not open CONTEXT: while executing command on localhost:9060 WARNING: connection error: localhost:9060 @@ -195,6 +199,8 @@ INSERT INTO artists VALUES (7, 'Emily Carr'); ROLLBACK TO SAVEPOINT s1; WARNING: connection not open WARNING: connection not open +WARNING: could not issue cancel request +WARNING: connection not open COMMIT; ERROR: could not make changes to shard 100950 on any node SELECT * FROM artists WHERE id=6; @@ -230,6 +236,8 @@ WARNING: connection not open WARNING: connection not open INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); ROLLBACK TO s1; +WARNING: could not issue cancel request +WARNING: connection not open WARNING: connection not open WARNING: connection error: localhost:9060 WARNING: connection not open diff --git a/src/test/regress/expected/multi_subtransactions.out b/src/test/regress/expected/multi_subtransactions.out index d28d62bce..5f677d7d8 100644 --- a/src/test/regress/expected/multi_subtransactions.out +++ b/src/test/regress/expected/multi_subtransactions.out @@ -129,6 +129,28 @@ ERROR: current transaction is aborted, commands ignored until end of transactio ROLLBACK TO SAVEPOINT s3; ERROR: savepoint "s3" does not exist COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +COMMIT; +SELECT * FROM artists ORDER BY id; + id | name +----+------------------ + 1 | Pablo Picasso + 2 | Vincent van Gogh + 3 | Claude Monet + 4 | William Kurelek + 5 | Jacob Kahn + 6 | Emily Carr + 8 | Uncle Yaakov + 9 | Anna Schaeffer +(8 rows) + -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/expected/multi_subtransactions_0.out b/src/test/regress/expected/multi_subtransactions_0.out index 74765de10..e03a667d6 100644 --- a/src/test/regress/expected/multi_subtransactions_0.out +++ b/src/test/regress/expected/multi_subtransactions_0.out @@ -129,6 +129,28 @@ ERROR: current transaction is aborted, commands ignored until end of transactio ROLLBACK TO SAVEPOINT s3; ERROR: no such savepoint COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +COMMIT; +SELECT * FROM artists ORDER BY id; + id | name +----+------------------ + 1 | Pablo Picasso + 2 | Vincent van Gogh + 3 | Claude Monet + 4 | William Kurelek + 5 | Jacob Kahn + 6 | Emily Carr + 8 | Uncle Yaakov + 9 | Anna Schaeffer +(8 rows) + -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/sql/multi_subtransactions.sql b/src/test/regress/sql/multi_subtransactions.sql index 5865aa467..992e23824 100644 --- a/src/test/regress/sql/multi_subtransactions.sql +++ b/src/test/regress/sql/multi_subtransactions.sql @@ -105,6 +105,17 @@ SAVEPOINT s3; ROLLBACK TO SAVEPOINT s3; COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +COMMIT; + +SELECT * FROM artists ORDER BY id; + -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== @@ -216,3 +227,4 @@ SELECT * FROM researchers WHERE lab_id=10; -- Clean-up DROP TABLE artists; DROP TABLE researchers; + From aafd22dffa3134b2275195be25ad2b2f31ad7482 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 22 Apr 2019 11:55:11 -0700 Subject: [PATCH 2/3] Fix savepoint rollback for INSERT INTO ... SELECT. --- src/backend/distributed/commands/multi_copy.c | 65 ++++++++++++++--- .../distributed/connection/remote_commands.c | 31 +++++++- .../transaction/remote_transaction.c | 5 +- src/include/distributed/remote_commands.h | 1 + .../regress/expected/failure_savepoints.out | 4 -- .../expected/multi_subtransactions.out | 70 +++++++++++++++---- .../expected/multi_subtransactions_0.out | 70 +++++++++++++++---- .../regress/sql/multi_subtransactions.sql | 43 +++++++++++- 8 files changed, 239 insertions(+), 50 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a96d87529..9c439f831 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -154,6 +154,8 @@ static void CopySendInt32(CopyOutState outputState, int32 val); static void CopySendInt16(CopyOutState outputState, int16 val); static void CopyAttributeOutText(CopyOutState outputState, char *string); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); +static bool CitusSendTupleToPlacements(TupleTableSlot *slot, + CitusCopyDestReceiver *copyDest); /* CitusCopyDestReceiver functions */ static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation, @@ -2251,8 +2253,33 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { + bool result = false; CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + PG_TRY(); + { + result = CitusSendTupleToPlacements(slot, copyDest); + } + PG_CATCH(); + { + HTAB *shardConnectionHash = copyDest->shardConnectionHash; + UnclaimAllShardConnections(shardConnectionHash); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return result; +} + + +/* + * CitusSendTupleToPlacements sends the given TupleTableSlot to the appropriate + * shard placement(s). + */ +static bool +CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest) +{ int partitionColumnIndex = copyDest->partitionColumnIndex; TupleDesc tupleDescriptor = copyDest->tupleDescriptor; CopyStmt *copyStatement = copyDest->copyStatement; @@ -2409,21 +2436,37 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) Relation distributedRelation = copyDest->distributedRelation; shardConnectionsList = ShardConnectionList(shardConnectionHash); - foreach(shardConnectionsCell, shardConnectionsList) + + PG_TRY(); { - ShardConnections *shardConnections = (ShardConnections *) lfirst( - shardConnectionsCell); - - /* send copy binary footers to all shard placements */ - if (copyOutState->binary) + foreach(shardConnectionsCell, shardConnectionsList) { - SendCopyBinaryFooters(copyOutState, shardConnections->shardId, - shardConnections->connectionList); - } + ShardConnections *shardConnections = (ShardConnections *) lfirst( + shardConnectionsCell); - /* close the COPY input on all shard placements */ - EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); + /* send copy binary footers to all shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, shardConnections->shardId, + shardConnections->connectionList); + } + + /* close the COPY input on all shard placements */ + EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, + true); + } } + PG_CATCH(); + { + /* + * We might be able to recover from errors with ROLLBACK TO SAVEPOINT, + * so unclaim the connections before throwing errors. + */ + UnclaimAllShardConnections(shardConnectionHash); + + PG_RE_THROW(); + } + PG_END_TRY(); heap_close(distributedRelation, NoLock); } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index dbce8e070..8fe174251 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -29,6 +29,8 @@ bool LogRemoteCommands = false; +static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors, + bool discardWarnings); static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts); static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, @@ -71,7 +73,7 @@ ForgetResults(MultiConnection *connection) /* - * ClearResults clears a connection from pending activity, + * ClearResultsInternal clears a connection from pending activity, * returns true if all pending commands return success. It raises * error if raiseErrors flag is set, any command fails and transaction * is marked critical. @@ -81,6 +83,27 @@ ForgetResults(MultiConnection *connection) */ bool ClearResults(MultiConnection *connection, bool raiseErrors) +{ + return ClearResultsInternal(connection, raiseErrors, false); +} + + +/* + * ClearResultsDiscardWarnings does the same thing as ClearResults, but doesn't + * emit warnings. + */ +bool +ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors) +{ + return ClearResultsInternal(connection, raiseErrors, true); +} + + +/* + * ClearResultsInternal is used by ClearResults and ClearResultsDiscardWarnings. + */ +static bool +ClearResultsInternal(MultiConnection *connection, bool raiseErrors, bool discardWarnings) { bool success = true; @@ -103,7 +126,11 @@ ClearResults(MultiConnection *connection, bool raiseErrors) if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); + if (!discardWarnings) + { + ReportResultError(connection, result, WARNING); + } + MarkRemoteTransactionFailed(connection, raiseErrors); success = false; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index e49ba9e6d..5df7a7ad8 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1070,9 +1070,10 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) RemoteTransaction *transaction = &connection->remoteTransaction; /* cancel any ongoing queries before issuing rollback */ - ClearResultsIfReady(connection); SendCancelationRequest(connection); - ForgetResults(connection); + + /* clear results, but don't show cancelation warning messages from workers. */ + ClearResultsDiscardWarnings(connection, raiseInterrupts); if (transaction->transactionFailed) { diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 424cb00a7..5a55a25a1 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -27,6 +27,7 @@ extern bool LogRemoteCommands; extern bool IsResponseOK(struct pg_result *result); extern void ForgetResults(MultiConnection *connection); extern bool ClearResults(MultiConnection *connection, bool raiseErrors); +extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors); extern bool ClearResultsIfReady(MultiConnection *connection); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); diff --git a/src/test/regress/expected/failure_savepoints.out b/src/test/regress/expected/failure_savepoints.out index 357ce6118..a95451d1a 100644 --- a/src/test/regress/expected/failure_savepoints.out +++ b/src/test/regress/expected/failure_savepoints.out @@ -42,8 +42,6 @@ WARNING: could not issue cancel request DETAIL: Client error: PQcancel() -- no cancel object supplied WARNING: connection not open CONTEXT: while executing command on localhost:9060 -WARNING: connection not open -CONTEXT: while executing command on localhost:9060 WARNING: connection error: localhost:9060 DETAIL: connection not open WARNING: connection not open @@ -200,7 +198,6 @@ ROLLBACK TO SAVEPOINT s1; WARNING: connection not open WARNING: connection not open WARNING: could not issue cancel request -WARNING: connection not open COMMIT; ERROR: could not make changes to shard 100950 on any node SELECT * FROM artists WHERE id=6; @@ -238,7 +235,6 @@ INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); ROLLBACK TO s1; WARNING: could not issue cancel request WARNING: connection not open -WARNING: connection not open WARNING: connection error: localhost:9060 WARNING: connection not open WARNING: connection not open diff --git a/src/test/regress/expected/multi_subtransactions.out b/src/test/regress/expected/multi_subtransactions.out index 5f677d7d8..d8020bc8d 100644 --- a/src/test/regress/expected/multi_subtransactions.out +++ b/src/test/regress/expected/multi_subtransactions.out @@ -131,25 +131,67 @@ ERROR: savepoint "s3" does not exist COMMIT; -- Recover from multi-shard modify errors BEGIN; -INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +INSERT INTO artists VALUES (8, 'Sogand'); SAVEPOINT s1; UPDATE artists SET name = NULL; ERROR: null value in column "name" violates not-null constraint ROLLBACK TO s1; -INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); COMMIT; -SELECT * FROM artists ORDER BY id; - id | name -----+------------------ - 1 | Pablo Picasso - 2 | Vincent van Gogh - 3 | Claude Monet - 4 | William Kurelek - 5 | Jacob Kahn - 6 | Emily Carr - 8 | Uncle Yaakov - 9 | Anna Schaeffer -(8 rows) +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + id | name +----+--------------- + 8 | Sogand + 9 | Mohsen Namjoo +(2 rows) + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + id | name +----+-------------------- + 10 | Mahmoud Farshchian +(1 row) + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists cannot be NULL +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + id | name +----+-------------- + 11 | Egon Schiele +(1 row) + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists should have a value +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + id | name +----+-------------- + 12 | Marc Chagall +(1 row) -- =================================================================== -- Tests for replication factor > 1 diff --git a/src/test/regress/expected/multi_subtransactions_0.out b/src/test/regress/expected/multi_subtransactions_0.out index e03a667d6..9caa09e59 100644 --- a/src/test/regress/expected/multi_subtransactions_0.out +++ b/src/test/regress/expected/multi_subtransactions_0.out @@ -131,25 +131,67 @@ ERROR: no such savepoint COMMIT; -- Recover from multi-shard modify errors BEGIN; -INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +INSERT INTO artists VALUES (8, 'Sogand'); SAVEPOINT s1; UPDATE artists SET name = NULL; ERROR: null value in column "name" violates not-null constraint ROLLBACK TO s1; -INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); COMMIT; -SELECT * FROM artists ORDER BY id; - id | name -----+------------------ - 1 | Pablo Picasso - 2 | Vincent van Gogh - 3 | Claude Monet - 4 | William Kurelek - 5 | Jacob Kahn - 6 | Emily Carr - 8 | Uncle Yaakov - 9 | Anna Schaeffer -(8 rows) +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + id | name +----+--------------- + 8 | Sogand + 9 | Mohsen Namjoo +(2 rows) + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + id | name +----+-------------------- + 10 | Mahmoud Farshchian +(1 row) + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists cannot be NULL +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + id | name +----+-------------- + 11 | Egon Schiele +(1 row) + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists should have a value +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + id | name +----+-------------- + 12 | Marc Chagall +(1 row) -- =================================================================== -- Tests for replication factor > 1 diff --git a/src/test/regress/sql/multi_subtransactions.sql b/src/test/regress/sql/multi_subtransactions.sql index 992e23824..d0f163244 100644 --- a/src/test/regress/sql/multi_subtransactions.sql +++ b/src/test/regress/sql/multi_subtransactions.sql @@ -107,14 +107,51 @@ COMMIT; -- Recover from multi-shard modify errors BEGIN; -INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +INSERT INTO artists VALUES (8, 'Sogand'); SAVEPOINT s1; UPDATE artists SET name = NULL; ROLLBACK TO s1; -INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); COMMIT; -SELECT * FROM artists ORDER BY id; +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; + +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; + +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; + +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + -- =================================================================== -- Tests for replication factor > 1 From 32ecb6884c57735ebb8ddf0059419139d031ca24 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 29 Apr 2019 11:13:11 -0700 Subject: [PATCH 3/3] Test ROLLBACK TO SAVEPOINT with multi-shard CTE failures --- src/backend/distributed/commands/multi_copy.c | 4 ++ .../distributed/connection/remote_commands.c | 4 +- .../executor/multi_router_executor.c | 4 ++ .../expected/multi_subtransactions.out | 70 +++++++++++++++++++ .../expected/multi_subtransactions_0.out | 70 +++++++++++++++++++ .../regress/sql/multi_subtransactions.sql | 49 +++++++++++++ 6 files changed, 199 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 9c439f831..6cc0c5dca 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2262,6 +2262,10 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) } PG_CATCH(); { + /* + * We might be able to recover from errors with ROLLBACK TO SAVEPOINT, + * so unclaim the connections before throwing errors. + */ HTAB *shardConnectionHash = copyDest->shardConnectionHash; UnclaimAllShardConnections(shardConnectionHash); diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 8fe174251..21d8bd56e 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1038,8 +1038,8 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, /* - * MultiClientCancel sends a cancelation request on the given connection. Return - * value indicates whether the cancelation request was sent successfully. + * SendCancelationRequest sends a cancelation request on the given connection. + * Return value indicates whether the cancelation request was sent successfully. */ bool SendCancelationRequest(MultiConnection *connection) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 8bdb8f4f5..c5259cc6f 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1640,6 +1640,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn } PG_CATCH(); { + /* + * We might be able to recover from errors with ROLLBACK TO SAVEPOINT, + * so unclaim the connections before throwing errors. + */ UnclaimAllShardConnections(shardConnectionHash); PG_RE_THROW(); } diff --git a/src/test/regress/expected/multi_subtransactions.out b/src/test/regress/expected/multi_subtransactions.out index d8020bc8d..7a6812dce 100644 --- a/src/test/regress/expected/multi_subtransactions.out +++ b/src/test/regress/expected/multi_subtransactions.out @@ -193,6 +193,76 @@ SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; 12 | Marc Chagall (1 row) +-- Recover from multi-shard CTE modify failures +create table t1(a int, b int); +create table t2(a int, b int CHECK(b > 0)); +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; +select create_distributed_table('t1', 'a'), + create_distributed_table('t2', 'a'); + create_distributed_table | create_distributed_table +--------------------------+-------------------------- + | +(1 row) + +begin; +insert into t2 select i, i+1 from generate_series(1, 3) i; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +savepoint s1; +with r AS ( + update t1 set b = b - 10 + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s1; +savepoint s2; +with r AS ( + update t2 set b = b - 10 + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s2; +savepoint s3; +with r AS ( + insert into t2 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s3; +savepoint s4; +with r AS ( + insert into t1 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s4; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +commit; +select * from t2 order by a, b; + a | b +---+--- + 1 | 4 + 2 | 5 + 3 | 6 +(3 rows) + +select * from t1 order by a, b; + a | b +---+--- + 1 | 3 + 1 | 4 + 2 | 4 + 2 | 5 + 3 | 5 + 3 | 6 +(6 rows) + +drop table t1, t2; -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/expected/multi_subtransactions_0.out b/src/test/regress/expected/multi_subtransactions_0.out index 9caa09e59..3f7f8aa2c 100644 --- a/src/test/regress/expected/multi_subtransactions_0.out +++ b/src/test/regress/expected/multi_subtransactions_0.out @@ -193,6 +193,76 @@ SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; 12 | Marc Chagall (1 row) +-- Recover from multi-shard CTE modify failures +create table t1(a int, b int); +create table t2(a int, b int CHECK(b > 0)); +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; +select create_distributed_table('t1', 'a'), + create_distributed_table('t2', 'a'); + create_distributed_table | create_distributed_table +--------------------------+-------------------------- + | +(1 row) + +begin; +insert into t2 select i, i+1 from generate_series(1, 3) i; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +savepoint s1; +with r AS ( + update t1 set b = b - 10 + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s1; +savepoint s2; +with r AS ( + update t2 set b = b - 10 + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s2; +savepoint s3; +with r AS ( + insert into t2 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s3; +savepoint s4; +with r AS ( + insert into t1 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s4; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +commit; +select * from t2 order by a, b; + a | b +---+--- + 1 | 4 + 2 | 5 + 3 | 6 +(3 rows) + +select * from t1 order by a, b; + a | b +---+--- + 1 | 3 + 1 | 4 + 2 | 4 + 2 | 5 + 3 | 5 + 3 | 6 +(6 rows) + +drop table t1, t2; -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/sql/multi_subtransactions.sql b/src/test/regress/sql/multi_subtransactions.sql index d0f163244..2dc8fb2f9 100644 --- a/src/test/regress/sql/multi_subtransactions.sql +++ b/src/test/regress/sql/multi_subtransactions.sql @@ -152,6 +152,55 @@ COMMIT; SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; +-- Recover from multi-shard CTE modify failures +create table t1(a int, b int); +create table t2(a int, b int CHECK(b > 0)); + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; + +select create_distributed_table('t1', 'a'), + create_distributed_table('t2', 'a'); + +begin; +insert into t2 select i, i+1 from generate_series(1, 3) i; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +savepoint s1; +with r AS ( + update t1 set b = b - 10 + returning * +) insert into t2 select * from r; +rollback to savepoint s1; +savepoint s2; +with r AS ( + update t2 set b = b - 10 + returning * +) insert into t1 select * from r; +rollback to savepoint s2; +savepoint s3; +with r AS ( + insert into t2 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t1 select * from r; +rollback to savepoint s3; +savepoint s4; +with r AS ( + insert into t1 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t2 select * from r; +rollback to savepoint s4; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +commit; + +select * from t2 order by a, b; +select * from t1 order by a, b; + +drop table t1, t2; -- =================================================================== -- Tests for replication factor > 1