From 2bb46bb5ee2dbcc58c863b707af72e8ac11579dc Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 30 Oct 2017 13:03:24 +0100 Subject: [PATCH] Reset connectionReady flag after moving a connection in WaitForAllConnections --- .../distributed/connection/remote_commands.c | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index ff27a5f6f..accd7f833 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -854,8 +854,7 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) * might have read from the socket, meaning we may not see the socket * becoming readable again, so better to check it now. */ - if (event->events & WL_SOCKET_READABLE || - event->events & WL_SOCKET_WRITEABLE) + if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) { int receiveStatus = PQconsumeInput(connection->pgConn); if (receiveStatus == 0) @@ -872,8 +871,21 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) if (connectionIsReady) { + /* + * All pending connections are kept at the end of the allConnections + * array and the connectionReady array matches the allConnections + * array. The wait event set corresponds to the pending connections + * subarray so we can get the index in the allConnections array by + * taking the event index + the offset of the subarray. + */ connectionIndex = event->pos + pendingConnectionsStartIndex; + connectionReady[connectionIndex] = true; + + /* + * When a connection is ready, we should build a new wait event + * set that excludes this connection. + */ rebuildWaitEventSet = true; } } @@ -884,9 +896,24 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) { if (connectionReady[connectionIndex]) { + /* + * Replace the ready connection with a connection from + * the start of the pending connections subarray. This + * may be the connection itself, in which case this is + * a noop. + */ allConnections[connectionIndex] = allConnections[pendingConnectionsStartIndex]; + + /* offset of the pending connections subarray is now 1 higher */ pendingConnectionsStartIndex++; + + /* + * We've moved a pending connection into this position, + * so we must reset the ready flag. Otherwise, we'd + * falsely interpret it as ready in the next round. + */ + connectionReady[connectionIndex] = false; } } } @@ -933,6 +960,11 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, { MultiConnection *connection = allConnections[connectionIndex]; int socket = PQsocket(connection->pgConn); + + /* + * Always start by polling for both readability (server sent bytes) + * and writeability (server is ready to receive bytes). + */ int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);