From 355805c7d8ee5cf1003148771230ec49a9b14152 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 12:20:37 +0300 Subject: [PATCH] create ProcessWaitEvents for separating the logic of handling events --- .../distributed/executor/adaptive_executor.c | 93 +++++++++++-------- 1 file changed, 54 insertions(+), 39 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index befd47e3d..521ae798c 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -620,6 +620,8 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, const char ***parameterValues); static int GetEventSetSize(List *sessionList); static int UpdateWaitEventSet(DistributedExecution *execution); +static bool ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int + eventCount); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -2048,46 +2050,9 @@ RunDistributedExecution(DistributedExecution *execution) /* wait for I/O events */ int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); - - int eventIndex = 0; - - /* process I/O events */ - for (; eventIndex < eventCount; eventIndex++) + if (!ProcessWaitEvents(execution, events, eventCount)) { - WaitEvent *event = &events[eventIndex]; - - if (event->events & WL_POSTMASTER_DEATH) - { - ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); - } - - if (event->events & WL_LATCH_SET) - { - ResetLatch(MyLatch); - - if (execution->raiseInterrupts) - { - CHECK_FOR_INTERRUPTS(); - } - - if (IsHoldOffCancellationReceived()) - { - /* - * Break out of event loop immediately in case of cancellation. - * We cannot use "return" here inside a PG_TRY() block since - * then the exception stack won't be reset. - */ - cancellationReceived = true; - break; - } - - continue; - } - - WorkerSession *session = (WorkerSession *) event->user_data; - session->latestUnconsumedWaitEvents = event->events; - - ConnectionStateMachine(session); + cancellationReceived = true; } } @@ -2153,6 +2118,56 @@ UpdateWaitEventSet(DistributedExecution *execution) } +/* + * ProcessWaitEvents processes the received events from connections. + * It returns true if all events are processed, otherwise false. + */ +static bool +ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount) +{ + int eventIndex = 0; + + /* process I/O events */ + for (; eventIndex < eventCount; eventIndex++) + { + WaitEvent *event = &events[eventIndex]; + + if (event->events & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (event->events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + + if (execution->raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + if (IsHoldOffCancellationReceived()) + { + /* + * Break out of event loop immediately in case of cancellation. + * We cannot use "return" here inside a PG_TRY() block since + * then the exception stack won't be reset. + */ + return false; + } + + continue; + } + + WorkerSession *session = (WorkerSession *) event->user_data; + session->latestUnconsumedWaitEvents = event->events; + + ConnectionStateMachine(session); + } + return true; +} + + /* * ManageWorkerPool ensures the worker pool has the appropriate number of connections * based on the number of pending tasks.