From 9302e6e6998bf82fc45a3126379affaf50ab2e23 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 16:52:58 +0300 Subject: [PATCH] apply review items --- .../distributed/executor/adaptive_executor.c | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index a4851d69b..f10f25f7f 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -576,7 +576,7 @@ static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList); static bool TaskListRequires2PC(List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); -static void AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution); +static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); @@ -590,7 +590,7 @@ static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static WaitEventSet * BuildWaitEventSet(List *sessionList); -static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); +static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); @@ -619,9 +619,9 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); static int GetEventSetSize(List *sessionList); -static int UpdateWaitEventSet(DistributedExecution *execution); -static bool ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int - eventCount); +static int RebuildWaitEventSet(DistributedExecution *execution); +static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int + eventCount, bool *cancellationReceived); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -1589,7 +1589,7 @@ UnclaimAllSessionConnections(List *sessionList) * the task placement executions to the assigned task queue of the connection. */ static void -AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution) +AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) { RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; @@ -2005,7 +2005,7 @@ RunDistributedExecution(DistributedExecution *execution) { WaitEvent *events = NULL; - AssignTasksToConnectionsOrWorkerPool (execution); + AssignTasksToConnectionsOrWorkerPool(execution); PG_TRY(); { @@ -2037,23 +2037,20 @@ RunDistributedExecution(DistributedExecution *execution) pfree(events); events = NULL; } - eventSetSize = UpdateWaitEventSet(execution); + eventSetSize = RebuildWaitEventSet(execution); events = palloc0(eventSetSize * sizeof(WaitEvent)); } else if (execution->waitFlagsChanged) { - UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); + RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList); execution->waitFlagsChanged = false; } /* wait for I/O events */ int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); - if (!ProcessWaitEvents(execution, events, eventCount)) - { - cancellationReceived = true; - } + ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); } if (events != NULL) @@ -2096,13 +2093,13 @@ RunDistributedExecution(DistributedExecution *execution) /* - * UpdateWaitEventSet updates the waitEventSet for the distributed execution. + * RebuildWaitEventSet updates the waitEventSet for the distributed execution. * This happens when the connection set for the distributed execution is changed, * which means that we need to update which connections we wait on for events. * It returns the new event set size. */ static int -UpdateWaitEventSet(DistributedExecution *execution) +RebuildWaitEventSet(DistributedExecution *execution) { if (execution->waitEventSet != NULL) { @@ -2120,10 +2117,10 @@ UpdateWaitEventSet(DistributedExecution *execution) /* * ProcessWaitEvents processes the received events from connections. - * It returns true if all events are processed, otherwise false. */ -static bool -ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount) +static void +ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, + bool *cancellationReceived) { int eventIndex = 0; @@ -2153,7 +2150,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC * We cannot use "return" here inside a PG_TRY() block since * then the exception stack won't be reset. */ - return false; + *cancellationReceived = true; } continue; @@ -2164,7 +2161,6 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC ConnectionStateMachine(session); } - return true; } @@ -3835,7 +3831,7 @@ BuildWaitEventSet(List *sessionList) /* - * GetEventSetSize returns the event set size. + * GetEventSetSize returns the event set size for a list of sessions. */ static int GetEventSetSize(List *sessionList) @@ -3846,11 +3842,11 @@ GetEventSetSize(List *sessionList) /* - * UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags + * RebuildWaitEventSetFlags modifies the given waitEventSet with the wait flags * for connections in the sessionList. */ static void -UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) +RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) { WorkerSession *session = NULL; foreach_ptr(session, sessionList)