From fca986f214a2ae58c6536cd8477e855cd6e6590d Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Tue, 4 Jul 2017 21:36:27 +0200
Subject: [PATCH] Add API for waiting for multiple connections

---
 .../distributed/connection/remote_commands.c  | 282 ++++++++++++++++++
 src/include/distributed/remote_commands.h     |   3 +
 2 files changed, 285 insertions(+)

diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c
index 76efecc3d..0bd1f2680 100644
--- a/src/backend/distributed/connection/remote_commands.c
+++ b/src/backend/distributed/connection/remote_commands.c
@@ -23,7 +23,12 @@
 /* GUC, determining whether statements sent to remote nodes are logged */
 bool LogRemoteCommands = false;
 
+
 static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
+static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections,
+										int totalConnectionCount,
+										int pendingConnectionsStartIndex);
+
 
 /* simple helpers */
 
@@ -652,3 +657,280 @@ FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
 
 	return false;
 }
+
+
+/*
+ * WaitForAllConnections blocks until all connections in the list are no
+ * longer busy, meaning the pending command has either finished or failed.
+ *
+ * If raiseInterrupts is true, CHECK_FOR_INTERRUPTS() is called whenever the
+ * process latch is set. If a query cancellation or termination request is
+ * pending while interrupts are held off, the function stops waiting and
+ * returns early, in which case some connections may still be busy.
+ */
+void
+WaitForAllConnections(List *connectionList, bool raiseInterrupts)
+{
+	int totalConnectionCount = list_length(connectionList);
+	int pendingConnectionsStartIndex = 0;
+	int connectionIndex = 0;
+	ListCell *connectionCell = NULL;
+
+	/*
+	 * Allocate the working arrays with palloc instead of using variable-length
+	 * arrays: a zero-length VLA is undefined behaviour and a long connection
+	 * list should not be able to exhaust the stack.
+	 */
+	MultiConnection **allConnections =
+		palloc(totalConnectionCount * sizeof(MultiConnection *));
+	WaitEvent *events = palloc(totalConnectionCount * sizeof(WaitEvent));
+	bool *connectionReady = palloc(totalConnectionCount * sizeof(bool));
+
+	/* assigned inside PG_TRY and read in PG_CATCH, so it must be volatile */
+	WaitEventSet *volatile waitEventSet = NULL;
+
+	/* convert connection list to an array such that we can move items around */
+	foreach(connectionCell, connectionList)
+	{
+		MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
+
+		allConnections[connectionIndex] = connection;
+		connectionReady[connectionIndex] = false;
+		connectionIndex++;
+	}
+
+	/* make an initial pass to check for failed and idle connections */
+	for (connectionIndex = pendingConnectionsStartIndex;
+		 connectionIndex < totalConnectionCount; connectionIndex++)
+	{
+		MultiConnection *connection = allConnections[connectionIndex];
+
+		if (PQstatus(connection->pgConn) == CONNECTION_BAD ||
+			!PQisBusy(connection->pgConn))
+		{
+			/* connection is already done; keep non-ready connections at the end */
+			allConnections[connectionIndex] =
+				allConnections[pendingConnectionsStartIndex];
+			pendingConnectionsStartIndex++;
+		}
+	}
+
+	PG_TRY();
+	{
+		bool rebuildWaitEventSet = true;
+		bool cancellationReceived = false;
+
+		while (pendingConnectionsStartIndex < totalConnectionCount)
+		{
+			int eventIndex = 0;
+			int eventCount = 0;
+			long timeout = -1;
+			int pendingConnectionCount = totalConnectionCount -
+										 pendingConnectionsStartIndex;
+
+			/*
+			 * We cannot disable wait events as of postgres 9.6, so we rebuild the
+			 * WaitEventSet whenever connections are ready.
+			 */
+			if (rebuildWaitEventSet)
+			{
+				if (waitEventSet != NULL)
+				{
+					FreeWaitEventSet(waitEventSet);
+
+					/* avoid a double free in PG_CATCH if the rebuild fails */
+					waitEventSet = NULL;
+				}
+
+				waitEventSet = BuildWaitEventSet(allConnections, totalConnectionCount,
+												 pendingConnectionsStartIndex);
+
+				rebuildWaitEventSet = false;
+			}
+
+			/* wait for I/O events */
+#if (PG_VERSION_NUM >= 100000)
+			eventCount = WaitEventSetWait(waitEventSet, timeout, events,
+										  pendingConnectionCount, WAIT_EVENT_CLIENT_READ);
+#else
+			eventCount = WaitEventSetWait(waitEventSet, timeout, events,
+										  pendingConnectionCount);
+#endif
+
+			/* process I/O events */
+			for (; eventIndex < eventCount; eventIndex++)
+			{
+				WaitEvent *event = &events[eventIndex];
+				MultiConnection *connection = NULL;
+				bool connectionIsReady = false;
+
+				if (event->events & WL_POSTMASTER_DEATH)
+				{
+					ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
+				}
+
+				if (event->events & WL_LATCH_SET)
+				{
+					ResetLatch(MyLatch);
+
+					if (raiseInterrupts)
+					{
+						CHECK_FOR_INTERRUPTS();
+					}
+
+					if (InterruptHoldoffCount > 0 && (QueryCancelPending ||
+													  ProcDiePending))
+					{
+						/*
+						 * Stop waiting in case of cancellation. We must not
+						 * "return" from inside PG_TRY, since that would skip
+						 * restoring the exception stack in PG_END_TRY.
+						 */
+						cancellationReceived = true;
+						break;
+					}
+
+					continue;
+				}
+
+				connection = (MultiConnection *) event->user_data;
+				connectionIndex = event->pos + pendingConnectionsStartIndex;
+
+				if (event->events & WL_SOCKET_WRITEABLE)
+				{
+					int sendStatus = PQflush(connection->pgConn);
+					if (sendStatus == -1)
+					{
+						/* send failed, done with this connection */
+						connectionIsReady = true;
+					}
+					else if (sendStatus == 0)
+					{
+						/*
+						 * Done writing, only wait for read events. The event
+						 * is identified by its position in the wait event set,
+						 * not by the index in the connections array.
+						 */
+						ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE,
+										NULL);
+					}
+				}
+
+				if (event->events & WL_SOCKET_READABLE)
+				{
+					int receiveStatus = PQconsumeInput(connection->pgConn);
+					if (receiveStatus == 0)
+					{
+						/* receive failed, done with this connection */
+						connectionIsReady = true;
+					}
+					else if (!PQisBusy(connection->pgConn))
+					{
+						/* result was received */
+						connectionIsReady = true;
+					}
+				}
+
+				if (connectionIsReady)
+				{
+					connectionReady[connectionIndex] = true;
+					rebuildWaitEventSet = true;
+				}
+			}
+
+			if (cancellationReceived)
+			{
+				break;
+			}
+
+			/* move non-ready connections to the back of the array */
+			for (connectionIndex = pendingConnectionsStartIndex;
+				 connectionIndex < totalConnectionCount; connectionIndex++)
+			{
+				if (connectionReady[connectionIndex])
+				{
+					/*
+					 * Overwrite the ready connection with the first pending
+					 * connection (a no-op if they are the same slot) and
+					 * shrink the pending range by one.
+					 */
+					allConnections[connectionIndex] =
+						allConnections[pendingConnectionsStartIndex];
+					pendingConnectionsStartIndex++;
+
+					/*
+					 * A pending connection now occupies this slot, so clear
+					 * the flag to avoid treating it as ready next round.
+					 */
+					connectionReady[connectionIndex] = false;
+				}
+			}
+		}
+
+		if (waitEventSet != NULL)
+		{
+			FreeWaitEventSet(waitEventSet);
+			waitEventSet = NULL;
+		}
+	}
+	PG_CATCH();
+	{
+		/* make sure the epoll file descriptor is always closed */
+		if (waitEventSet != NULL)
+		{
+			FreeWaitEventSet(waitEventSet);
+			waitEventSet = NULL;
+		}
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	pfree(allConnections);
+	pfree(events);
+	pfree(connectionReady);
+}
+
+
+/*
+ * BuildWaitEventSet creates a WaitEventSet for the given array of connections
+ * which can be used to wait for any of the sockets to become read-ready, or
+ * write-ready in case there is data to send.
+ */
+static WaitEventSet *
+BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
+				  int pendingConnectionsStartIndex)
+{
+	int pendingConnectionCount = totalConnectionCount - pendingConnectionsStartIndex;
+	WaitEventSet *waitEventSet = NULL;
+	int connectionIndex = 0;
+
+	/* allocate pending connections + 2 for the signal latch and postmaster death */
+	waitEventSet = CreateWaitEventSet(CurrentMemoryContext, pendingConnectionCount + 2);
+
+	for (connectionIndex = pendingConnectionsStartIndex;
+		 connectionIndex < totalConnectionCount; connectionIndex++)
+	{
+		MultiConnection *connection = allConnections[connectionIndex];
+		int socket = PQsocket(connection->pgConn);
+		int eventMask = WL_SOCKET_READABLE;
+
+		int sendStatus = PQflush(connection->pgConn);
+		if (sendStatus == 1)
+		{
+			/* we have data to send, wake up when the socket is ready to write */
+			eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
+		}
+
+		AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
+	}
+
+	/*
+	 * Put the wait events for the signal latch and postmaster death at the end such that
+	 * event index + pendingConnectionsStartIndex = the connection index in the array.
+	 */
+	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
+	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+
+	return waitEventSet;
+}
diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h
index 6cdc28802..cdf3d772a 100644
--- a/src/include/distributed/remote_commands.h
+++ b/src/include/distributed/remote_commands.h
@@ -52,5 +52,8 @@ extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer,
 							  int nbytes);
 extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
 
+/* waiting for multiple command results */
+extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts);
+
 
 #endif /* REMOTE_COMMAND_H */