diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e53d80ea3..befd47e3d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -619,6 +619,7 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); static int GetEventSetSize(List *sessionList); +static int UpdateWaitEventSet(DistributedExecution *execution); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -2015,7 +2016,6 @@ RunDistributedExecution(DistributedExecution *execution) while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { - int eventIndex = 0; long timeout = NextEventTimeout(execution); WorkerPool *workerPool = NULL; @@ -2026,12 +2026,6 @@ RunDistributedExecution(DistributedExecution *execution) if (execution->connectionSetChanged) { - if (execution->waitEventSet != NULL) - { - FreeWaitEventSet(execution->waitEventSet); - execution->waitEventSet = NULL; - } - if (events != NULL) { /* @@ -2041,16 +2035,9 @@ RunDistributedExecution(DistributedExecution *execution) pfree(events); events = NULL; } - - execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - - /* recalculate (and allocate) since the sessions have changed */ - eventSetSize = GetEventSetSize(execution->sessionList); + eventSetSize = UpdateWaitEventSet(execution); events = palloc0(eventSetSize * sizeof(WaitEvent)); - - execution->connectionSetChanged = false; - execution->waitFlagsChanged = false; } else if (execution->waitFlagsChanged) { @@ -2062,6 +2049,8 @@ RunDistributedExecution(DistributedExecution *execution) int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); + int eventIndex = 0; + /* process I/O events */ for (; eventIndex < eventCount; eventIndex++) { @@ -2141,6 +2130,29 @@ RunDistributedExecution(DistributedExecution *execution) } +/* + * UpdateWaitEventSet updates the waitEventSet for the distributed execution. + * This happens when the connection set for the distributed execution is changed, + * which means that we need to update which connections we wait on for events. + * It returns the new event set size. + */ +static int +UpdateWaitEventSet(DistributedExecution *execution) +{ + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } + + execution->waitEventSet = BuildWaitEventSet(execution->sessionList); + execution->connectionSetChanged = false; + execution->waitFlagsChanged = false; + + return GetEventSetSize(execution->sessionList); +} + + /* * ManageWorkerPool ensures the worker pool has the appropriate number of connections * based on the number of pending tasks.