diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 415c26d2a..f10f25f7f 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -37,7 +37,7 @@ * same connection since it may hold relevant locks or have uncommitted * writes. In that case we "assign" the task to a connection by adding * it to the task queue of specific connection (in - * AssignTasksToConnections). Otherwise we consider the task unassigned + * AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task unassigned * and add it to the task queue of a worker pool, which means that it * can be executed over any connection in the pool. * @@ -576,7 +576,7 @@ static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList); static bool TaskListRequires2PC(List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); -static void AssignTasksToConnections(DistributedExecution *execution); +static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); @@ -590,7 +590,7 @@ static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static WaitEventSet * BuildWaitEventSet(List *sessionList); -static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); +static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); @@ -618,6 +618,10 @@ static bool HasDependentJobs(Job *mainJob); static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); +static int GetEventSetSize(List *sessionList); +static int RebuildWaitEventSet(DistributedExecution *execution); +static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int + eventCount, bool *cancellationReceived); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -1579,13 +1583,13 @@ UnclaimAllSessionConnections(List *sessionList) /* - * AssignTasksToConnections goes through the list of tasks to determine whether any + * AssignTasksToConnectionsOrWorkerPool goes through the list of tasks to determine whether any * task placements need to be assigned to particular connections because of preceding * operations in the transaction. It then adds those connections to the pool and adds * the task placement executions to the assigned task queue of the connection. */ static void -AssignTasksToConnections(DistributedExecution *execution) +AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) { RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; @@ -2001,21 +2005,19 @@ RunDistributedExecution(DistributedExecution *execution) { WaitEvent *events = NULL; - AssignTasksToConnections(execution); + AssignTasksToConnectionsOrWorkerPool(execution); PG_TRY(); { bool cancellationReceived = false; - /* additional 2 is for postmaster and latch */ - int eventSetSize = list_length(execution->sessionList) + 2; + int eventSetSize = GetEventSetSize(execution->sessionList); /* always (re)build the wait event set the first time */ execution->connectionSetChanged = true; while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { - int eventIndex = 0; long timeout = NextEventTimeout(execution); WorkerPool *workerPool = NULL; @@ -2026,12 +2028,6 @@ RunDistributedExecution(DistributedExecution *execution) if (execution->connectionSetChanged) { - if (execution->waitEventSet != NULL) - { - FreeWaitEventSet(execution->waitEventSet); - execution->waitEventSet = NULL; - } - if (events != NULL) { /* @@ -2041,65 +2037,20 @@ RunDistributedExecution(DistributedExecution *execution) pfree(events); events = NULL; } - - execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - - /* recalculate (and allocate) since the sessions have changed */ - eventSetSize = list_length(execution->sessionList) + 2; + eventSetSize = RebuildWaitEventSet(execution); events = palloc0(eventSetSize * sizeof(WaitEvent)); - - execution->connectionSetChanged = false; - execution->waitFlagsChanged = false; } else if (execution->waitFlagsChanged) { - UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); + RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList); execution->waitFlagsChanged = false; } /* wait for I/O events */ int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); - - /* process I/O events */ - for (; eventIndex < eventCount; eventIndex++) - { - WaitEvent *event = &events[eventIndex]; - - if (event->events & WL_POSTMASTER_DEATH) - { - ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); - } - - if (event->events & WL_LATCH_SET) - { - ResetLatch(MyLatch); - - if (execution->raiseInterrupts) - { - CHECK_FOR_INTERRUPTS(); - } - - if (IsHoldOffCancellationReceived()) - { - /* - * Break out of event loop immediately in case of cancellation. - * We cannot use "return" here inside a PG_TRY() block since - * then the exception stack won't be reset. - */ - cancellationReceived = true; - break; - } - - continue; - } - - WorkerSession *session = (WorkerSession *) event->user_data; - session->latestUnconsumedWaitEvents = event->events; - - ConnectionStateMachine(session); - } + ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); } if (events != NULL) @@ -2141,6 +2092,78 @@ RunDistributedExecution(DistributedExecution *execution) } +/* + * RebuildWaitEventSet updates the waitEventSet for the distributed execution. + * This happens when the connection set for the distributed execution is changed, + * which means that we need to update which connections we wait on for events. + * It returns the new event set size. + */ +static int +RebuildWaitEventSet(DistributedExecution *execution) +{ + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } + + execution->waitEventSet = BuildWaitEventSet(execution->sessionList); + execution->connectionSetChanged = false; + execution->waitFlagsChanged = false; + + return GetEventSetSize(execution->sessionList); +} + + +/* + * ProcessWaitEvents processes the received events from connections. + */ +static void +ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, + bool *cancellationReceived) +{ + int eventIndex = 0; + + /* process I/O events */ + for (; eventIndex < eventCount; eventIndex++) + { + WaitEvent *event = &events[eventIndex]; + + if (event->events & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (event->events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + + if (execution->raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + if (IsHoldOffCancellationReceived()) + { + /* + * Break out of event loop immediately in case of cancellation. + * We cannot use "return" here inside a PG_TRY() block since + * then the exception stack won't be reset. + */ + *cancellationReceived = true; + } + + continue; + } + + WorkerSession *session = (WorkerSession *) event->user_data; + session->latestUnconsumedWaitEvents = event->events; + + ConnectionStateMachine(session); + } +} + + /* * ManageWorkerPool ensures the worker pool has the appropriate number of connections * based on the number of pending tasks. @@ -3766,7 +3789,7 @@ static WaitEventSet * BuildWaitEventSet(List *sessionList) { /* additional 2 is for postmaster and latch */ - int eventSetSize = list_length(sessionList) + 2; + int eventSetSize = GetEventSetSize(sessionList); WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); @@ -3808,11 +3831,22 @@ BuildWaitEventSet(List *sessionList) /* - * UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags + * GetEventSetSize returns the event set size for a list of sessions. + */ +static int +GetEventSetSize(List *sessionList) +{ + /* additional 2 is for postmaster and latch */ + return list_length(sessionList) + 2; +} + + +/* + * RebuildWaitEventSetFlags modifies the given waitEventSet with the wait flags * for connections in the sessionList. */ static void -UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) +RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) { WorkerSession *session = NULL; foreach_ptr(session, sessionList)