Always use connections in non-blocking mode.

Now that there's no blocking libpq callers left, default to using
non-blocking mode in connection_management.c.  This has two
advantages:
1) Blockiness doesn't have to frequently be reset, simplifying code
2) Prevents accidental use of blocking libpq functions, since they'll
   frequently return 'need IO'
pull/1650/head
Andres Freund 2017-06-30 18:20:54 -07:00 committed by Metin Doslu
parent a66ece75a0
commit f4e87bb717
2 changed files with 16 additions and 44 deletions

View File

@ -642,6 +642,13 @@ StartConnectionEstablishment(ConnectionHashKey *key)
connection->pgConn = PQconnectStartParams(keywords, values, false); connection->pgConn = PQconnectStartParams(keywords, values, false);
connection->connectionStart = GetCurrentTimestamp(); connection->connectionStart = GetCurrentTimestamp();
/*
* To avoid issues with interrupts not getting caught all our connections
* are managed in a non-blocking manner. remote_commands.c provides
* wrappers emulating blocking behaviour.
*/
PQsetnonblocking(connection->pgConn, true);
return connection; return connection;
} }

View File

@ -278,7 +278,6 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command,
const char *const *parameterValues) const char *const *parameterValues)
{ {
PGconn *pgConn = connection->pgConn; PGconn *pgConn = connection->pgConn;
bool wasNonblocking = false;
int rc = 0; int rc = 0;
LogRemoteCommand(connection, command); LogRemoteCommand(connection, command);
@ -292,23 +291,11 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command,
return 0; return 0;
} }
wasNonblocking = PQisnonblocking(pgConn); Assert(PQisnonblocking(pgConn));
/* make sure not to block anywhere */
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, true);
}
rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes, rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes,
parameterValues, NULL, NULL, 0); parameterValues, NULL, NULL, 0);
/* reset nonblocking connection to its original state */
if (!wasNonblocking)
{
PQsetnonblocking(pgConn, false);
}
return rc; return rc;
} }
@ -346,7 +333,6 @@ PGresult *
GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
{ {
PGconn *pgConn = connection->pgConn; PGconn *pgConn = connection->pgConn;
bool wasNonblocking = false;
PGresult *result = NULL; PGresult *result = NULL;
/* /*
@ -359,11 +345,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
return PQgetResult(connection->pgConn); return PQgetResult(connection->pgConn);
} }
wasNonblocking = PQisnonblocking(pgConn);
/* make sure not to block anywhere */
PQsetnonblocking(pgConn, true);
if (!FinishConnectionIO(connection, raiseInterrupts)) if (!FinishConnectionIO(connection, raiseInterrupts))
{ {
return NULL; return NULL;
@ -374,8 +355,6 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
result = PQgetResult(connection->pgConn); result = PQgetResult(connection->pgConn);
PQsetnonblocking(pgConn, wasNonblocking);
return result; return result;
} }
@ -390,38 +369,31 @@ bool
PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
{ {
PGconn *pgConn = connection->pgConn; PGconn *pgConn = connection->pgConn;
bool wasNonblocking = false;
int copyState = 0; int copyState = 0;
bool success = false;
if (PQstatus(pgConn) != CONNECTION_OK) if (PQstatus(pgConn) != CONNECTION_OK)
{ {
return false; return false;
} }
wasNonblocking = PQisnonblocking(pgConn); Assert(PQisnonblocking(pgConn));
PQsetnonblocking(pgConn, true);
copyState = PQputCopyData(pgConn, buffer, nbytes); copyState = PQputCopyData(pgConn, buffer, nbytes);
if (copyState == 1) if (copyState == 1)
{ {
/* successful */ /* successful */
success = true; return true;
} }
else if (copyState == -1) else if (copyState == -1)
{ {
success = false; return false;
} }
else else
{ {
bool allowInterrupts = true; bool allowInterrupts = true;
success = FinishConnectionIO(connection, allowInterrupts); return FinishConnectionIO(connection, allowInterrupts);
} }
PQsetnonblocking(pgConn, wasNonblocking);
return success;
} }
@ -435,38 +407,31 @@ bool
PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
{ {
PGconn *pgConn = connection->pgConn; PGconn *pgConn = connection->pgConn;
bool wasNonblocking = false;
int copyState = 0; int copyState = 0;
bool success = false;
if (PQstatus(pgConn) != CONNECTION_OK) if (PQstatus(pgConn) != CONNECTION_OK)
{ {
return false; return false;
} }
wasNonblocking = PQisnonblocking(pgConn); Assert(PQisnonblocking(pgConn));
PQsetnonblocking(pgConn, true);
copyState = PQputCopyEnd(pgConn, errormsg); copyState = PQputCopyEnd(pgConn, errormsg);
if (copyState == 1) if (copyState == 1)
{ {
/* successful */ /* successful */
success = true; return true;
} }
else if (copyState == -1) else if (copyState == -1)
{ {
success = false; return false;
} }
else else
{ {
bool allowInterrupts = true; bool allowInterrupts = true;
success = FinishConnectionIO(connection, allowInterrupts); return FinishConnectionIO(connection, allowInterrupts);
} }
PQsetnonblocking(pgConn, wasNonblocking);
return success;
} }