diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index c8eb7ec98..accd7f833 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -831,7 +831,6 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) } connection = (MultiConnection *) event->user_data; - connectionIndex = event->pos + pendingConnectionsStartIndex; if (event->events & WL_SOCKET_WRITEABLE) { @@ -844,12 +843,18 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) else if (sendStatus == 0) { /* done writing, only wait for read events */ - ModifyWaitEvent(waitEventSet, connectionIndex, WL_SOCKET_READABLE, + ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE, NULL); } } - if (event->events & WL_SOCKET_READABLE) + /* + * Check whether the connection is done is the socket is either readable + * or writable. If it was only writable, we performed a PQflush which + * might have read from the socket, meaning we may not see the socket + * becoming readable again, so better to check it now. + */ + if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) { int receiveStatus = PQconsumeInput(connection->pgConn); if (receiveStatus == 0) @@ -866,7 +871,21 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) if (connectionIsReady) { + /* + * All pending connections are kept at the end of the allConnections + * array and the connectionReady array matches the allConnections + * array. The wait event set corresponds to the pending connections + * subarray so we can get the index in the allConnections array by + * taking the event index + the offset of the subarray. + */ + connectionIndex = event->pos + pendingConnectionsStartIndex; + connectionReady[connectionIndex] = true; + + /* + * When a connection is ready, we should build a new wait event + * set that excludes this connection. + */ rebuildWaitEventSet = true; } } @@ -877,9 +896,24 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) { if (connectionReady[connectionIndex]) { + /* + * Replace the ready connection with a connection from + * the start of the pending connections subarray. This + * may be the connection itself, in which case this is + * a noop. + */ allConnections[connectionIndex] = allConnections[pendingConnectionsStartIndex]; + + /* offset of the pending connections subarray is now 1 higher */ pendingConnectionsStartIndex++; + + /* + * We've moved a pending connection into this position, + * so we must reset the ready flag. Otherwise, we'd + * falsely interpret it as ready in the next round. + */ + connectionReady[connectionIndex] = false; } } } @@ -907,8 +941,8 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) /* * BuildWaitEventSet creates a WaitEventSet for the given array of connections - * which can be used to wait for any of the sockets to become read-ready, or - * write-ready in case there is data to send. + * which can be used to wait for any of the sockets to become read-ready or + * write-ready. */ static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, @@ -926,14 +960,12 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, { MultiConnection *connection = allConnections[connectionIndex]; int socket = PQsocket(connection->pgConn); - int eventMask = WL_SOCKET_READABLE; - int sendStatus = PQflush(connection->pgConn); - if (sendStatus == 1) - { - /* we have data to send, wake up when the socket is ready to write */ - eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - } + /* + * Always start by polling for both readability (server sent bytes) + * and writeability (server is ready to receive bytes). + */ + int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection); }