mirror of https://github.com/citusdata/citus.git
Rebuild wait event sets after PQconnectPoll() if socket changes
The reason is that PQconnectPoll() may change the underlying
socket. If we don't rebuild the wait event set, the low level
APIs (such as epoll_ctl()) may fail due to invalid sockets.
Instead, rebuilding ensures that we'll use accurate/active sockets.
(cherry picked from commit 77c397e9ae
)
pull/3928/head
parent
8f9b1a839f
commit
5474508c01
|
@ -208,7 +208,7 @@ typedef struct DistributedExecution
|
|||
* Flag to indiciate that the set of connections we are interested
|
||||
* in has changed and waitEventSet needs to be rebuilt.
|
||||
*/
|
||||
bool connectionSetChanged;
|
||||
bool rebuildWaitEventSet;
|
||||
|
||||
/*
|
||||
* Flag to indiciate that the set of wait events we are interested
|
||||
|
@ -1064,7 +1064,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
|
|||
|
||||
execution->raiseInterrupts = true;
|
||||
|
||||
execution->connectionSetChanged = false;
|
||||
execution->rebuildWaitEventSet = false;
|
||||
execution->waitFlagsChanged = false;
|
||||
|
||||
execution->jobIdList = jobIdList;
|
||||
|
@ -2008,6 +2008,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
|
|||
session->connection = connection;
|
||||
session->workerPool = workerPool;
|
||||
session->commandsSent = 0;
|
||||
|
||||
dlist_init(&session->pendingTaskQueue);
|
||||
dlist_init(&session->readyTaskQueue);
|
||||
|
||||
|
@ -2142,7 +2143,7 @@ RunDistributedExecution(DistributedExecution *execution)
|
|||
int eventSetSize = GetEventSetSize(execution->sessionList);
|
||||
|
||||
/* always (re)build the wait event set the first time */
|
||||
execution->connectionSetChanged = true;
|
||||
execution->rebuildWaitEventSet = true;
|
||||
|
||||
while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
|
||||
{
|
||||
|
@ -2154,7 +2155,7 @@ RunDistributedExecution(DistributedExecution *execution)
|
|||
ManageWorkerPool(workerPool);
|
||||
}
|
||||
|
||||
if (execution->connectionSetChanged)
|
||||
if (execution->rebuildWaitEventSet)
|
||||
{
|
||||
if (events != NULL)
|
||||
{
|
||||
|
@ -2236,7 +2237,7 @@ RebuildWaitEventSet(DistributedExecution *execution)
|
|||
}
|
||||
|
||||
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
|
||||
execution->connectionSetChanged = false;
|
||||
execution->rebuildWaitEventSet = false;
|
||||
execution->waitFlagsChanged = false;
|
||||
|
||||
return GetEventSetSize(execution->sessionList);
|
||||
|
@ -2482,7 +2483,7 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
}
|
||||
|
||||
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
|
||||
execution->connectionSetChanged = true;
|
||||
execution->rebuildWaitEventSet = true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -2751,7 +2752,15 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
break;
|
||||
}
|
||||
|
||||
int beforePollSocket = PQsocket(connection->pgConn);
|
||||
PostgresPollingStatusType pollMode = PQconnectPoll(connection->pgConn);
|
||||
|
||||
if (beforePollSocket != PQsocket(connection->pgConn))
|
||||
{
|
||||
/* rebuild the wait events if PQconnectPoll() changed the socket */
|
||||
execution->rebuildWaitEventSet = true;
|
||||
}
|
||||
|
||||
if (pollMode == PGRES_POLLING_FAILED)
|
||||
{
|
||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||
|
@ -2759,10 +2768,16 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
else if (pollMode == PGRES_POLLING_READING)
|
||||
{
|
||||
UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE);
|
||||
|
||||
/* we should have a valid socket */
|
||||
Assert(PQsocket(connection->pgConn) != -1);
|
||||
}
|
||||
else if (pollMode == PGRES_POLLING_WRITING)
|
||||
{
|
||||
UpdateConnectionWaitFlags(session, WL_SOCKET_WRITEABLE);
|
||||
|
||||
/* we should have a valid socket */
|
||||
Assert(PQsocket(connection->pgConn) != -1);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -2771,6 +2786,9 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
||||
|
||||
connection->connectionState = MULTI_CONNECTION_CONNECTED;
|
||||
|
||||
/* we should have a valid socket */
|
||||
Assert(PQsocket(connection->pgConn) != -1);
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -2855,7 +2873,7 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
ShutdownConnection(connection);
|
||||
|
||||
/* remove connection from wait event set */
|
||||
execution->connectionSetChanged = true;
|
||||
execution->rebuildWaitEventSet = true;
|
||||
|
||||
/*
|
||||
* Reset the transaction state machine since CloseConnection()
|
||||
|
|
Loading…
Reference in New Issue