diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 7a7043111..a6e23bb5b 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -464,7 +464,7 @@ ShutdownConnection(MultiConnection *connection) if (PQstatus(connection->pgConn) == CONNECTION_OK && PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE) { - char errorMessage[256] = { 0 }; + char errorMessage[ERROR_BUFFER_SIZE] = { 0 }; PGcancel *cancel = PQgetCancel(connection->pgConn); if (!PQcancel(cancel, errorMessage, sizeof(errorMessage))) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6f5132c21..dbce8e070 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1008,3 +1008,26 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, return waitEventSet; } + + +/* + * SendCancelationRequest sends a cancelation request on the given connection. + * Return value indicates whether the cancelation request was sent successfully. 
+ */ +bool +SendCancelationRequest(MultiConnection *connection) +{ + char errorBuffer[ERROR_BUFFER_SIZE] = { 0 }; + PGcancel *cancelObject = PQgetCancel(connection->pgConn); + + bool cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); + if (!cancelSent) + { + ereport(WARNING, (errmsg("could not issue cancel request"), + errdetail("Client error: %s", errorBuffer))); + } + + PQfreeCancel(cancelObject); + + return cancelSent; +} diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 907a37a1c..029152319 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -443,27 +443,13 @@ bool MultiClientCancel(int32 connectionId) { MultiConnection *connection = NULL; - PGcancel *cancelObject = NULL; - int cancelSent = 0; bool canceled = true; - char errorBuffer[STRING_BUFFER_SIZE]; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - cancelObject = PQgetCancel(connection->pgConn); - - cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); - if (cancelSent == 0) - { - ereport(WARNING, (errmsg("could not issue cancel request"), - errdetail("Client error: %s", errorBuffer))); - - canceled = false; - } - - PQfreeCancel(cancelObject); + canceled = SendCancelationRequest(connection); return canceled; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 25ea7c921..8bdb8f4f5 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1574,6 +1574,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { + 
UnclaimAllShardConnections(shardConnectionHash); ReportConnectionError(connection, ERROR); } } @@ -1616,28 +1617,38 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn SetCitusNoticeLevel(INFO); } - /* - * If caller is interested, store query results the first time - * through. The output of the query's execution on other shards is - * discarded if we run there (because it's a modification query). - */ - if (placementIndex == 0 && expectResults) + PG_TRY(); { - Assert(scanState != NULL); + /* + * If caller is interested, store query results the first time + * through. The output of the query's execution on other shards is + * discarded if we run there (because it's a modification query). + */ + if (placementIndex == 0 && expectResults) + { + Assert(scanState != NULL); - queryOK = StoreQueryResult(scanState, connection, - alwaysThrowErrorOnFailure, - &currentAffectedTupleCount, NULL); + queryOK = StoreQueryResult(scanState, connection, + alwaysThrowErrorOnFailure, + &currentAffectedTupleCount, NULL); + } + else + { + queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, + &currentAffectedTupleCount); + } } - else + PG_CATCH(); { - queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, - &currentAffectedTupleCount); + UnclaimAllShardConnections(shardConnectionHash); + PG_RE_THROW(); } + PG_END_TRY(); /* We error out if the worker fails to return a result for the query. 
*/ if (!queryOK) { + UnclaimAllShardConnections(shardConnectionHash); ReportConnectionError(connection, ERROR); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index f9028c379..e49ba9e6d 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1068,6 +1068,12 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) MultiConnection *connection = dlist_container(MultiConnection, transactionNode, iter.cur); RemoteTransaction *transaction = &connection->remoteTransaction; + + /* cancel any ongoing queries before issuing rollback */ + ClearResultsIfReady(connection); + SendCancelationRequest(connection); + ForgetResults(connection); + if (transaction->transactionFailed) { if (transaction->lastSuccessfulSubXact <= subId) diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index b6b6b0f1e..1cbf8719e 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -21,6 +21,9 @@ /* maximum (textual) lengths of hostname and port */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */ +/* used for libpq commands that get an error buffer. Postgres docs recommend 256. 
*/ +#define ERROR_BUFFER_SIZE 256 + /* default notice level */ #define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1 diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 9a0ad75be..424cb00a7 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -57,5 +57,6 @@ extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg); /* waiting for multiple command results */ extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts); +extern bool SendCancelationRequest(MultiConnection *connection); #endif /* REMOTE_COMMAND_H */ diff --git a/src/test/regress/expected/failure_savepoints.out b/src/test/regress/expected/failure_savepoints.out index ca8a968b1..357ce6118 100644 --- a/src/test/regress/expected/failure_savepoints.out +++ b/src/test/regress/expected/failure_savepoints.out @@ -38,6 +38,10 @@ CONTEXT: while executing command on localhost:9060 WARNING: connection not open CONTEXT: while executing command on localhost:9060 DELETE FROM artists WHERE id=4; +WARNING: could not issue cancel request +DETAIL: Client error: PQcancel() -- no cancel object supplied +WARNING: connection not open +CONTEXT: while executing command on localhost:9060 WARNING: connection not open CONTEXT: while executing command on localhost:9060 WARNING: connection error: localhost:9060 @@ -195,6 +199,8 @@ INSERT INTO artists VALUES (7, 'Emily Carr'); ROLLBACK TO SAVEPOINT s1; WARNING: connection not open WARNING: connection not open +WARNING: could not issue cancel request +WARNING: connection not open COMMIT; ERROR: could not make changes to shard 100950 on any node SELECT * FROM artists WHERE id=6; @@ -230,6 +236,8 @@ WARNING: connection not open WARNING: connection not open INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); ROLLBACK TO s1; +WARNING: could not issue cancel request +WARNING: connection not open WARNING: connection not open WARNING: connection error: 
localhost:9060 WARNING: connection not open diff --git a/src/test/regress/expected/multi_subtransactions.out b/src/test/regress/expected/multi_subtransactions.out index d28d62bce..5f677d7d8 100644 --- a/src/test/regress/expected/multi_subtransactions.out +++ b/src/test/regress/expected/multi_subtransactions.out @@ -129,6 +129,28 @@ ERROR: current transaction is aborted, commands ignored until end of transactio ROLLBACK TO SAVEPOINT s3; ERROR: savepoint "s3" does not exist COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +COMMIT; +SELECT * FROM artists ORDER BY id; + id | name +----+------------------ + 1 | Pablo Picasso + 2 | Vincent van Gogh + 3 | Claude Monet + 4 | William Kurelek + 5 | Jacob Kahn + 6 | Emily Carr + 8 | Uncle Yaakov + 9 | Anna Schaeffer +(8 rows) + -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/expected/multi_subtransactions_0.out b/src/test/regress/expected/multi_subtransactions_0.out index 74765de10..e03a667d6 100644 --- a/src/test/regress/expected/multi_subtransactions_0.out +++ b/src/test/regress/expected/multi_subtransactions_0.out @@ -129,6 +129,28 @@ ERROR: current transaction is aborted, commands ignored until end of transactio ROLLBACK TO SAVEPOINT s3; ERROR: no such savepoint COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +COMMIT; +SELECT * FROM artists ORDER BY id; + id | name +----+------------------ + 1 
| Pablo Picasso + 2 | Vincent van Gogh + 3 | Claude Monet + 4 | William Kurelek + 5 | Jacob Kahn + 6 | Emily Carr + 8 | Uncle Yaakov + 9 | Anna Schaeffer +(8 rows) + -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/sql/multi_subtransactions.sql b/src/test/regress/sql/multi_subtransactions.sql index 5865aa467..992e23824 100644 --- a/src/test/regress/sql/multi_subtransactions.sql +++ b/src/test/regress/sql/multi_subtransactions.sql @@ -105,6 +105,17 @@ SAVEPOINT s3; ROLLBACK TO SAVEPOINT s3; COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +COMMIT; + +SELECT * FROM artists ORDER BY id; + -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== @@ -216,3 +227,4 @@ SELECT * FROM researchers WHERE lab_id=10; -- Clean-up DROP TABLE artists; DROP TABLE researchers; +