From 224b0a8c144aec1afcdee6b268390322437b1a96 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Fri, 29 Dec 2017 12:03:32 +0000 Subject: [PATCH] Replace poll with select/poll Windows does not have poll(), so fall back to select() --- .../connection/connection_management.c | 33 ++++- .../distributed/connection/remote_commands.c | 12 +- .../executor/multi_client_executor.c | 120 +++++++++++++++++- .../distributed/multi_client_executor.h | 10 ++ 4 files changed, 165 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 23dca4b47..5150a3e5a 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -10,7 +10,10 @@ #include "postgres.h" +#ifdef HAVE_POLL_H #include +#endif + #include "libpq-fe.h" @@ -449,9 +452,12 @@ FinishConnectionEstablishment(MultiConnection *connection) /* Loop, to handle poll() being interrupted by signals (EINTR) */ while (true) { - struct pollfd pollFileDescriptor; int pollResult = 0; + /* we use poll(2) if available, otherwise select(2) */ +#ifdef HAVE_POLL + struct pollfd pollFileDescriptor; + pollFileDescriptor.fd = PQsocket(connection->pgConn); if (pollmode == PGRES_POLLING_READING) { @@ -469,6 +475,31 @@ FinishConnectionEstablishment(MultiConnection *connection) * poll() after signal arrival. */ pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS); +#else /* !HAVE_POLL */ + fd_set readFileDescriptorSet; + fd_set writeFileDescriptorSet; + fd_set exceptionFileDescriptorSet; + int selectTimeoutUS = checkIntervalMS * 1000; + struct timeval selectTimeout = { 0, selectTimeoutUS }; + int selectFileDescriptor = PQsocket(connection->pgConn); + + FD_ZERO(&readFileDescriptorSet); + FD_ZERO(&writeFileDescriptorSet); + FD_ZERO(&exceptionFileDescriptorSet); + + if (pollmode == PGRES_POLLING_READING) + { + FD_SET(selectFileDescriptor, &readFileDescriptorSet); + } + else if (pollmode == PGRES_POLLING_WRITING) + { + FD_SET(selectFileDescriptor, &writeFileDescriptorSet); + } + + pollResult = (select) (selectFileDescriptor + 1, &readFileDescriptorSet, + &writeFileDescriptorSet, &exceptionFileDescriptorSet, + &selectTimeout); +#endif /* HAVE_POLL */ if (pollResult == 0) { diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 255541de8..06ffb27a0 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -992,13 +992,19 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, WaitEventSet *waitEventSet = NULL; int connectionIndex = 0; + /* we subtract 2 to make room for the WL_POSTMASTER_DEATH and WL_LATCH_SET events */ + if (pendingConnectionCount > FD_SETSIZE - 2) + { + pendingConnectionCount = FD_SETSIZE - 2; + } + /* allocate pending connections + 2 for the signal latch and postmaster death */ waitEventSet = CreateWaitEventSet(CurrentMemoryContext, pendingConnectionCount + 2); - for (connectionIndex = pendingConnectionsStartIndex; - connectionIndex < totalConnectionCount; connectionIndex++) + for (connectionIndex = 0; connectionIndex < pendingConnectionCount; connectionIndex++) { - MultiConnection *connection = allConnections[connectionIndex]; + MultiConnection *connection = allConnections[pendingConnectionsStartIndex + + connectionIndex]; int socket = PQsocket(connection->pgConn); /* diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 06ae1268b..a28d1008c 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -29,9 +29,8 @@ #include #include +#ifdef HAVE_POLL_H #include -#ifdef HAVE_SYS_POLL_H -#include #endif @@ -815,8 +814,21 @@ MultiClientCreateWaitInfo(int maxConnections) { WaitInfo *waitInfo = palloc(sizeof(WaitInfo)); +#ifndef HAVE_POLL + + /* we subtract 2 to make room for the WL_POSTMASTER_DEATH and WL_LATCH_SET events */ + if (maxConnections > FD_SETSIZE - 2) + { + maxConnections = FD_SETSIZE - 2; + } +#endif + waitInfo->maxWaiters = maxConnections; + + /* we use poll(2) if available, otherwise select(2) */ +#ifdef HAVE_POLL waitInfo->pollfds = palloc(maxConnections * sizeof(struct pollfd)); +#endif /* initialize remaining fields */ MultiClientResetWaitInfo(waitInfo); @@ -832,6 +844,14 @@ MultiClientResetWaitInfo(WaitInfo *waitInfo) waitInfo->registeredWaiters = 0; waitInfo->haveReadyWaiter = false; waitInfo->haveFailedWaiter = false; + +#ifndef HAVE_POLL + FD_ZERO(&(waitInfo->readFileDescriptorSet)); + FD_ZERO(&(waitInfo->writeFileDescriptorSet)); + FD_ZERO(&(waitInfo->exceptionFileDescriptorSet)); + + waitInfo->maxConnectionFileDescriptor = 0; +#endif } @@ -839,7 +859,10 @@ MultiClientResetWaitInfo(WaitInfo *waitInfo) void MultiClientFreeWaitInfo(WaitInfo *waitInfo) { +#ifdef HAVE_POLL pfree(waitInfo->pollfds); +#endif + pfree(waitInfo); } @@ -853,9 +876,17 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, int32 connectionId) { MultiConnection *connection = NULL; +#ifdef HAVE_POLL struct pollfd *pollfd = NULL; +#else + int connectionFileDescriptor = 0; +#endif - Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters); + /* This is to make sure we could never register more than maxWaiters in Windows */ + if (waitInfo->registeredWaiters >= waitInfo->maxWaiters) + { + return; + } if (executionStatus == TASK_STATUS_READY) { @@ -869,6 +900,7 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, } connection = ClientConnectionArray[connectionId]; +#ifdef HAVE_POLL pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters]; pollfd->fd = PQsocket(connection->pgConn); if (executionStatus == TASK_STATUS_SOCKET_READ) @@ -879,6 +911,24 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus, { pollfd->events = POLLERR | POLLOUT; } + +#else + connectionFileDescriptor = PQsocket(connection->pgConn); + if (connectionFileDescriptor > waitInfo->maxConnectionFileDescriptor) + { + waitInfo->maxConnectionFileDescriptor = connectionFileDescriptor; + } + + if (executionStatus == TASK_STATUS_SOCKET_READ) + { + FD_SET(connectionFileDescriptor, &(waitInfo->readFileDescriptorSet)); + } + else if (executionStatus == TASK_STATUS_SOCKET_WRITE) + { + FD_SET(connectionFileDescriptor, &(waitInfo->writeFileDescriptorSet)); + } +#endif + waitInfo->registeredWaiters++; } @@ -913,12 +963,32 @@ MultiClientWait(WaitInfo *waitInfo) /* * Wait for activity on any of the sockets. Limit the maximum time * spent waiting in one wait cycle, as insurance against edge - * cases. For efficiency we don't want wake up quite as often as + * cases. For efficiency we don't want to wake quite as often as * citus.remote_task_check_interval, so rather arbitrarily sleep ten * times as long. */ +#ifdef HAVE_POLL int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters, RemoteTaskCheckInterval * 10); +#else + int maxConnectionFileDescriptor = waitInfo->maxConnectionFileDescriptor; + const int maxTimeout = RemoteTaskCheckInterval * 10 * 1000L; + struct timeval selectTimeout = { 0, maxTimeout }; + int rc = 0; + + /* it is not okay to call select when there is nothing to wait for */ + if (waitInfo->registeredWaiters == 0) + { + return; + } + + rc = (select) (maxConnectionFileDescriptor + 1, + &(waitInfo->readFileDescriptorSet), + &(waitInfo->writeFileDescriptorSet), + &(waitInfo->exceptionFileDescriptorSet), + &selectTimeout); + +#endif if (rc < 0) { @@ -926,7 +996,16 @@ MultiClientWait(WaitInfo *waitInfo) * Signals that arrive can interrupt our poll(). In that case just * return. Every other error is unexpected and treated as such. */ - if (errno == EAGAIN || errno == EINTR) + int errorCode = errno; +#ifdef WIN32 + errorCode = WSAGetLastError(); +#endif + + if (errorCode == 0) + { + return; + } + else if (errorCode == EAGAIN || errorCode == EINTR) { return; } @@ -963,6 +1042,9 @@ ClientConnectionReady(MultiConnection *connection, { bool clientConnectionReady = false; int pollResult = 0; + + /* we use poll(2) if available, otherwise select(2) */ +#ifdef HAVE_POLL int fileDescriptorCount = 1; int immediateTimeout = 0; int pollEventMask = 0; @@ -982,6 +1064,32 @@ ClientConnectionReady(MultiConnection *connection, pollFileDescriptor.revents = 0; pollResult = poll(&pollFileDescriptor, fileDescriptorCount, immediateTimeout); +#else + fd_set readFileDescriptorSet; + fd_set writeFileDescriptorSet; + fd_set exceptionFileDescriptorSet; + struct timeval immediateTimeout = { 0, 0 }; + int connectionFileDescriptor = PQsocket(connection->pgConn); + + FD_ZERO(&readFileDescriptorSet); + FD_ZERO(&writeFileDescriptorSet); + FD_ZERO(&exceptionFileDescriptorSet); + + if (pollingStatus == PGRES_POLLING_READING) + { + FD_SET(connectionFileDescriptor, &exceptionFileDescriptorSet); + FD_SET(connectionFileDescriptor, &readFileDescriptorSet); + } + else if (pollingStatus == PGRES_POLLING_WRITING) + { + FD_SET(connectionFileDescriptor, &exceptionFileDescriptorSet); + FD_SET(connectionFileDescriptor, &writeFileDescriptorSet); + } + + pollResult = (select) (connectionFileDescriptor + 1, &readFileDescriptorSet, + &writeFileDescriptorSet, &exceptionFileDescriptorSet, + &immediateTimeout); +#endif /* HAVE_POLL */ if (pollResult > 0) { @@ -1013,7 +1121,7 @@ ClientConnectionReady(MultiConnection *connection, */ Assert(errno == ENOMEM); ereport(ERROR, (errcode_for_socket_access(), - errmsg("poll() failed: %m"))); + errmsg("select()/poll() failed: %m"))); } } diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index 70a29243e..2d4eb0961 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -18,6 +18,9 @@ #include "distributed/connection_management.h" #include "nodes/pg_list.h" +#ifdef HAVE_POLL_H +#include +#endif #define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */ #define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */ @@ -92,7 +95,14 @@ struct pollfd; /* forward declared, to avoid having to include poll.h */ typedef struct WaitInfo { int maxWaiters; +#ifdef HAVE_POLL struct pollfd *pollfds; +#else + fd_set readFileDescriptorSet; + fd_set writeFileDescriptorSet; + fd_set exceptionFileDescriptorSet; + int maxConnectionFileDescriptor; +#endif /* HAVE_POLL*/ int registeredWaiters; bool haveReadyWaiter; bool haveFailedWaiter;