mirror of https://github.com/citusdata/citus.git
Reset connectionReady flag after moving a connection in WaitForAllConnections
parent
e6e6897499
commit
2bb46bb5ee
|
@ -854,8 +854,7 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
|
||||||
* might have read from the socket, meaning we may not see the socket
|
* might have read from the socket, meaning we may not see the socket
|
||||||
* becoming readable again, so better to check it now.
|
* becoming readable again, so better to check it now.
|
||||||
*/
|
*/
|
||||||
if (event->events & WL_SOCKET_READABLE ||
|
if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
|
||||||
event->events & WL_SOCKET_WRITEABLE)
|
|
||||||
{
|
{
|
||||||
int receiveStatus = PQconsumeInput(connection->pgConn);
|
int receiveStatus = PQconsumeInput(connection->pgConn);
|
||||||
if (receiveStatus == 0)
|
if (receiveStatus == 0)
|
||||||
|
@ -872,8 +871,21 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
|
||||||
|
|
||||||
if (connectionIsReady)
|
if (connectionIsReady)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* All pending connections are kept at the end of the allConnections
|
||||||
|
* array and the connectionReady array matches the allConnections
|
||||||
|
* array. The wait event set corresponds to the pending connections
|
||||||
|
* subarray so we can get the index in the allConnections array by
|
||||||
|
* taking the event index + the offset of the subarray.
|
||||||
|
*/
|
||||||
connectionIndex = event->pos + pendingConnectionsStartIndex;
|
connectionIndex = event->pos + pendingConnectionsStartIndex;
|
||||||
|
|
||||||
connectionReady[connectionIndex] = true;
|
connectionReady[connectionIndex] = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When a connection is ready, we should build a new wait event
|
||||||
|
* set that excludes this connection.
|
||||||
|
*/
|
||||||
rebuildWaitEventSet = true;
|
rebuildWaitEventSet = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -884,9 +896,24 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
|
||||||
{
|
{
|
||||||
if (connectionReady[connectionIndex])
|
if (connectionReady[connectionIndex])
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* Replace the ready connection with a connection from
|
||||||
|
* the start of the pending connections subarray. This
|
||||||
|
* may be the connection itself, in which case this is
|
||||||
|
* a noop.
|
||||||
|
*/
|
||||||
allConnections[connectionIndex] =
|
allConnections[connectionIndex] =
|
||||||
allConnections[pendingConnectionsStartIndex];
|
allConnections[pendingConnectionsStartIndex];
|
||||||
|
|
||||||
|
/* offset of the pending connections subarray is now 1 higher */
|
||||||
pendingConnectionsStartIndex++;
|
pendingConnectionsStartIndex++;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We've moved a pending connection into this position,
|
||||||
|
* so we must reset the ready flag. Otherwise, we'd
|
||||||
|
* falsely interpret it as ready in the next round.
|
||||||
|
*/
|
||||||
|
connectionReady[connectionIndex] = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -933,6 +960,11 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
||||||
{
|
{
|
||||||
MultiConnection *connection = allConnections[connectionIndex];
|
MultiConnection *connection = allConnections[connectionIndex];
|
||||||
int socket = PQsocket(connection->pgConn);
|
int socket = PQsocket(connection->pgConn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Always start by polling for both readability (server sent bytes)
|
||||||
|
* and writeability (server is ready to receive bytes).
|
||||||
|
*/
|
||||||
int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||||
|
|
||||||
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
|
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
|
||||||
|
|
Loading…
Reference in New Issue