From 526d50091cdab4cd18391aafd62906f53c4590fa Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 18 Nov 2022 16:35:25 +0100 Subject: [PATCH] w --- .../distributed/executor/adaptive_executor.c | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index bcc751d14..3b626fff5 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -244,6 +244,7 @@ typedef struct DistributedExecution */ WaitEventSet *waitEventSet; WaitEvent *events; + int eventSetSize; /* * The number of connections we aim to open per worker. @@ -721,7 +722,7 @@ static int GetEventSetSize(List *sessionList); static bool ProcessSessionsWithFailedWaitEventSetOperations( DistributedExecution *execution); static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); -static int RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly); +static void RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly); static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, bool *cancellationReceived); #if PG_VERSION_NUM >= PG_VERSION_15 @@ -2646,8 +2647,6 @@ RunDistributedExecution(DistributedExecution *execution) bool cancellationReceived = false; - int eventSetSize = GetEventSetSize(execution->sessionList); - /* always (re)build the wait event set the first time */ execution->rebuildWaitEventSet = true; @@ -2689,7 +2688,7 @@ RunDistributedExecution(DistributedExecution *execution) } else if (execution->rebuildWaitEventSet) { - eventSetSize = RebuildWaitEventSet(execution, false); + RebuildWaitEventSet(execution, false); skipWaitEvents = ProcessSessionsWithFailedWaitEventSetOperations(execution); @@ -2717,7 +2716,7 @@ RunDistributedExecution(DistributedExecution *execution) /* wait for I/O events */ long timeout = NextEventTimeout(execution); int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events, - eventSetSize, WAIT_EVENT_CLIENT_READ); + execution->eventSetSize, WAIT_EVENT_CLIENT_READ); ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived); } @@ -2832,7 +2831,7 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution) * which means that we need to update which connections we wait on for events. * It returns the new event set size. */ -static int +static void RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly) { if (execution->events != NULL) @@ -2855,8 +2854,7 @@ RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly) execution->events = palloc0(GetEventSetSize(execution->sessionList) * sizeof(WaitEvent)); execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; - - return GetEventSetSize(execution->sessionList); + execution->eventSetSize = GetEventSetSize(execution->sessionList); } @@ -3410,14 +3408,14 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, } bool socketClosedOnly = true; - int size = RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly); + RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly); long timeout = 0;/* don't wait */ int eventCount = WaitEventSetWait(workerPool->distributedExecution->waitEventSet, timeout, workerPool->distributedExecution->events, - size, WAIT_EVENT_CLIENT_READ); + workerPool->distributedExecution->eventSetSize, WAIT_EVENT_CLIENT_READ); ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, eventCount);