create UpdateWaitEventSet for better readability

pull/3491/head
SaitTalhaNisanci 2020-02-12 12:11:46 +03:00
parent a7e735a648
commit c35981f9de
1 changed files with 27 additions and 15 deletions

View File

@ -619,6 +619,7 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
const char ***parameterValues); const char ***parameterValues);
static int GetEventSetSize(List *sessionList); static int GetEventSetSize(List *sessionList);
static int UpdateWaitEventSet(DistributedExecution *execution);
/* /*
* AdaptiveExecutor is called via CitusExecScan on the * AdaptiveExecutor is called via CitusExecScan on the
@ -2015,7 +2016,6 @@ RunDistributedExecution(DistributedExecution *execution)
while (execution->unfinishedTaskCount > 0 && !cancellationReceived) while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
{ {
int eventIndex = 0;
long timeout = NextEventTimeout(execution); long timeout = NextEventTimeout(execution);
WorkerPool *workerPool = NULL; WorkerPool *workerPool = NULL;
@ -2026,12 +2026,6 @@ RunDistributedExecution(DistributedExecution *execution)
if (execution->connectionSetChanged) if (execution->connectionSetChanged)
{ {
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
if (events != NULL) if (events != NULL)
{ {
/* /*
@ -2041,16 +2035,9 @@ RunDistributedExecution(DistributedExecution *execution)
pfree(events); pfree(events);
events = NULL; events = NULL;
} }
eventSetSize = UpdateWaitEventSet(execution);
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
/* recalculate (and allocate) since the sessions have changed */
eventSetSize = GetEventSetSize(execution->sessionList);
events = palloc0(eventSetSize * sizeof(WaitEvent)); events = palloc0(eventSetSize * sizeof(WaitEvent));
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
} }
else if (execution->waitFlagsChanged) else if (execution->waitFlagsChanged)
{ {
@ -2062,6 +2049,8 @@ RunDistributedExecution(DistributedExecution *execution)
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ); eventSetSize, WAIT_EVENT_CLIENT_READ);
int eventIndex = 0;
/* process I/O events */ /* process I/O events */
for (; eventIndex < eventCount; eventIndex++) for (; eventIndex < eventCount; eventIndex++)
{ {
@ -2141,6 +2130,29 @@ RunDistributedExecution(DistributedExecution *execution)
} }
/*
* UpdateWaitEventSet updates the waitEventSet for the distributed execution.
* This happens when the connection set for the distributed execution is changed,
* which means that we need to update which connections we wait on for events.
* It returns the new event set size.
*/
static int
UpdateWaitEventSet(DistributedExecution *execution)
{
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
return GetEventSetSize(execution->sessionList);
}
/* /*
* ManageWorkerPool ensures the worker pool has the appropriate number of connections * ManageWorkerPool ensures the worker pool has the appropriate number of connections
* based on the number of pending tasks. * based on the number of pending tasks.