diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 3b626fff5..419b41d82 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -682,9 +682,10 @@ static void CheckConnectionTimeout(WorkerPool *workerPool); static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); -static WaitEventSet * BuildWaitEventSet(List *sessionList, bool s); +static WaitEventSet * BuildWaitEventSet(List *sessionList, bool socketClosedOnly); static void AddSessionToWaitEventSet(WorkerSession *session, - WaitEventSet *waitEventSet, bool s); + WaitEventSet *waitEventSet, + bool socketClosedOnly); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); @@ -726,7 +727,6 @@ static void RebuildWaitEventSet(DistributedExecution *execution, bool socketClos static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, bool *cancellationReceived); #if PG_VERSION_NUM >= PG_VERSION_15 - static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount); #endif static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); @@ -2438,18 +2438,6 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) dlist_init(&session->pendingTaskQueue); dlist_init(&session->readyTaskQueue); - /* - * Before using this connection in the distributed execution, we check - * whether the remote connection is closed/lost. This is common - * when we have a cached connection and remote server restarted - * (due to failover or restart etc.). We do this because we can - * retry connection a single time. - */ - if (RemoteSocketClosedForNewSession(session)) - { - connection->connectionState = MULTI_CONNECTION_LOST; - } - if (connection->connectionState == MULTI_CONNECTION_CONNECTED) { /* keep track of how many connections are ready */ @@ -2478,76 +2466,6 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) } -/* - * RemoteSocketClosedForNewSession is a helper function for detecting whether - * the remote socket corresponding to the input session is closed. This is - * mostly common there is a cached connection and remote server restarted - * (due to failover or restart etc.). - * - * The function is not a generic function that can be called at the start of - * the execution. The function is not generic because it does not check all - * the events, even ignores cancellation events. Future callers of this - * function should consider its limitations. - */ -static bool -RemoteSocketClosedForNewSession(WorkerSession *session) -{ - bool socketClosed = false; - - return false; -#if PG_VERSION_NUM >= PG_VERSION_15 - - if (!WaitEventSetCanReportClosed()) - { - /* we cannot detect for this OS */ - return socketClosed; - } - - MultiConnection *connection = session->connection; - long timeout = 0;/* don't wait */ - int eventSetSize = 2; - WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent)); - - /* - * Only wait for WL_SOCKET_CLOSED and postmaster death, do not even check - * for cancellations. Everything else are going to be checked soon in the - * main event processing. At this point, our only goal is to understand - * whether the remote socket is closed or not. - */ - int originalWaitFlags = connection->waitFlags; - connection->waitFlags = WL_SOCKET_CLOSED; - WaitEventSet *waitEventSet = - CreateWaitEventSet(CurrentMemoryContext, eventSetSize); - - AddSessionToWaitEventSet(session, waitEventSet, true); - - /* always good to wait for postmaster death */ - CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, - NULL); - - int eventCount = WaitEventSetWait(waitEventSet, timeout, events, - eventSetSize, WAIT_EVENT_CLIENT_READ); - ProcessWaitEventsForSocketClosed(events, eventCount); - - /* we can at most receive a single event, which is WL_SOCKET_CLOSED */ - Assert(eventCount <= 1); - - FreeWaitEventSet(waitEventSet); - pfree(events); - - /* - * We only searched for WL_SOCKET_CLOSED, and we processed the - * event already. Now, set back to the original flags. - */ - UpdateConnectionWaitFlags(session, originalWaitFlags); - session->latestUnconsumedWaitEvents = 0; - -#endif - - return socketClosed; -} - - /* * ShouldRunTasksSequentially returns true if each of the individual tasks * should be executed one by one. Note that this is different than @@ -2715,9 +2633,12 @@ RunDistributedExecution(DistributedExecution *execution) /* wait for I/O events */ long timeout = NextEventTimeout(execution); - int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events, - execution->eventSetSize, WAIT_EVENT_CLIENT_READ); - ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived); + int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, + execution->events, + execution->eventSetSize, + WAIT_EVENT_CLIENT_READ); + ProcessWaitEvents(execution, execution->events, eventCount, + &cancellationReceived); } if (execution->events != NULL) @@ -2851,7 +2772,8 @@ RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly) } execution->waitEventSet = BuildWaitEventSet(execution->sessionList, socketClosedOnly); - execution->events = palloc0(GetEventSetSize(execution->sessionList) * sizeof(WaitEvent)); + execution->events = palloc0(GetEventSetSize(execution->sessionList) * + sizeof(WaitEvent)); execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; execution->eventSetSize = GetEventSetSize(execution->sessionList); @@ -2937,8 +2859,6 @@ ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount) session->connection->connectionState = MULTI_CONNECTION_LOST; } } - - return ; } @@ -3410,17 +3330,27 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, bool socketClosedOnly = true; RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly); - long timeout = 0;/* don't wait */ + /* + * Before using this connection in the distributed execution, we check + * whether the remote connection is closed/lost. This is common + * when we have a cached connection and remote server restarted + * (due to failover or restart etc.). We do this because we can + * retry connection a single time. + */ + long timeout = 0;/* don't wait */ int eventCount = WaitEventSetWait(workerPool->distributedExecution->waitEventSet, timeout, workerPool->distributedExecution->events, - workerPool->distributedExecution->eventSetSize, WAIT_EVENT_CLIENT_READ); + workerPool->distributedExecution->eventSetSize, + WAIT_EVENT_CLIENT_READ); - ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, eventCount); +#if PG_VERSION_NUM >= PG_VERSION_15 + ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, + eventCount); +#endif workerPool->distributedExecution->waitFlagsChanged = true; - WorkerSession *session = NULL; foreach_ptr(session, workerPool->sessionList) { @@ -5572,7 +5502,8 @@ BuildWaitEventSet(List *sessionList, bool socketClosedOnly) * to the waitEventSet. */ static void -AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, bool socketClosedOnly) +AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, bool + socketClosedOnly) { MultiConnection *connection = session->connection; @@ -5599,13 +5530,15 @@ AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, boo if (socketClosedOnly) { - waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED , sock, - NULL, (void *) session); + waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, + sock, + NULL, (void *) session); } else { - waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags , sock, - NULL, (void *) session); + waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, + sock, + NULL, (void *) session); } session->waitEventSetIndex = waitEventSetIndex;