From d6dadb1b25c657fd8b12ae1794cfcdb1a35fd628 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sat, 28 Oct 2017 15:54:54 +0200 Subject: [PATCH 1/3] Use correct index for ModifyWaitEvent in WaitForAllConnections --- src/backend/distributed/connection/remote_commands.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index c8eb7ec98..54a5532bc 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -831,7 +831,6 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) } connection = (MultiConnection *) event->user_data; - connectionIndex = event->pos + pendingConnectionsStartIndex; if (event->events & WL_SOCKET_WRITEABLE) { @@ -844,7 +843,7 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) else if (sendStatus == 0) { /* done writing, only wait for read events */ - ModifyWaitEvent(waitEventSet, connectionIndex, WL_SOCKET_READABLE, + ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE, NULL); } } @@ -866,6 +865,7 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) if (connectionIsReady) { + connectionIndex = event->pos + pendingConnectionsStartIndex; connectionReady[connectionIndex] = true; rebuildWaitEventSet = true; } From e6e68974998a65b84eb3b18bf3beff1151e504b7 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sat, 28 Oct 2017 15:54:18 +0200 Subject: [PATCH 2/3] Defer initial PQflush to main loop in WaitForAllConnections --- .../distributed/connection/remote_commands.c | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 54a5532bc..ff27a5f6f 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -848,7 +848,14 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) } } - if (event->events & WL_SOCKET_READABLE) + /* + * Check whether the connection is done is the socket is either readable + * or writable. If it was only writable, we performed a PQflush which + * might have read from the socket, meaning we may not see the socket + * becoming readable again, so better to check it now. + */ + if (event->events & WL_SOCKET_READABLE || + event->events & WL_SOCKET_WRITEABLE) { int receiveStatus = PQconsumeInput(connection->pgConn); if (receiveStatus == 0) @@ -907,8 +914,8 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) /* * BuildWaitEventSet creates a WaitEventSet for the given array of connections - * which can be used to wait for any of the sockets to become read-ready, or - * write-ready in case there is data to send. + * which can be used to wait for any of the sockets to become read-ready or + * write-ready. */ static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, @@ -926,14 +933,7 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, { MultiConnection *connection = allConnections[connectionIndex]; int socket = PQsocket(connection->pgConn); - int eventMask = WL_SOCKET_READABLE; - - int sendStatus = PQflush(connection->pgConn); - if (sendStatus == 1) - { - /* we have data to send, wake up when the socket is ready to write */ - eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - } + int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection); } From 2bb46bb5ee2dbcc58c863b707af72e8ac11579dc Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 30 Oct 2017 13:03:24 +0100 Subject: [PATCH 3/3] Reset connectionReady flag after moving a connection in WaitForAllConnections --- .../distributed/connection/remote_commands.c | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index ff27a5f6f..accd7f833 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -854,8 +854,7 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) * might have read from the socket, meaning we may not see the socket * becoming readable again, so better to check it now. */ - if (event->events & WL_SOCKET_READABLE || - event->events & WL_SOCKET_WRITEABLE) + if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) { int receiveStatus = PQconsumeInput(connection->pgConn); if (receiveStatus == 0) @@ -872,8 +871,21 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) if (connectionIsReady) { + /* + * All pending connections are kept at the end of the allConnections + * array and the connectionReady array matches the allConnections + * array. The wait event set corresponds to the pending connections + * subarray so we can get the index in the allConnections array by + * taking the event index + the offset of the subarray. + */ connectionIndex = event->pos + pendingConnectionsStartIndex; + connectionReady[connectionIndex] = true; + + /* + * When a connection is ready, we should build a new wait event + * set that excludes this connection. + */ rebuildWaitEventSet = true; } } @@ -884,9 +896,24 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) { if (connectionReady[connectionIndex]) { + /* + * Replace the ready connection with a connection from + * the start of the pending connections subarray. This + * may be the connection itself, in which case this is + * a noop. + */ allConnections[connectionIndex] = allConnections[pendingConnectionsStartIndex]; + + /* offset of the pending connections subarray is now 1 higher */ pendingConnectionsStartIndex++; + + /* + * We've moved a pending connection into this position, + * so we must reset the ready flag. Otherwise, we'd + * falsely interpret it as ready in the next round. + */ + connectionReady[connectionIndex] = false; } } } @@ -933,6 +960,11 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, { MultiConnection *connection = allConnections[connectionIndex]; int socket = PQsocket(connection->pgConn); + + /* + * Always start by polling for both readability (server sent bytes) + * and writeability (server is ready to receive bytes). + */ int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);