From a7e735a64852f8661bbfec5e8186366be740fd75 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 11:46:38 +0300 Subject: [PATCH 1/5] use a utility method to get event size --- .../distributed/executor/adaptive_executor.c | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 415c26d2a..e53d80ea3 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -618,6 +618,7 @@ static bool HasDependentJobs(Job *mainJob); static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); +static int GetEventSetSize(List *sessionList); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -2007,8 +2008,7 @@ RunDistributedExecution(DistributedExecution *execution) { bool cancellationReceived = false; - /* additional 2 is for postmaster and latch */ - int eventSetSize = list_length(execution->sessionList) + 2; + int eventSetSize = GetEventSetSize(execution->sessionList); /* always (re)build the wait event set the first time */ execution->connectionSetChanged = true; @@ -2045,7 +2045,7 @@ RunDistributedExecution(DistributedExecution *execution) execution->waitEventSet = BuildWaitEventSet(execution->sessionList); /* recalculate (and allocate) since the sessions have changed */ - eventSetSize = list_length(execution->sessionList) + 2; + eventSetSize = GetEventSetSize(execution->sessionList); events = palloc0(eventSetSize * sizeof(WaitEvent)); @@ -3766,7 +3766,7 @@ static WaitEventSet * BuildWaitEventSet(List *sessionList) { /* additional 2 is for postmaster and latch */ - int eventSetSize = list_length(sessionList) + 2; + int eventSetSize = GetEventSetSize(sessionList); WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); @@ -3807,6 +3807,17 @@ BuildWaitEventSet(List *sessionList) } +/* + * GetEventSetSize returns the event set size. + */ +static int +GetEventSetSize(List *sessionList) +{ + /* additional 2 is for postmaster and latch */ + return list_length(sessionList) + 2; +} + + /* * UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags * for connections in the sessionList. From c35981f9de1e8fd1b9210c08b3d54adfba289286 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 12:11:46 +0300 Subject: [PATCH 2/5] create UpdateWaitEventSet for better readability --- .../distributed/executor/adaptive_executor.c | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e53d80ea3..befd47e3d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -619,6 +619,7 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); static int GetEventSetSize(List *sessionList); +static int UpdateWaitEventSet(DistributedExecution *execution); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -2015,7 +2016,6 @@ RunDistributedExecution(DistributedExecution *execution) while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { - int eventIndex = 0; long timeout = NextEventTimeout(execution); WorkerPool *workerPool = NULL; @@ -2026,12 +2026,6 @@ RunDistributedExecution(DistributedExecution *execution) if (execution->connectionSetChanged) { - if (execution->waitEventSet != NULL) - { - FreeWaitEventSet(execution->waitEventSet); - execution->waitEventSet = NULL; - } - if (events != NULL) { /* @@ -2041,16 +2035,9 @@ RunDistributedExecution(DistributedExecution *execution) pfree(events); events = NULL; } - - execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - - /* recalculate (and allocate) since the sessions have changed */ - eventSetSize = GetEventSetSize(execution->sessionList); + eventSetSize = UpdateWaitEventSet(execution); events = palloc0(eventSetSize * sizeof(WaitEvent)); - - execution->connectionSetChanged = false; - execution->waitFlagsChanged = false; } else if (execution->waitFlagsChanged) { @@ -2062,6 +2049,8 @@ RunDistributedExecution(DistributedExecution *execution) int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); + int eventIndex = 0; + /* process I/O events */ for (; eventIndex < eventCount; eventIndex++) { @@ -2141,6 +2130,29 @@ RunDistributedExecution(DistributedExecution *execution) } +/* + * UpdateWaitEventSet updates the waitEventSet for the distributed execution. + * This happens when the connection set for the distributed execution is changed, + * which means that we need to update which connections we wait on for events. + * It returns the new event set size. + */ +static int +UpdateWaitEventSet(DistributedExecution *execution) +{ + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } + + execution->waitEventSet = BuildWaitEventSet(execution->sessionList); + execution->connectionSetChanged = false; + execution->waitFlagsChanged = false; + + return GetEventSetSize(execution->sessionList); +} + + /* * ManageWorkerPool ensures the worker pool has the appropriate number of connections * based on the number of pending tasks. From 355805c7d8ee5cf1003148771230ec49a9b14152 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 12:20:37 +0300 Subject: [PATCH 3/5] create ProcessWaitEvents for separating the logic of handling events --- .../distributed/executor/adaptive_executor.c | 93 +++++++++++-------- 1 file changed, 54 insertions(+), 39 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index befd47e3d..521ae798c 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -620,6 +620,8 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, const char ***parameterValues); static int GetEventSetSize(List *sessionList); static int UpdateWaitEventSet(DistributedExecution *execution); +static bool ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int + eventCount); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -2048,46 +2050,9 @@ RunDistributedExecution(DistributedExecution *execution) /* wait for I/O events */ int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); - - int eventIndex = 0; - - /* process I/O events */ - for (; eventIndex < eventCount; eventIndex++) + if (!ProcessWaitEvents(execution, events, eventCount)) { - WaitEvent *event = &events[eventIndex]; - - if (event->events & WL_POSTMASTER_DEATH) - { - ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); - } - - if (event->events & WL_LATCH_SET) - { - ResetLatch(MyLatch); - - if (execution->raiseInterrupts) - { - CHECK_FOR_INTERRUPTS(); - } - - if (IsHoldOffCancellationReceived()) - { - /* - * Break out of event loop immediately in case of cancellation. - * We cannot use "return" here inside a PG_TRY() block since - * then the exception stack won't be reset. - */ - cancellationReceived = true; - break; - } - - continue; - } - - WorkerSession *session = (WorkerSession *) event->user_data; - session->latestUnconsumedWaitEvents = event->events; - - ConnectionStateMachine(session); + cancellationReceived = true; } } @@ -2153,6 +2118,56 @@ UpdateWaitEventSet(DistributedExecution *execution) } +/* + * ProcessWaitEvents processes the received events from connections. + * It returns true if all events are processed, otherwise false. + */ +static bool +ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount) +{ + int eventIndex = 0; + + /* process I/O events */ + for (; eventIndex < eventCount; eventIndex++) + { + WaitEvent *event = &events[eventIndex]; + + if (event->events & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (event->events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + + if (execution->raiseInterrupts) + { + CHECK_FOR_INTERRUPTS(); + } + + if (IsHoldOffCancellationReceived()) + { + /* + * Break out of event loop immediately in case of cancellation. + * We cannot use "return" here inside a PG_TRY() block since + * then the exception stack won't be reset. + */ + return false; + } + + continue; + } + + WorkerSession *session = (WorkerSession *) event->user_data; + session->latestUnconsumedWaitEvents = event->events; + + ConnectionStateMachine(session); + } + return true; +} + + /* * ManageWorkerPool ensures the worker pool has the appropriate number of connections * based on the number of pending tasks. From 1b78045867dadf4f01e8c5541acb209575063a7c Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 12:26:00 +0300 Subject: [PATCH 4/5] rename AssignTasksToConnections with AssignTasksToConnectionsOrWorkerPool --- src/backend/distributed/executor/adaptive_executor.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 521ae798c..a4851d69b 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -37,7 +37,7 @@ * same connection since it may hold relevant locks or have uncommitted * writes. In that case we "assign" the task to a connection by adding * it to the task queue of specific connection (in - * AssignTasksToConnections). Otherwise we consider the task unassigned + * AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task unassigned * and add it to the task queue of a worker pool, which means that it * can be executed over any connection in the pool. * @@ -576,7 +576,7 @@ static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList); static bool TaskListRequires2PC(List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); -static void AssignTasksToConnections(DistributedExecution *execution); +static void AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); @@ -1583,13 +1583,13 @@ UnclaimAllSessionConnections(List *sessionList) /* - * AssignTasksToConnections goes through the list of tasks to determine whether any + * AssignTasksToConnectionsOrWorkerPool goes through the list of tasks to determine whether any * task placements need to be assigned to particular connections because of preceding * operations in the transaction. It then adds those connections to the pool and adds * the task placement executions to the assigned task queue of the connection. */ static void -AssignTasksToConnections(DistributedExecution *execution) +AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution) { RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; @@ -2005,7 +2005,7 @@ RunDistributedExecution(DistributedExecution *execution) { WaitEvent *events = NULL; - AssignTasksToConnections(execution); + AssignTasksToConnectionsOrWorkerPool (execution); PG_TRY(); { From 9302e6e6998bf82fc45a3126379affaf50ab2e23 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 12 Feb 2020 16:52:58 +0300 Subject: [PATCH 5/5] apply review items --- .../distributed/executor/adaptive_executor.c | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index a4851d69b..f10f25f7f 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -576,7 +576,7 @@ static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList); static bool TaskListRequires2PC(List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); -static void AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution); +static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); @@ -590,7 +590,7 @@ static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static WaitEventSet * BuildWaitEventSet(List *sessionList); -static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); +static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); @@ -619,9 +619,9 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); static int GetEventSetSize(List *sessionList); -static int UpdateWaitEventSet(DistributedExecution *execution); -static bool ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int - eventCount); +static int RebuildWaitEventSet(DistributedExecution *execution); +static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int + eventCount, bool *cancellationReceived); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -1589,7 +1589,7 @@ UnclaimAllSessionConnections(List *sessionList) * the task placement executions to the assigned task queue of the connection. */ static void -AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution) +AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) { RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; @@ -2005,7 +2005,7 @@ RunDistributedExecution(DistributedExecution *execution) { WaitEvent *events = NULL; - AssignTasksToConnectionsOrWorkerPool (execution); + AssignTasksToConnectionsOrWorkerPool(execution); PG_TRY(); { @@ -2037,23 +2037,20 @@ RunDistributedExecution(DistributedExecution *execution) pfree(events); events = NULL; } - eventSetSize = UpdateWaitEventSet(execution); + eventSetSize = RebuildWaitEventSet(execution); events = palloc0(eventSetSize * sizeof(WaitEvent)); } else if (execution->waitFlagsChanged) { - UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); + RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList); execution->waitFlagsChanged = false; } /* wait for I/O events */ int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); - if (!ProcessWaitEvents(execution, events, eventCount)) - { - cancellationReceived = true; - } + ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); } if (events != NULL) @@ -2096,13 +2093,13 @@ RunDistributedExecution(DistributedExecution *execution) /* - * UpdateWaitEventSet updates the waitEventSet for the distributed execution. + * RebuildWaitEventSet updates the waitEventSet for the distributed execution. * This happens when the connection set for the distributed execution is changed, * which means that we need to update which connections we wait on for events. * It returns the new event set size. */ static int -UpdateWaitEventSet(DistributedExecution *execution) +RebuildWaitEventSet(DistributedExecution *execution) { if (execution->waitEventSet != NULL) { @@ -2120,10 +2117,10 @@ UpdateWaitEventSet(DistributedExecution *execution) /* * ProcessWaitEvents processes the received events from connections. - * It returns true if all events are processed, otherwise false. */ -static bool -ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount) +static void +ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, + bool *cancellationReceived) { int eventIndex = 0; @@ -2153,7 +2150,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC * We cannot use "return" here inside a PG_TRY() block since * then the exception stack won't be reset. */ - return false; + *cancellationReceived = true; } continue; @@ -2164,7 +2161,6 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC ConnectionStateMachine(session); } - return true; } @@ -3835,7 +3831,7 @@ BuildWaitEventSet(List *sessionList) /* - * GetEventSetSize returns the event set size. + * GetEventSetSize returns the event set size for a list of sessions. */ static int GetEventSetSize(List *sessionList) @@ -3846,11 +3842,11 @@ GetEventSetSize(List *sessionList) /* - * UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags + * RebuildWaitEventSetFlags modifies the given waitEventSet with the wait flags * for connections in the sessionList. */ static void -UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) +RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) { WorkerSession *session = NULL; foreach_ptr(session, sessionList)