mirror of https://github.com/citusdata/citus.git
remove_socket_check
parent
6e8605c27b
commit
526d50091c
|
@ -244,6 +244,7 @@ typedef struct DistributedExecution
|
||||||
*/
|
*/
|
||||||
WaitEventSet *waitEventSet;
|
WaitEventSet *waitEventSet;
|
||||||
WaitEvent *events;
|
WaitEvent *events;
|
||||||
|
int eventSetSize;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The number of connections we aim to open per worker.
|
* The number of connections we aim to open per worker.
|
||||||
|
@ -721,7 +722,7 @@ static int GetEventSetSize(List *sessionList);
|
||||||
static bool ProcessSessionsWithFailedWaitEventSetOperations(
|
static bool ProcessSessionsWithFailedWaitEventSetOperations(
|
||||||
DistributedExecution *execution);
|
DistributedExecution *execution);
|
||||||
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
|
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
|
||||||
static int RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly);
|
static void RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly);
|
||||||
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
|
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
|
||||||
eventCount, bool *cancellationReceived);
|
eventCount, bool *cancellationReceived);
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
@ -2646,8 +2647,6 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
|
|
||||||
bool cancellationReceived = false;
|
bool cancellationReceived = false;
|
||||||
|
|
||||||
int eventSetSize = GetEventSetSize(execution->sessionList);
|
|
||||||
|
|
||||||
/* always (re)build the wait event set the first time */
|
/* always (re)build the wait event set the first time */
|
||||||
execution->rebuildWaitEventSet = true;
|
execution->rebuildWaitEventSet = true;
|
||||||
|
|
||||||
|
@ -2689,7 +2688,7 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
else if (execution->rebuildWaitEventSet)
|
else if (execution->rebuildWaitEventSet)
|
||||||
{
|
{
|
||||||
eventSetSize = RebuildWaitEventSet(execution, false);
|
RebuildWaitEventSet(execution, false);
|
||||||
|
|
||||||
skipWaitEvents =
|
skipWaitEvents =
|
||||||
ProcessSessionsWithFailedWaitEventSetOperations(execution);
|
ProcessSessionsWithFailedWaitEventSetOperations(execution);
|
||||||
|
@ -2717,7 +2716,7 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
/* wait for I/O events */
|
/* wait for I/O events */
|
||||||
long timeout = NextEventTimeout(execution);
|
long timeout = NextEventTimeout(execution);
|
||||||
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
|
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
|
||||||
eventSetSize, WAIT_EVENT_CLIENT_READ);
|
execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
|
||||||
ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived);
|
ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2832,7 +2831,7 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution)
|
||||||
* which means that we need to update which connections we wait on for events.
|
* which means that we need to update which connections we wait on for events.
|
||||||
* It returns the new event set size.
|
* It returns the new event set size.
|
||||||
*/
|
*/
|
||||||
static int
|
static void
|
||||||
RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly)
|
RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly)
|
||||||
{
|
{
|
||||||
if (execution->events != NULL)
|
if (execution->events != NULL)
|
||||||
|
@ -2855,8 +2854,7 @@ RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly)
|
||||||
execution->events = palloc0(GetEventSetSize(execution->sessionList) * sizeof(WaitEvent));
|
execution->events = palloc0(GetEventSetSize(execution->sessionList) * sizeof(WaitEvent));
|
||||||
execution->rebuildWaitEventSet = false;
|
execution->rebuildWaitEventSet = false;
|
||||||
execution->waitFlagsChanged = false;
|
execution->waitFlagsChanged = false;
|
||||||
|
execution->eventSetSize = GetEventSetSize(execution->sessionList);
|
||||||
return GetEventSetSize(execution->sessionList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3410,14 +3408,14 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
bool socketClosedOnly = true;
|
bool socketClosedOnly = true;
|
||||||
int size = RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly);
|
RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly);
|
||||||
|
|
||||||
long timeout = 0;/* don't wait */
|
long timeout = 0;/* don't wait */
|
||||||
|
|
||||||
int eventCount =
|
int eventCount =
|
||||||
WaitEventSetWait(workerPool->distributedExecution->waitEventSet,
|
WaitEventSetWait(workerPool->distributedExecution->waitEventSet,
|
||||||
timeout, workerPool->distributedExecution->events,
|
timeout, workerPool->distributedExecution->events,
|
||||||
size, WAIT_EVENT_CLIENT_READ);
|
workerPool->distributedExecution->eventSetSize, WAIT_EVENT_CLIENT_READ);
|
||||||
|
|
||||||
ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, eventCount);
|
ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, eventCount);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue