Replace poll with select/poll

Windows does not have poll(), so fall back to select()
pull/2002/head
Murat Tuncer 2017-12-29 12:03:32 +00:00 committed by Brian Cloutier
parent 997e718b26
commit 224b0a8c14
4 changed files with 165 additions and 10 deletions

View File

@ -10,7 +10,10 @@
#include "postgres.h"
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#include "libpq-fe.h"
@ -449,9 +452,12 @@ FinishConnectionEstablishment(MultiConnection *connection)
/* Loop, to handle poll() being interrupted by signals (EINTR) */
while (true)
{
struct pollfd pollFileDescriptor;
int pollResult = 0;
/* we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
struct pollfd pollFileDescriptor;
pollFileDescriptor.fd = PQsocket(connection->pgConn);
if (pollmode == PGRES_POLLING_READING)
{
@ -469,6 +475,31 @@ FinishConnectionEstablishment(MultiConnection *connection)
* poll() after signal arrival.
*/
pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS);
#else /* !HAVE_POLL */
fd_set readFileDescriptorSet;
fd_set writeFileDescriptorSet;
fd_set exceptionFileDescriptorSet;
int selectTimeoutUS = checkIntervalMS * 1000;
struct timeval selectTimeout = { 0, selectTimeoutUS };
int selectFileDescriptor = PQsocket(connection->pgConn);
FD_ZERO(&readFileDescriptorSet);
FD_ZERO(&writeFileDescriptorSet);
FD_ZERO(&exceptionFileDescriptorSet);
if (pollmode == PGRES_POLLING_READING)
{
FD_SET(selectFileDescriptor, &readFileDescriptorSet);
}
else if (pollmode == PGRES_POLLING_WRITING)
{
FD_SET(selectFileDescriptor, &writeFileDescriptorSet);
}
pollResult = (select) (selectFileDescriptor + 1, &readFileDescriptorSet,
&writeFileDescriptorSet, &exceptionFileDescriptorSet,
&selectTimeout);
#endif /* HAVE_POLL */
if (pollResult == 0)
{

View File

@ -992,13 +992,19 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
WaitEventSet *waitEventSet = NULL;
int connectionIndex = 0;
/* we subtract 2 to make room for the WL_POSTMASTER_DEATH and WL_LATCH_SET events */
if (pendingConnectionCount > FD_SETSIZE - 2)
{
pendingConnectionCount = FD_SETSIZE - 2;
}
/* allocate pending connections + 2 for the signal latch and postmaster death */
waitEventSet = CreateWaitEventSet(CurrentMemoryContext, pendingConnectionCount + 2);
for (connectionIndex = pendingConnectionsStartIndex;
connectionIndex < totalConnectionCount; connectionIndex++)
for (connectionIndex = 0; connectionIndex < pendingConnectionCount; connectionIndex++)
{
MultiConnection *connection = allConnections[connectionIndex];
MultiConnection *connection = allConnections[pendingConnectionsStartIndex +
connectionIndex];
int socket = PQsocket(connection->pgConn);
/*

View File

@ -29,9 +29,8 @@
#include <errno.h>
#include <unistd.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
@ -815,8 +814,21 @@ MultiClientCreateWaitInfo(int maxConnections)
{
WaitInfo *waitInfo = palloc(sizeof(WaitInfo));
#ifndef HAVE_POLL
/* we subtract 2 to make room for the WL_POSTMASTER_DEATH and WL_LATCH_SET events */
if (maxConnections > FD_SETSIZE - 2)
{
maxConnections = FD_SETSIZE - 2;
}
#endif
waitInfo->maxWaiters = maxConnections;
/* we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
waitInfo->pollfds = palloc(maxConnections * sizeof(struct pollfd));
#endif
/* initialize remaining fields */
MultiClientResetWaitInfo(waitInfo);
@ -832,6 +844,14 @@ MultiClientResetWaitInfo(WaitInfo *waitInfo)
waitInfo->registeredWaiters = 0;
waitInfo->haveReadyWaiter = false;
waitInfo->haveFailedWaiter = false;
#ifndef HAVE_POLL
FD_ZERO(&(waitInfo->readFileDescriptorSet));
FD_ZERO(&(waitInfo->writeFileDescriptorSet));
FD_ZERO(&(waitInfo->exceptionFileDescriptorSet));
waitInfo->maxConnectionFileDescriptor = 0;
#endif
}
@ -839,7 +859,10 @@ MultiClientResetWaitInfo(WaitInfo *waitInfo)
void
MultiClientFreeWaitInfo(WaitInfo *waitInfo)
{
#ifdef HAVE_POLL
pfree(waitInfo->pollfds);
#endif
pfree(waitInfo);
}
@ -853,9 +876,17 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
int32 connectionId)
{
MultiConnection *connection = NULL;
#ifdef HAVE_POLL
struct pollfd *pollfd = NULL;
#else
int connectionFileDescriptor = 0;
#endif
Assert(waitInfo->registeredWaiters < waitInfo->maxWaiters);
/* This is to make sure we could never register more than maxWaiters in Windows */
if (waitInfo->registeredWaiters >= waitInfo->maxWaiters)
{
return;
}
if (executionStatus == TASK_STATUS_READY)
{
@ -869,6 +900,7 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
}
connection = ClientConnectionArray[connectionId];
#ifdef HAVE_POLL
pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters];
pollfd->fd = PQsocket(connection->pgConn);
if (executionStatus == TASK_STATUS_SOCKET_READ)
@ -879,6 +911,24 @@ MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
{
pollfd->events = POLLERR | POLLOUT;
}
#else
connectionFileDescriptor = PQsocket(connection->pgConn);
if (connectionFileDescriptor > waitInfo->maxConnectionFileDescriptor)
{
waitInfo->maxConnectionFileDescriptor = connectionFileDescriptor;
}
if (executionStatus == TASK_STATUS_SOCKET_READ)
{
FD_SET(connectionFileDescriptor, &(waitInfo->readFileDescriptorSet));
}
else if (executionStatus == TASK_STATUS_SOCKET_WRITE)
{
FD_SET(connectionFileDescriptor, &(waitInfo->writeFileDescriptorSet));
}
#endif
waitInfo->registeredWaiters++;
}
@ -913,12 +963,32 @@ MultiClientWait(WaitInfo *waitInfo)
/*
* Wait for activity on any of the sockets. Limit the maximum time
* spent waiting in one wait cycle, as insurance against edge
* cases. For efficiency we don't want wake up quite as often as
* cases. For efficiency we don't want to wake quite as often as
* citus.remote_task_check_interval, so rather arbitrarily sleep ten
* times as long.
*/
#ifdef HAVE_POLL
int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters,
RemoteTaskCheckInterval * 10);
#else
int maxConnectionFileDescriptor = waitInfo->maxConnectionFileDescriptor;
const int maxTimeout = RemoteTaskCheckInterval * 10 * 1000L;
struct timeval selectTimeout = { 0, maxTimeout };
int rc = 0;
/* it is not okay to call select when there is nothing to wait for */
if (waitInfo->registeredWaiters == 0)
{
return;
}
rc = (select) (maxConnectionFileDescriptor + 1,
&(waitInfo->readFileDescriptorSet),
&(waitInfo->writeFileDescriptorSet),
&(waitInfo->exceptionFileDescriptorSet),
&selectTimeout);
#endif
if (rc < 0)
{
@ -926,7 +996,16 @@ MultiClientWait(WaitInfo *waitInfo)
* Signals that arrive can interrupt our poll(). In that case just
* return. Every other error is unexpected and treated as such.
*/
if (errno == EAGAIN || errno == EINTR)
int errorCode = errno;
#ifdef WIN32
errorCode = WSAGetLastError();
#endif
if (errorCode == 0)
{
return;
}
else if (errorCode == EAGAIN || errorCode == EINTR)
{
return;
}
@ -963,6 +1042,9 @@ ClientConnectionReady(MultiConnection *connection,
{
bool clientConnectionReady = false;
int pollResult = 0;
/* we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
int fileDescriptorCount = 1;
int immediateTimeout = 0;
int pollEventMask = 0;
@ -982,6 +1064,32 @@ ClientConnectionReady(MultiConnection *connection,
pollFileDescriptor.revents = 0;
pollResult = poll(&pollFileDescriptor, fileDescriptorCount, immediateTimeout);
#else
fd_set readFileDescriptorSet;
fd_set writeFileDescriptorSet;
fd_set exceptionFileDescriptorSet;
struct timeval immediateTimeout = { 0, 0 };
int connectionFileDescriptor = PQsocket(connection->pgConn);
FD_ZERO(&readFileDescriptorSet);
FD_ZERO(&writeFileDescriptorSet);
FD_ZERO(&exceptionFileDescriptorSet);
if (pollingStatus == PGRES_POLLING_READING)
{
FD_SET(connectionFileDescriptor, &exceptionFileDescriptorSet);
FD_SET(connectionFileDescriptor, &readFileDescriptorSet);
}
else if (pollingStatus == PGRES_POLLING_WRITING)
{
FD_SET(connectionFileDescriptor, &exceptionFileDescriptorSet);
FD_SET(connectionFileDescriptor, &writeFileDescriptorSet);
}
pollResult = (select) (connectionFileDescriptor + 1, &readFileDescriptorSet,
&writeFileDescriptorSet, &exceptionFileDescriptorSet,
&immediateTimeout);
#endif /* HAVE_POLL */
if (pollResult > 0)
{
@ -1013,7 +1121,7 @@ ClientConnectionReady(MultiConnection *connection,
*/
Assert(errno == ENOMEM);
ereport(ERROR, (errcode_for_socket_access(),
errmsg("poll() failed: %m")));
errmsg("select()/poll() failed: %m")));
}
}

View File

@ -18,6 +18,9 @@
#include "distributed/connection_management.h"
#include "nodes/pg_list.h"
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#define INVALID_CONNECTION_ID -1 /* identifies an invalid connection */
#define MAX_CONNECTION_COUNT 2048 /* simultaneous client connection count */
@ -92,7 +95,14 @@ struct pollfd; /* forward declared, to avoid having to include poll.h */
typedef struct WaitInfo
{
int maxWaiters;
#ifdef HAVE_POLL
struct pollfd *pollfds;
#else
fd_set readFileDescriptorSet;
fd_set writeFileDescriptorSet;
fd_set exceptionFileDescriptorSet;
int maxConnectionFileDescriptor;
#endif /* HAVE_POLL*/
int registeredWaiters;
bool haveReadyWaiter;
bool haveFailedWaiter;