Merge pull request #1737 from citusdata/fix_writeability_checks

Fix issues in WaitForAllConnections
pull/1743/head
Marco Slot 2017-10-31 13:16:06 +01:00 committed by GitHub
commit bc3bdeaac8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 44 additions and 12 deletions

View File

@ -831,7 +831,6 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
} }
connection = (MultiConnection *) event->user_data; connection = (MultiConnection *) event->user_data;
connectionIndex = event->pos + pendingConnectionsStartIndex;
if (event->events & WL_SOCKET_WRITEABLE) if (event->events & WL_SOCKET_WRITEABLE)
{ {
@ -844,12 +843,18 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
else if (sendStatus == 0) else if (sendStatus == 0)
{ {
/* done writing, only wait for read events */ /* done writing, only wait for read events */
ModifyWaitEvent(waitEventSet, connectionIndex, WL_SOCKET_READABLE, ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE,
NULL); NULL);
} }
} }
if (event->events & WL_SOCKET_READABLE) /*
* Check whether the connection is done is the socket is either readable
* or writable. If it was only writable, we performed a PQflush which
* might have read from the socket, meaning we may not see the socket
* becoming readable again, so better to check it now.
*/
if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
{ {
int receiveStatus = PQconsumeInput(connection->pgConn); int receiveStatus = PQconsumeInput(connection->pgConn);
if (receiveStatus == 0) if (receiveStatus == 0)
@ -866,7 +871,21 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
if (connectionIsReady) if (connectionIsReady)
{ {
/*
* All pending connections are kept at the end of the allConnections
* array and the connectionReady array matches the allConnections
* array. The wait event set corresponds to the pending connections
* subarray so we can get the index in the allConnections array by
* taking the event index + the offset of the subarray.
*/
connectionIndex = event->pos + pendingConnectionsStartIndex;
connectionReady[connectionIndex] = true; connectionReady[connectionIndex] = true;
/*
* When a connection is ready, we should build a new wait event
* set that excludes this connection.
*/
rebuildWaitEventSet = true; rebuildWaitEventSet = true;
} }
} }
@ -877,9 +896,24 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
{ {
if (connectionReady[connectionIndex]) if (connectionReady[connectionIndex])
{ {
/*
* Replace the ready connection with a connection from
* the start of the pending connections subarray. This
* may be the connection itself, in which case this is
* a noop.
*/
allConnections[connectionIndex] = allConnections[connectionIndex] =
allConnections[pendingConnectionsStartIndex]; allConnections[pendingConnectionsStartIndex];
/* offset of the pending connections subarray is now 1 higher */
pendingConnectionsStartIndex++; pendingConnectionsStartIndex++;
/*
* We've moved a pending connection into this position,
* so we must reset the ready flag. Otherwise, we'd
* falsely interpret it as ready in the next round.
*/
connectionReady[connectionIndex] = false;
} }
} }
} }
@ -907,8 +941,8 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
/* /*
* BuildWaitEventSet creates a WaitEventSet for the given array of connections * BuildWaitEventSet creates a WaitEventSet for the given array of connections
* which can be used to wait for any of the sockets to become read-ready, or * which can be used to wait for any of the sockets to become read-ready or
* write-ready in case there is data to send. * write-ready.
*/ */
static WaitEventSet * static WaitEventSet *
BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
@ -926,14 +960,12 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
{ {
MultiConnection *connection = allConnections[connectionIndex]; MultiConnection *connection = allConnections[connectionIndex];
int socket = PQsocket(connection->pgConn); int socket = PQsocket(connection->pgConn);
int eventMask = WL_SOCKET_READABLE;
int sendStatus = PQflush(connection->pgConn); /*
if (sendStatus == 1) * Always start by polling for both readability (server sent bytes)
{ * and writeability (server is ready to receive bytes).
/* we have data to send, wake up when the socket is ready to write */ */
eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
}
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection); AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
} }