mirror of https://github.com/citusdata/citus.git
Defer initial PQflush to main loop in WaitForAllConnections
parent
d6dadb1b25
commit
e6e6897499
|
@ -848,7 +848,14 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event->events & WL_SOCKET_READABLE)
|
/*
|
||||||
|
* Check whether the connection is done is the socket is either readable
|
||||||
|
* or writable. If it was only writable, we performed a PQflush which
|
||||||
|
* might have read from the socket, meaning we may not see the socket
|
||||||
|
* becoming readable again, so better to check it now.
|
||||||
|
*/
|
||||||
|
if (event->events & WL_SOCKET_READABLE ||
|
||||||
|
event->events & WL_SOCKET_WRITEABLE)
|
||||||
{
|
{
|
||||||
int receiveStatus = PQconsumeInput(connection->pgConn);
|
int receiveStatus = PQconsumeInput(connection->pgConn);
|
||||||
if (receiveStatus == 0)
|
if (receiveStatus == 0)
|
||||||
|
@ -907,8 +914,8 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* BuildWaitEventSet creates a WaitEventSet for the given array of connections
|
* BuildWaitEventSet creates a WaitEventSet for the given array of connections
|
||||||
* which can be used to wait for any of the sockets to become read-ready, or
|
* which can be used to wait for any of the sockets to become read-ready or
|
||||||
* write-ready in case there is data to send.
|
* write-ready.
|
||||||
*/
|
*/
|
||||||
static WaitEventSet *
|
static WaitEventSet *
|
||||||
BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
||||||
|
@ -926,14 +933,7 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
||||||
{
|
{
|
||||||
MultiConnection *connection = allConnections[connectionIndex];
|
MultiConnection *connection = allConnections[connectionIndex];
|
||||||
int socket = PQsocket(connection->pgConn);
|
int socket = PQsocket(connection->pgConn);
|
||||||
int eventMask = WL_SOCKET_READABLE;
|
int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||||
|
|
||||||
int sendStatus = PQflush(connection->pgConn);
|
|
||||||
if (sendStatus == 1)
|
|
||||||
{
|
|
||||||
/* we have data to send, wake up when the socket is ready to write */
|
|
||||||
eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
|
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue