From f4e87bb717bcc021d940f76df9518fa1feab8030 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH] Always use connections in non-blocking mode. Now that there's no blocking libpq callers left, default to using non-blocking mode in connection_management.c. This has two advantages: 1) Blockiness doesn't have to frequently be reset, simplifying code 2) Prevents accidental use of blocking libpq functions, since they'll frequently return 'need IO' --- .../connection/connection_management.c | 7 +++ .../distributed/connection/remote_commands.c | 53 ++++--------------- 2 files changed, 16 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 724a6ec39..4a97e28ec 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -642,6 +642,13 @@ StartConnectionEstablishment(ConnectionHashKey *key) connection->pgConn = PQconnectStartParams(keywords, values, false); connection->connectionStart = GetCurrentTimestamp(); + /* + * To avoid issues with interrupts not getting caught all our connections + * are managed in a non-blocking manner. remote_commands.c provides + * wrappers emulating blocking behaviour. + */ + PQsetnonblocking(connection->pgConn, true); + return connection; } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index a118994ff..7a8da97b8 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -278,7 +278,6 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, const char *const *parameterValues) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int rc = 0; LogRemoteCommand(connection, command); @@ -292,23 +291,11 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, return 0; } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, true); - } + Assert(PQisnonblocking(pgConn)); rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes, parameterValues, NULL, NULL, 0); - /* reset nonblocking connection to its original state */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - return rc; } @@ -346,7 +333,6 @@ PGresult * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; PGresult *result = NULL; /* @@ -359,11 +345,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) return PQgetResult(connection->pgConn); } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - PQsetnonblocking(pgConn, true); - if (!FinishConnectionIO(connection, raiseInterrupts)) { return NULL; @@ -374,8 +355,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) result = PQgetResult(connection->pgConn); - PQsetnonblocking(pgConn, wasNonblocking); - return result; } @@ -390,38 +369,31 @@ bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyData(pgConn, buffer, nbytes); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; } @@ -435,38 +407,31 @@ bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyEnd(pgConn, errormsg); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; }