diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index d88b4beda..80fba5e30 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -241,8 +241,14 @@ typedef struct DistributedExecution * would make code a bit harder to read than making this non-local, so we * move it here. See comments for PG_TRY() in postgres/src/include/elog.h * and "man 3 siglongjmp" for more context. + * + * Another reason for keeping these here is to cache a single + * WaitEventSet/WaitEvent within the execution pair until we + * need to rebuild the waitEvents. */ WaitEventSet *waitEventSet; + WaitEvent *events; + int eventSetSize; /* * The number of connections we aim to open per worker. @@ -720,7 +726,7 @@ static int GetEventSetSize(List *sessionList); static bool ProcessSessionsWithFailedWaitEventSetOperations( DistributedExecution *execution); static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); -static int RebuildWaitEventSet(DistributedExecution *execution); +static void RebuildWaitEventSet(DistributedExecution *execution); static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, bool *cancellationReceived); #if PG_VERSION_NUM >= PG_VERSION_15 @@ -2632,8 +2638,6 @@ SequentialRunDistributedExecution(DistributedExecution *execution) void RunDistributedExecution(DistributedExecution *execution) { - WaitEvent *events = NULL; - AssignTasksToConnectionsOrWorkerPool(execution); PG_TRY(); @@ -2647,8 +2651,6 @@ RunDistributedExecution(DistributedExecution *execution) bool cancellationReceived = false; - int eventSetSize = GetEventSetSize(execution->sessionList); - /* always (re)build the wait event set the first time */ execution->rebuildWaitEventSet = true; @@ -2690,17 +2692,7 @@ RunDistributedExecution(DistributedExecution *execution) } else if (execution->rebuildWaitEventSet) { - if (events != NULL) - { - /* - * The execution might take a while, so explicitly free at this point - * because we don't need anymore. - */ - pfree(events); - events = NULL; - } - eventSetSize = RebuildWaitEventSet(execution); - events = palloc0(eventSetSize * sizeof(WaitEvent)); + RebuildWaitEventSet(execution); skipWaitEvents = ProcessSessionsWithFailedWaitEventSetOperations(execution); @@ -2727,14 +2719,18 @@ RunDistributedExecution(DistributedExecution *execution) /* wait for I/O events */ long timeout = NextEventTimeout(execution); - int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, - eventSetSize, WAIT_EVENT_CLIENT_READ); - ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); + int eventCount = + WaitEventSetWait(execution->waitEventSet, timeout, execution->events, + execution->eventSetSize, WAIT_EVENT_CLIENT_READ); + + ProcessWaitEvents(execution, execution->events, eventCount, + &cancellationReceived); } - if (events != NULL) + if (execution->events != NULL) { - pfree(events); + pfree(execution->events); + execution->events = NULL; } if (execution->waitEventSet != NULL) @@ -2841,11 +2837,20 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution) * RebuildWaitEventSet updates the waitEventSet for the distributed execution. * This happens when the connection set for the distributed execution is changed, * which means that we need to update which connections we wait on for events. - * It returns the new event set size. */ -static int +static void RebuildWaitEventSet(DistributedExecution *execution) { + if (execution->events != NULL) + { + /* + * The execution might take a while, so explicitly free at this point + * because we don't need anymore. + */ + pfree(execution->events); + execution->events = NULL; + } + if (execution->waitEventSet != NULL) { FreeWaitEventSet(execution->waitEventSet); @@ -2853,10 +2858,11 @@ RebuildWaitEventSet(DistributedExecution *execution) } execution->waitEventSet = BuildWaitEventSet(execution->sessionList); + execution->eventSetSize = GetEventSetSize(execution->sessionList); + execution->events = palloc0(execution->eventSetSize * sizeof(WaitEvent)); + execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; - - return GetEventSetSize(execution->sessionList); }