remove_socket_check
Onder Kalaci 2022-11-18 16:39:02 +01:00
parent 526d50091c
commit 3abaaaad1f
1 changed files with 33 additions and 100 deletions

View File

@ -682,9 +682,10 @@ static void CheckConnectionTimeout(WorkerPool *workerPool);
static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList, bool s);
static WaitEventSet * BuildWaitEventSet(List *sessionList, bool socketClosedOnly);
static void AddSessionToWaitEventSet(WorkerSession *session,
WaitEventSet *waitEventSet, bool s);
WaitEventSet *waitEventSet,
bool socketClosedOnly);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
@ -726,7 +727,6 @@ static void RebuildWaitEventSet(DistributedExecution *execution, bool socketClos
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived);
#if PG_VERSION_NUM >= PG_VERSION_15
static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
#endif
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
@ -2438,18 +2438,6 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
dlist_init(&session->pendingTaskQueue);
dlist_init(&session->readyTaskQueue);
/*
* Before using this connection in the distributed execution, we check
* whether the remote connection is closed/lost. This is common
* when we have a cached connection and remote server restarted
* (due to failover or restart etc.). We do this because we can
* retry connection a single time.
*/
if (RemoteSocketClosedForNewSession(session))
{
connection->connectionState = MULTI_CONNECTION_LOST;
}
if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
{
/* keep track of how many connections are ready */
@ -2478,76 +2466,6 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
}
/*
* RemoteSocketClosedForNewSession is a helper function for detecting whether
* the remote socket corresponding to the input session is closed. This is
* mostly common there is a cached connection and remote server restarted
* (due to failover or restart etc.).
*
* The function is not a generic function that can be called at the start of
* the execution. The function is not generic because it does not check all
* the events, even ignores cancellation events. Future callers of this
* function should consider its limitations.
*/
static bool
RemoteSocketClosedForNewSession(WorkerSession *session)
{
bool socketClosed = false;
return false;
#if PG_VERSION_NUM >= PG_VERSION_15
if (!WaitEventSetCanReportClosed())
{
/* we cannot detect for this OS */
return socketClosed;
}
MultiConnection *connection = session->connection;
long timeout = 0;/* don't wait */
int eventSetSize = 2;
WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent));
/*
* Only wait for WL_SOCKET_CLOSED and postmaster death, do not even check
* for cancellations. Everything else are going to be checked soon in the
* main event processing. At this point, our only goal is to understand
* whether the remote socket is closed or not.
*/
int originalWaitFlags = connection->waitFlags;
connection->waitFlags = WL_SOCKET_CLOSED;
WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
AddSessionToWaitEventSet(session, waitEventSet, true);
/* always good to wait for postmaster death */
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
NULL);
int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEventsForSocketClosed(events, eventCount);
/* we can at most receive a single event, which is WL_SOCKET_CLOSED */
Assert(eventCount <= 1);
FreeWaitEventSet(waitEventSet);
pfree(events);
/*
* We only searched for WL_SOCKET_CLOSED, and we processed the
* event already. Now, set back to the original flags.
*/
UpdateConnectionWaitFlags(session, originalWaitFlags);
session->latestUnconsumedWaitEvents = 0;
#endif
return socketClosed;
}
/*
* ShouldRunTasksSequentially returns true if each of the individual tasks
* should be executed one by one. Note that this is different than
@ -2715,9 +2633,12 @@ RunDistributedExecution(DistributedExecution *execution)
/* wait for I/O events */
long timeout = NextEventTimeout(execution);
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived);
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout,
execution->events,
execution->eventSetSize,
WAIT_EVENT_CLIENT_READ);
ProcessWaitEvents(execution, execution->events, eventCount,
&cancellationReceived);
}
if (execution->events != NULL)
@ -2851,7 +2772,8 @@ RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly)
}
execution->waitEventSet = BuildWaitEventSet(execution->sessionList, socketClosedOnly);
execution->events = palloc0(GetEventSetSize(execution->sessionList) * sizeof(WaitEvent));
execution->events = palloc0(GetEventSetSize(execution->sessionList) *
sizeof(WaitEvent));
execution->rebuildWaitEventSet = false;
execution->waitFlagsChanged = false;
execution->eventSetSize = GetEventSetSize(execution->sessionList);
@ -2937,8 +2859,6 @@ ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
session->connection->connectionState = MULTI_CONNECTION_LOST;
}
}
return ;
}
@ -3410,17 +3330,27 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
bool socketClosedOnly = true;
RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly);
long timeout = 0;/* don't wait */
/*
* Before using this connection in the distributed execution, we check
* whether the remote connection is closed/lost. This is common
* when we have a cached connection and remote server restarted
* (due to failover or restart etc.). We do this because we can
* retry connection a single time.
*/
long timeout = 0;/* don't wait */
int eventCount =
WaitEventSetWait(workerPool->distributedExecution->waitEventSet,
timeout, workerPool->distributedExecution->events,
workerPool->distributedExecution->eventSetSize, WAIT_EVENT_CLIENT_READ);
workerPool->distributedExecution->eventSetSize,
WAIT_EVENT_CLIENT_READ);
ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, eventCount);
#if PG_VERSION_NUM >= PG_VERSION_15
ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events,
eventCount);
#endif
workerPool->distributedExecution->waitFlagsChanged = true;
WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{
@ -5572,7 +5502,8 @@ BuildWaitEventSet(List *sessionList, bool socketClosedOnly)
* to the waitEventSet.
*/
static void
AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, bool socketClosedOnly)
AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, bool
socketClosedOnly)
{
MultiConnection *connection = session->connection;
@ -5599,13 +5530,15 @@ AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, boo
if (socketClosedOnly)
{
waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED , sock,
NULL, (void *) session);
waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED,
sock,
NULL, (void *) session);
}
else
{
waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags , sock,
NULL, (void *) session);
waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags,
sock,
NULL, (void *) session);
}
session->waitEventSetIndex = waitEventSetIndex;