Move WaitEvent to DistributedExecution

Prep. for caching WaitEventsSet/WaitEvents
pull/6505/head
Onder Kalaci 2022-11-18 17:19:33 +01:00
parent b5b73d78c3
commit d52da55ac0
1 changed files with 31 additions and 25 deletions

View File

@ -241,8 +241,14 @@ typedef struct DistributedExecution
* would make code a bit harder to read than making this non-local, so we * would make code a bit harder to read than making this non-local, so we
* move it here. See comments for PG_TRY() in postgres/src/include/elog.h * move it here. See comments for PG_TRY() in postgres/src/include/elog.h
* and "man 3 siglongjmp" for more context. * and "man 3 siglongjmp" for more context.
*
* Another reason for keeping these here is to cache a single
* WaitEventSet/WaitEvent within the execution pair until we
* need to rebuild the waitEvents.
*/ */
WaitEventSet *waitEventSet; WaitEventSet *waitEventSet;
WaitEvent *events;
int eventSetSize;
/* /*
* The number of connections we aim to open per worker. * The number of connections we aim to open per worker.
@ -720,7 +726,7 @@ static int GetEventSetSize(List *sessionList);
static bool ProcessSessionsWithFailedWaitEventSetOperations( static bool ProcessSessionsWithFailedWaitEventSetOperations(
DistributedExecution *execution); DistributedExecution *execution);
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
static int RebuildWaitEventSet(DistributedExecution *execution); static void RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived); eventCount, bool *cancellationReceived);
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
@ -2632,8 +2638,6 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
void void
RunDistributedExecution(DistributedExecution *execution) RunDistributedExecution(DistributedExecution *execution)
{ {
WaitEvent *events = NULL;
AssignTasksToConnectionsOrWorkerPool(execution); AssignTasksToConnectionsOrWorkerPool(execution);
PG_TRY(); PG_TRY();
@ -2647,8 +2651,6 @@ RunDistributedExecution(DistributedExecution *execution)
bool cancellationReceived = false; bool cancellationReceived = false;
int eventSetSize = GetEventSetSize(execution->sessionList);
/* always (re)build the wait event set the first time */ /* always (re)build the wait event set the first time */
execution->rebuildWaitEventSet = true; execution->rebuildWaitEventSet = true;
@ -2690,17 +2692,7 @@ RunDistributedExecution(DistributedExecution *execution)
} }
else if (execution->rebuildWaitEventSet) else if (execution->rebuildWaitEventSet)
{ {
if (events != NULL) RebuildWaitEventSet(execution);
{
/*
* The execution might take a while, so explicitly free at this point
* because we don't need anymore.
*/
pfree(events);
events = NULL;
}
eventSetSize = RebuildWaitEventSet(execution);
events = palloc0(eventSetSize * sizeof(WaitEvent));
skipWaitEvents = skipWaitEvents =
ProcessSessionsWithFailedWaitEventSetOperations(execution); ProcessSessionsWithFailedWaitEventSetOperations(execution);
@ -2727,14 +2719,18 @@ RunDistributedExecution(DistributedExecution *execution)
/* wait for I/O events */ /* wait for I/O events */
long timeout = NextEventTimeout(execution); long timeout = NextEventTimeout(execution);
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, int eventCount =
eventSetSize, WAIT_EVENT_CLIENT_READ); WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEvents(execution, execution->events, eventCount,
&cancellationReceived);
} }
if (events != NULL) if (execution->events != NULL)
{ {
pfree(events); pfree(execution->events);
execution->events = NULL;
} }
if (execution->waitEventSet != NULL) if (execution->waitEventSet != NULL)
@ -2841,11 +2837,20 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution)
* RebuildWaitEventSet updates the waitEventSet for the distributed execution. * RebuildWaitEventSet updates the waitEventSet for the distributed execution.
* This happens when the connection set for the distributed execution is changed, * This happens when the connection set for the distributed execution is changed,
* which means that we need to update which connections we wait on for events. * which means that we need to update which connections we wait on for events.
* It returns the new event set size.
*/ */
static int static void
RebuildWaitEventSet(DistributedExecution *execution) RebuildWaitEventSet(DistributedExecution *execution)
{ {
if (execution->events != NULL)
{
/*
* The execution might take a while, so explicitly free at this point
* because we don't need anymore.
*/
pfree(execution->events);
execution->events = NULL;
}
if (execution->waitEventSet != NULL) if (execution->waitEventSet != NULL)
{ {
FreeWaitEventSet(execution->waitEventSet); FreeWaitEventSet(execution->waitEventSet);
@ -2853,10 +2858,11 @@ RebuildWaitEventSet(DistributedExecution *execution)
} }
execution->waitEventSet = BuildWaitEventSet(execution->sessionList); execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
execution->eventSetSize = GetEventSetSize(execution->sessionList);
execution->events = palloc0(execution->eventSetSize * sizeof(WaitEvent));
execution->rebuildWaitEventSet = false; execution->rebuildWaitEventSet = false;
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
return GetEventSetSize(execution->sessionList);
} }