diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 724a6ec39..4a97e28ec 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -642,6 +642,13 @@ StartConnectionEstablishment(ConnectionHashKey *key) connection->pgConn = PQconnectStartParams(keywords, values, false); connection->connectionStart = GetCurrentTimestamp(); + /* + * To avoid issues with interrupts not getting caught all our connections + * are managed in a non-blocking manner. remote_commands.c provides + * wrappers emulating blocking behaviour. + */ + PQsetnonblocking(connection->pgConn, true); + return connection; } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index a118994ff..7a8da97b8 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -278,7 +278,6 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, const char *const *parameterValues) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int rc = 0; LogRemoteCommand(connection, command); @@ -292,23 +291,11 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, return 0; } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, true); - } + Assert(PQisnonblocking(pgConn)); rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes, parameterValues, NULL, NULL, 0); - /* reset nonblocking connection to its original state */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - return rc; } @@ -346,7 +333,6 @@ PGresult * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; PGresult *result = NULL; /* @@ -359,11 +345,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) return PQgetResult(connection->pgConn); } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - PQsetnonblocking(pgConn, true); - if (!FinishConnectionIO(connection, raiseInterrupts)) { return NULL; @@ -374,8 +355,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) result = PQgetResult(connection->pgConn); - PQsetnonblocking(pgConn, wasNonblocking); - return result; } @@ -390,38 +369,31 @@ bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyData(pgConn, buffer, nbytes); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; } @@ -435,38 +407,31 @@ bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyEnd(pgConn, errormsg); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; }