diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index aff9c2f58..74eb0aece 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -640,6 +640,13 @@ StartConnectionEstablishment(ConnectionHashKey *key) connection->pgConn = PQconnectStartParams(keywords, values, false); connection->connectionStart = GetCurrentTimestamp(); + /* + * To avoid issues with interrupts not getting caught all our connections + * are managed in a non-blocking manner. remote_commands.c provides + * wrappers emulating blocking behaviour. + */ + PQsetnonblocking(connection->pgConn, true); + return connection; } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 8e3dcc669..186ca8f50 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -280,7 +280,6 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, const char *const *parameterValues) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int rc = 0; LogRemoteCommand(connection, command); @@ -294,23 +293,11 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command, return 0; } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, true); - } + Assert(PQisnonblocking(pgConn)); rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes, parameterValues, NULL, NULL, 0); - /* reset nonblocking connection to its original state */ - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - return rc; } @@ -380,7 +367,6 @@ PGresult * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; PGresult *result = NULL; /* @@ -393,11 +379,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) return PQgetResult(connection->pgConn); } - wasNonblocking = PQisnonblocking(pgConn); - - /* make sure not to block anywhere */ - PQsetnonblocking(pgConn, true); - if (!FinishConnectionIO(connection, raiseInterrupts)) { return NULL; @@ -408,8 +389,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) result = PQgetResult(connection->pgConn); - PQsetnonblocking(pgConn, wasNonblocking); - return result; } @@ -424,38 +403,31 @@ bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyData(pgConn, buffer, nbytes); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; } @@ -469,38 +441,31 @@ bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) { PGconn *pgConn = connection->pgConn; - bool wasNonblocking = false; int copyState = 0; - bool success = false; if (PQstatus(pgConn) != CONNECTION_OK) { return false; } - wasNonblocking = PQisnonblocking(pgConn); - - PQsetnonblocking(pgConn, true); + Assert(PQisnonblocking(pgConn)); copyState = PQputCopyEnd(pgConn, errormsg); if (copyState == 1) { /* successful */ - success = true; + return true; } else if (copyState == -1) { - success = false; + return false; } else { bool allowInterrupts = true; - success = FinishConnectionIO(connection, allowInterrupts); + return FinishConnectionIO(connection, allowInterrupts); } - - PQsetnonblocking(pgConn, wasNonblocking); - return success; }