From 6eeb43af15274019a527acdd777fcd4979312e71 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Thu, 8 Dec 2016 18:29:42 -0800 Subject: [PATCH] Add PQgetResult() wrapper handling interrupts. This makes it possible to implement cancelling queries blocked on communication with remote nodes. --- .../distributed/connection/remote_commands.c | 193 +++++++++++++++++- src/include/distributed/remote_commands.h | 4 + 2 files changed, 193 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 22ba1c9f7..6c5658988 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -14,6 +14,8 @@ #include "distributed/connection_management.h" #include "distributed/remote_commands.h" +#include "miscadmin.h" +#include "storage/latch.h" /* GUC, determining whether statements sent to remote nodes are logged */ @@ -52,7 +54,9 @@ ForgetResults(MultiConnection *connection) while (true) { PGresult *result = NULL; - result = PQgetResult(connection->pgConn); + const bool dontRaiseErrors = false; + + result = GetRemoteCommandResult(connection, dontRaiseErrors); if (result == NULL) { break; @@ -184,12 +188,193 @@ LogRemoteCommand(MultiConnection *connection, const char *command) /* wrappers around libpq functions, with command logging support */ /* - * SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands, - * and accepts a MultiConnection instead of a plain PGconn. + * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and + * accepts a MultiConnection instead of a plain PGconn. It makes sure it can + * send commands asynchronously without blocking (at the potential expense of + * an additional memory allocation). */ int SendRemoteCommand(MultiConnection *connection, const char *command) { + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = PQisnonblocking(pgConn); + int rc = 0; + LogRemoteCommand(connection, command); - return PQsendQuery(connection->pgConn, command); + + /* make sure not to block anywhere */ + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, true); + } + + rc = PQsendQuery(pgConn, command); + + /* reset nonblocking connection to its original state */ + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, false); + } + + return rc; +} + + +/* + * GetCommandResult is a wrapper around PQgetResult() that handles interrupts. + * + * If raiseInterrupts is true and an interrupt arrives, e.g. the query is + * being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws + * an error. + * + * If raiseInterrupts is false and an interrupt arrives that'd otherwise raise + * an error, GetRemotecommandResult returns NULL, and the transaction is + * marked as having failed. While that's not a perfect way to signal failure, + * callers will usually treat that as an error, and it's easy to use. + * + * Handling of interrupts is important to allow queries being cancelled while + * waiting on remote nodes. In a distributed deadlock scenario cancelling + * might be the only way to resolve the deadlock. + */ +PGresult * +GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) +{ + PGconn *pgConn = connection->pgConn; + int socket = 0; + int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; + bool wasNonblocking = false; + PGresult *result = NULL; + bool failed = false; + + /* short circuit tests around the more expensive parts of this routine */ + if (!PQisBusy(pgConn)) + { + return PQgetResult(connection->pgConn); + } + + socket = PQsocket(pgConn); + wasNonblocking = PQisnonblocking(pgConn); + + /* make sure not to block anywhere */ + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, true); + } + + if (raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + /* make sure command has been sent out */ + while (!failed) + { + int rc = 0; + + ResetLatch(MyLatch); + + /* try to send all the data */ + rc = PQflush(pgConn); + + /* stop writing if all data has been sent, or there was none to send */ + if (rc == 0) + { + break; + } + + /* if sending failed, there's nothing more we can do */ + if (rc == -1) + { + failed = true; + break; + } + + /* this means we have to wait for data to go out */ + Assert(rc == 1); + + rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0); + + if (rc & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (rc & WL_LATCH_SET) + { + /* if allowed raise errors */ + if (raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + /* + * If raising errors allowed, or called within in a section with + * interrupts held, return NULL instead, and mark the transaction + * as failed. + */ + if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) + { + connection->remoteTransaction.transactionFailed = true; + failed = true; + break; + } + } + } + + /* wait for the result of the command to come in */ + while (!failed) + { + int rc = 0; + + ResetLatch(MyLatch); + + /* if reading fails, there's not much we can do */ + if (PQconsumeInput(pgConn) == 0) + { + failed = true; + break; + } + + /* check if all the necessary data is now available */ + if (!PQisBusy(pgConn)) + { + result = PQgetResult(connection->pgConn); + break; + } + + rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0); + + if (rc & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (rc & WL_LATCH_SET) + { + /* if allowed raise errors */ + if (raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + /* + * If raising errors allowed, or called within in a section with + * interrupts held, return NULL instead, and mark the transaction + * as failed. + */ + if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) + { + connection->remoteTransaction.transactionFailed = true; + failed = true; + break; + } + } + } + + if (!wasNonblocking) + { + PQsetnonblocking(pgConn, false); + } + + return result; } diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 6ce25ccfd..66430ea64 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -14,6 +14,8 @@ #include "distributed/connection_management.h" +struct pg_result; /* target of the PGresult typedef */ + /* GUC, determining whether statements sent to remote nodes are logged */ extern bool LogRemoteCommands; @@ -31,6 +33,8 @@ extern void LogRemoteCommand(MultiConnection *connection, const char *command); /* wrappers around libpq functions, with command logging support */ extern int SendRemoteCommand(MultiConnection *connection, const char *command); +extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, + bool raiseInterrupts); #endif /* REMOTE_COMMAND_H */