Avoid unnecessary calls to PQconsumeInput

pull/2848/head
Marco Slot 2019-07-21 01:52:44 +02:00
parent 71ad5c095b
commit 32e7a80960
1 changed files with 13 additions and 4 deletions

View File

@ -389,6 +389,9 @@ typedef struct WorkerSession
/* index in the wait event set */ /* index in the wait event set */
int waitEventSetIndex; int waitEventSetIndex;
/* events reported by the latest call to WaitEventSetWait */
int latestUnconsumedWaitEvents;
} WorkerSession; } WorkerSession;
@ -1722,6 +1725,7 @@ RunDistributedExecution(DistributedExecution *execution)
} }
session = (WorkerSession *) event->user_data; session = (WorkerSession *) event->user_data;
session->latestUnconsumedWaitEvents = event->events;
ConnectionStateMachine(session); ConnectionStateMachine(session);
} }
@ -2573,11 +2577,13 @@ CheckConnectionReady(WorkerSession *session)
waitFlags = waitFlags | WL_SOCKET_WRITEABLE; waitFlags = waitFlags | WL_SOCKET_WRITEABLE;
} }
/* if reading fails, there's not much we can do */ if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0)
if (PQconsumeInput(connection->pgConn) == 0)
{ {
connection->connectionState = MULTI_CONNECTION_LOST; if (PQconsumeInput(connection->pgConn) == 0)
return false; {
connection->connectionState = MULTI_CONNECTION_LOST;
return false;
}
} }
if (!PQisBusy(connection->pgConn)) if (!PQisBusy(connection->pgConn))
@ -2587,6 +2593,9 @@ CheckConnectionReady(WorkerSession *session)
UpdateConnectionWaitFlags(session, waitFlags); UpdateConnectionWaitFlags(session, waitFlags);
/* don't consume input redundantly if we cycle back into CheckConnectionReady */
session->latestUnconsumedWaitEvents = 0;
return connectionReady; return connectionReady;
} }