Merge pull request #3491 from citusdata/refactor/runDistributedExecution

refactor RunDistributedExecution
pull/3506/head^2
SaitTalhaNisanci 2020-02-17 14:27:14 +03:00 committed by GitHub
commit 2b08916f93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 100 additions and 66 deletions

View File

@ -37,7 +37,7 @@
* same connection since it may hold relevant locks or have uncommitted * same connection since it may hold relevant locks or have uncommitted
* writes. In that case we "assign" the task to a connection by adding * writes. In that case we "assign" the task to a connection by adding
* it to the task queue of specific connection (in * it to the task queue of specific connection (in
* AssignTasksToConnections). Otherwise we consider the task unassigned * AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task unassigned
* and add it to the task queue of a worker pool, which means that it * and add it to the task queue of a worker pool, which means that it
* can be executed over any connection in the pool. * can be executed over any connection in the pool.
* *
@ -576,7 +576,7 @@ static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList);
static bool TaskListRequires2PC(List *taskList); static bool TaskListRequires2PC(List *taskList);
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnections(DistributedExecution *execution); static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList); static void UnclaimAllSessionConnections(List *sessionList);
static bool UseConnectionPerPlacement(void); static bool UseConnectionPerPlacement(void);
static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
@ -590,7 +590,7 @@ static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution); static long NextEventTimeout(DistributedExecution *execution);
static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime);
static WaitEventSet * BuildWaitEventSet(List *sessionList); static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
@ -618,6 +618,10 @@ static bool HasDependentJobs(Job *mainJob);
static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
const char ***parameterValues); const char ***parameterValues);
static int GetEventSetSize(List *sessionList);
static int RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived);
/* /*
* AdaptiveExecutor is called via CitusExecScan on the * AdaptiveExecutor is called via CitusExecScan on the
@ -1579,13 +1583,13 @@ UnclaimAllSessionConnections(List *sessionList)
/* /*
* AssignTasksToConnections goes through the list of tasks to determine whether any * AssignTasksToConnectionsOrWorkerPool goes through the list of tasks to determine whether any
* task placements need to be assigned to particular connections because of preceding * task placements need to be assigned to particular connections because of preceding
* operations in the transaction. It then adds those connections to the pool and adds * operations in the transaction. It then adds those connections to the pool and adds
* the task placement executions to the assigned task queue of the connection. * the task placement executions to the assigned task queue of the connection.
*/ */
static void static void
AssignTasksToConnections(DistributedExecution *execution) AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
{ {
RowModifyLevel modLevel = execution->modLevel; RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
@ -2001,21 +2005,19 @@ RunDistributedExecution(DistributedExecution *execution)
{ {
WaitEvent *events = NULL; WaitEvent *events = NULL;
AssignTasksToConnections(execution); AssignTasksToConnectionsOrWorkerPool(execution);
PG_TRY(); PG_TRY();
{ {
bool cancellationReceived = false; bool cancellationReceived = false;
/* additional 2 is for postmaster and latch */ int eventSetSize = GetEventSetSize(execution->sessionList);
int eventSetSize = list_length(execution->sessionList) + 2;
/* always (re)build the wait event set the first time */ /* always (re)build the wait event set the first time */
execution->connectionSetChanged = true; execution->connectionSetChanged = true;
while (execution->unfinishedTaskCount > 0 && !cancellationReceived) while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
{ {
int eventIndex = 0;
long timeout = NextEventTimeout(execution); long timeout = NextEventTimeout(execution);
WorkerPool *workerPool = NULL; WorkerPool *workerPool = NULL;
@ -2026,12 +2028,6 @@ RunDistributedExecution(DistributedExecution *execution)
if (execution->connectionSetChanged) if (execution->connectionSetChanged)
{ {
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
if (events != NULL) if (events != NULL)
{ {
/* /*
@ -2041,65 +2037,20 @@ RunDistributedExecution(DistributedExecution *execution)
pfree(events); pfree(events);
events = NULL; events = NULL;
} }
eventSetSize = RebuildWaitEventSet(execution);
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
/* recalculate (and allocate) since the sessions have changed */
eventSetSize = list_length(execution->sessionList) + 2;
events = palloc0(eventSetSize * sizeof(WaitEvent)); events = palloc0(eventSetSize * sizeof(WaitEvent));
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
} }
else if (execution->waitFlagsChanged) else if (execution->waitFlagsChanged)
{ {
UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
} }
/* wait for I/O events */ /* wait for I/O events */
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ); eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEvents(execution, events, eventCount, &cancellationReceived);
/* process I/O events */
for (; eventIndex < eventCount; eventIndex++)
{
WaitEvent *event = &events[eventIndex];
if (event->events & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
if (event->events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
if (execution->raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
if (IsHoldOffCancellationReceived())
{
/*
* Break out of event loop immediately in case of cancellation.
* We cannot use "return" here inside a PG_TRY() block since
* then the exception stack won't be reset.
*/
cancellationReceived = true;
break;
}
continue;
}
WorkerSession *session = (WorkerSession *) event->user_data;
session->latestUnconsumedWaitEvents = event->events;
ConnectionStateMachine(session);
}
} }
if (events != NULL) if (events != NULL)
@ -2141,6 +2092,78 @@ RunDistributedExecution(DistributedExecution *execution)
} }
/*
* RebuildWaitEventSet updates the waitEventSet for the distributed execution.
* This happens when the connection set for the distributed execution is changed,
* which means that we need to update which connections we wait on for events.
* It returns the new event set size.
*/
static int
RebuildWaitEventSet(DistributedExecution *execution)
{
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
return GetEventSetSize(execution->sessionList);
}
/*
* ProcessWaitEvents processes the received events from connections.
*/
static void
ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount,
bool *cancellationReceived)
{
int eventIndex = 0;
/* process I/O events */
for (; eventIndex < eventCount; eventIndex++)
{
WaitEvent *event = &events[eventIndex];
if (event->events & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
if (event->events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
if (execution->raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
if (IsHoldOffCancellationReceived())
{
/*
* Break out of event loop immediately in case of cancellation.
* We cannot use "return" here inside a PG_TRY() block since
* then the exception stack won't be reset.
*/
*cancellationReceived = true;
}
continue;
}
WorkerSession *session = (WorkerSession *) event->user_data;
session->latestUnconsumedWaitEvents = event->events;
ConnectionStateMachine(session);
}
}
/* /*
* ManageWorkerPool ensures the worker pool has the appropriate number of connections * ManageWorkerPool ensures the worker pool has the appropriate number of connections
* based on the number of pending tasks. * based on the number of pending tasks.
@ -3766,7 +3789,7 @@ static WaitEventSet *
BuildWaitEventSet(List *sessionList) BuildWaitEventSet(List *sessionList)
{ {
/* additional 2 is for postmaster and latch */ /* additional 2 is for postmaster and latch */
int eventSetSize = list_length(sessionList) + 2; int eventSetSize = GetEventSetSize(sessionList);
WaitEventSet *waitEventSet = WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, eventSetSize); CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
@ -3808,11 +3831,22 @@ BuildWaitEventSet(List *sessionList)
/* /*
* UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags * GetEventSetSize returns the event set size for a list of sessions.
*/
static int
GetEventSetSize(List *sessionList)
{
/* additional 2 is for postmaster and latch */
return list_length(sessionList) + 2;
}
/*
* RebuildWaitEventSetFlags modifies the given waitEventSet with the wait flags
* for connections in the sessionList. * for connections in the sessionList.
*/ */
static void static void
UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
{ {
WorkerSession *session = NULL; WorkerSession *session = NULL;
foreach_ptr(session, sessionList) foreach_ptr(session, sessionList)