mirror of https://github.com/citusdata/citus.git
create ProcessWaitEvents for separating the logic of handling events
parent
c35981f9de
commit
355805c7d8
|
@ -620,6 +620,8 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
static int GetEventSetSize(List *sessionList);
|
static int GetEventSetSize(List *sessionList);
|
||||||
static int UpdateWaitEventSet(DistributedExecution *execution);
|
static int UpdateWaitEventSet(DistributedExecution *execution);
|
||||||
|
static bool ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
|
||||||
|
eventCount);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AdaptiveExecutor is called via CitusExecScan on the
|
* AdaptiveExecutor is called via CitusExecScan on the
|
||||||
|
@ -2048,46 +2050,9 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
/* wait for I/O events */
|
/* wait for I/O events */
|
||||||
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events,
|
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events,
|
||||||
eventSetSize, WAIT_EVENT_CLIENT_READ);
|
eventSetSize, WAIT_EVENT_CLIENT_READ);
|
||||||
|
if (!ProcessWaitEvents(execution, events, eventCount))
|
||||||
int eventIndex = 0;
|
|
||||||
|
|
||||||
/* process I/O events */
|
|
||||||
for (; eventIndex < eventCount; eventIndex++)
|
|
||||||
{
|
{
|
||||||
WaitEvent *event = &events[eventIndex];
|
cancellationReceived = true;
|
||||||
|
|
||||||
if (event->events & WL_POSTMASTER_DEATH)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event->events & WL_LATCH_SET)
|
|
||||||
{
|
|
||||||
ResetLatch(MyLatch);
|
|
||||||
|
|
||||||
if (execution->raiseInterrupts)
|
|
||||||
{
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IsHoldOffCancellationReceived())
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Break out of event loop immediately in case of cancellation.
|
|
||||||
* We cannot use "return" here inside a PG_TRY() block since
|
|
||||||
* then the exception stack won't be reset.
|
|
||||||
*/
|
|
||||||
cancellationReceived = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
WorkerSession *session = (WorkerSession *) event->user_data;
|
|
||||||
session->latestUnconsumedWaitEvents = event->events;
|
|
||||||
|
|
||||||
ConnectionStateMachine(session);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2153,6 +2118,56 @@ UpdateWaitEventSet(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ProcessWaitEvents processes the received events from connections.
|
||||||
|
* It returns true if all events are processed, otherwise false.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount)
|
||||||
|
{
|
||||||
|
int eventIndex = 0;
|
||||||
|
|
||||||
|
/* process I/O events */
|
||||||
|
for (; eventIndex < eventCount; eventIndex++)
|
||||||
|
{
|
||||||
|
WaitEvent *event = &events[eventIndex];
|
||||||
|
|
||||||
|
if (event->events & WL_POSTMASTER_DEATH)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event->events & WL_LATCH_SET)
|
||||||
|
{
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
|
if (execution->raiseInterrupts)
|
||||||
|
{
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsHoldOffCancellationReceived())
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Break out of event loop immediately in case of cancellation.
|
||||||
|
* We cannot use "return" here inside a PG_TRY() block since
|
||||||
|
* then the exception stack won't be reset.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerSession *session = (WorkerSession *) event->user_data;
|
||||||
|
session->latestUnconsumedWaitEvents = event->events;
|
||||||
|
|
||||||
|
ConnectionStateMachine(session);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ManageWorkerPool ensures the worker pool has the appropriate number of connections
|
* ManageWorkerPool ensures the worker pool has the appropriate number of connections
|
||||||
* based on the number of pending tasks.
|
* based on the number of pending tasks.
|
||||||
|
|
Loading…
Reference in New Issue