diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ffab29354..093ad22f5 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -389,6 +389,9 @@ typedef struct WorkerSession /* index in the wait event set */ int waitEventSetIndex; + + /* events reported by the latest call to WaitEventSetWait */ + int latestUnconsumedWaitEvents; } WorkerSession; @@ -1722,6 +1725,7 @@ RunDistributedExecution(DistributedExecution *execution) } session = (WorkerSession *) event->user_data; + session->latestUnconsumedWaitEvents = event->events; ConnectionStateMachine(session); } @@ -2573,11 +2577,13 @@ CheckConnectionReady(WorkerSession *session) waitFlags = waitFlags | WL_SOCKET_WRITEABLE; } - /* if reading fails, there's not much we can do */ - if (PQconsumeInput(connection->pgConn) == 0) + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0) { - connection->connectionState = MULTI_CONNECTION_LOST; - return false; + if (PQconsumeInput(connection->pgConn) == 0) + { + connection->connectionState = MULTI_CONNECTION_LOST; + return false; + } } if (!PQisBusy(connection->pgConn)) @@ -2587,6 +2593,9 @@ CheckConnectionReady(WorkerSession *session) UpdateConnectionWaitFlags(session, waitFlags); + /* don't consume input redundantly if we cycle back into CheckConnectionReady */ + session->latestUnconsumedWaitEvents = 0; + return connectionReady; }