Add PQgetResult() wrapper handling interrupts.

This makes it possible to implement cancelling queries blocked on
communication with remote nodes.
pull/1020/head
Andres Freund 2016-12-08 18:29:42 -08:00
parent 5a00de6c62
commit 6eeb43af15
2 changed files with 193 additions and 4 deletions

View File

@ -14,6 +14,8 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "miscadmin.h"
#include "storage/latch.h"
/* GUC, determining whether statements sent to remote nodes are logged */ /* GUC, determining whether statements sent to remote nodes are logged */
@ -52,7 +54,9 @@ ForgetResults(MultiConnection *connection)
while (true) while (true)
{ {
PGresult *result = NULL; PGresult *result = NULL;
result = PQgetResult(connection->pgConn); const bool dontRaiseErrors = false;
result = GetRemoteCommandResult(connection, dontRaiseErrors);
if (result == NULL) if (result == NULL)
{ {
break; break;
@ -184,12 +188,193 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */
/* /*
* SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands, * SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
* and accepts a MultiConnection instead of a plain PGconn. * accepts a MultiConnection instead of a plain PGconn. It makes sure it can
* send commands asynchronously without blocking (at the potential expense of
* an additional memory allocation).
*/ */
int int
SendRemoteCommand(MultiConnection *connection, const char *command) SendRemoteCommand(MultiConnection *connection, const char *command)
{ {
PGconn *pgConn = connection->pgConn;
bool wasNonblocking = PQisnonblocking(pgConn);
int rc = 0;
LogRemoteCommand(connection, command); LogRemoteCommand(connection, command);
return PQsendQuery(connection->pgConn, command);
/* make sure not to block anywhere */
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, true);
}
rc = PQsendQuery(pgConn, command);
/* reset nonblocking connection to its original state */
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, false);
}
return rc;
}
/*
* GetCommandResult is a wrapper around PQgetResult() that handles interrupts.
*
* If raiseInterrupts is true and an interrupt arrives, e.g. the query is
* being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws
* an error.
*
* If raiseInterrupts is false and an interrupt arrives that'd otherwise raise
* an error, GetRemotecommandResult returns NULL, and the transaction is
* marked as having failed. While that's not a perfect way to signal failure,
* callers will usually treat that as an error, and it's easy to use.
*
* Handling of interrupts is important to allow queries being cancelled while
* waiting on remote nodes. In a distributed deadlock scenario cancelling
* might be the only way to resolve the deadlock.
*/
PGresult *
GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
{
PGconn *pgConn = connection->pgConn;
int socket = 0;
int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
bool wasNonblocking = false;
PGresult *result = NULL;
bool failed = false;
/* short circuit tests around the more expensive parts of this routine */
if (!PQisBusy(pgConn))
{
return PQgetResult(connection->pgConn);
}
socket = PQsocket(pgConn);
wasNonblocking = PQisnonblocking(pgConn);
/* make sure not to block anywhere */
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, true);
}
if (raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
/* make sure command has been sent out */
while (!failed)
{
int rc = 0;
ResetLatch(MyLatch);
/* try to send all the data */
rc = PQflush(pgConn);
/* stop writing if all data has been sent, or there was none to send */
if (rc == 0)
{
break;
}
/* if sending failed, there's nothing more we can do */
if (rc == -1)
{
failed = true;
break;
}
/* this means we have to wait for data to go out */
Assert(rc == 1);
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0);
if (rc & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
if (rc & WL_LATCH_SET)
{
/* if allowed raise errors */
if (raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
/*
* If raising errors allowed, or called within in a section with
* interrupts held, return NULL instead, and mark the transaction
* as failed.
*/
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
{
connection->remoteTransaction.transactionFailed = true;
failed = true;
break;
}
}
}
/* wait for the result of the command to come in */
while (!failed)
{
int rc = 0;
ResetLatch(MyLatch);
/* if reading fails, there's not much we can do */
if (PQconsumeInput(pgConn) == 0)
{
failed = true;
break;
}
/* check if all the necessary data is now available */
if (!PQisBusy(pgConn))
{
result = PQgetResult(connection->pgConn);
break;
}
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0);
if (rc & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
if (rc & WL_LATCH_SET)
{
/* if allowed raise errors */
if (raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
/*
* If raising errors allowed, or called within in a section with
* interrupts held, return NULL instead, and mark the transaction
* as failed.
*/
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
{
connection->remoteTransaction.transactionFailed = true;
failed = true;
break;
}
}
}
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, false);
}
return result;
} }

View File

@ -14,6 +14,8 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
struct pg_result; /* target of the PGresult typedef */
/* GUC, determining whether statements sent to remote nodes are logged */ /* GUC, determining whether statements sent to remote nodes are logged */
extern bool LogRemoteCommands; extern bool LogRemoteCommands;
@ -31,6 +33,8 @@ extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */
extern int SendRemoteCommand(MultiConnection *connection, const char *command); extern int SendRemoteCommand(MultiConnection *connection, const char *command);
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
bool raiseInterrupts);
#endif /* REMOTE_COMMAND_H */ #endif /* REMOTE_COMMAND_H */