mirror of https://github.com/citusdata/citus.git
Rebuild wait event sets after PQconnectPoll() if socket changes
The reason is that PQconnectPoll() may change the underlying socket. If we don't rebuild the wait event set, the low level APIs (such as epoll_ctl()) may fail due to invalid sockets. Instead, rebuilding ensures that we'll use accurate/active sockets.pull/3812/head
parent
c6f5d5fe88
commit
77c397e9ae
|
@ -208,7 +208,7 @@ typedef struct DistributedExecution
|
|||
* Flag to indiciate that the set of connections we are interested
|
||||
* in has changed and waitEventSet needs to be rebuilt.
|
||||
*/
|
||||
bool connectionSetChanged;
|
||||
bool rebuildWaitEventSet;
|
||||
|
||||
/*
|
||||
* Flag to indiciate that the set of wait events we are interested
|
||||
|
@ -1064,7 +1064,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
|
|||
|
||||
execution->raiseInterrupts = true;
|
||||
|
||||
execution->connectionSetChanged = false;
|
||||
execution->rebuildWaitEventSet = false;
|
||||
execution->waitFlagsChanged = false;
|
||||
|
||||
execution->jobIdList = jobIdList;
|
||||
|
@ -2008,6 +2008,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
|
|||
session->connection = connection;
|
||||
session->workerPool = workerPool;
|
||||
session->commandsSent = 0;
|
||||
|
||||
dlist_init(&session->pendingTaskQueue);
|
||||
dlist_init(&session->readyTaskQueue);
|
||||
|
||||
|
@ -2142,7 +2143,7 @@ RunDistributedExecution(DistributedExecution *execution)
|
|||
int eventSetSize = GetEventSetSize(execution->sessionList);
|
||||
|
||||
/* always (re)build the wait event set the first time */
|
||||
execution->connectionSetChanged = true;
|
||||
execution->rebuildWaitEventSet = true;
|
||||
|
||||
while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
|
||||
{
|
||||
|
@ -2154,7 +2155,7 @@ RunDistributedExecution(DistributedExecution *execution)
|
|||
ManageWorkerPool(workerPool);
|
||||
}
|
||||
|
||||
if (execution->connectionSetChanged)
|
||||
if (execution->rebuildWaitEventSet)
|
||||
{
|
||||
if (events != NULL)
|
||||
{
|
||||
|
@ -2236,7 +2237,7 @@ RebuildWaitEventSet(DistributedExecution *execution)
|
|||
}
|
||||
|
||||
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
|
||||
execution->connectionSetChanged = false;
|
||||
execution->rebuildWaitEventSet = false;
|
||||
execution->waitFlagsChanged = false;
|
||||
|
||||
return GetEventSetSize(execution->sessionList);
|
||||
|
@ -2482,7 +2483,7 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
}
|
||||
|
||||
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
|
||||
execution->connectionSetChanged = true;
|
||||
execution->rebuildWaitEventSet = true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -2751,7 +2752,15 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
break;
|
||||
}
|
||||
|
||||
int beforePollSocket = PQsocket(connection->pgConn);
|
||||
PostgresPollingStatusType pollMode = PQconnectPoll(connection->pgConn);
|
||||
|
||||
if (beforePollSocket != PQsocket(connection->pgConn))
|
||||
{
|
||||
/* rebuild the wait events if PQconnectPoll() changed the socket */
|
||||
execution->rebuildWaitEventSet = true;
|
||||
}
|
||||
|
||||
if (pollMode == PGRES_POLLING_FAILED)
|
||||
{
|
||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||
|
@ -2759,10 +2768,16 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
else if (pollMode == PGRES_POLLING_READING)
|
||||
{
|
||||
UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE);
|
||||
|
||||
/* we should have a valid socket */
|
||||
Assert(PQsocket(connection->pgConn) != -1);
|
||||
}
|
||||
else if (pollMode == PGRES_POLLING_WRITING)
|
||||
{
|
||||
UpdateConnectionWaitFlags(session, WL_SOCKET_WRITEABLE);
|
||||
|
||||
/* we should have a valid socket */
|
||||
Assert(PQsocket(connection->pgConn) != -1);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -2771,6 +2786,9 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
||||
|
||||
connection->connectionState = MULTI_CONNECTION_CONNECTED;
|
||||
|
||||
/* we should have a valid socket */
|
||||
Assert(PQsocket(connection->pgConn) != -1);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -2855,7 +2873,7 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
ShutdownConnection(connection);
|
||||
|
||||
/* remove connection from wait event set */
|
||||
execution->connectionSetChanged = true;
|
||||
execution->rebuildWaitEventSet = true;
|
||||
|
||||
/*
|
||||
* Reset the transaction state machine since CloseConnection()
|
||||
|
|
Loading…
Reference in New Issue