From 754d6e201e456a9d5462531c684e1c621a20c760 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 18 Nov 2022 14:00:10 +0100 Subject: [PATCH] w --- .../distributed/executor/adaptive_executor.c | 92 +++++++++++++------ 1 file changed, 62 insertions(+), 30 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index d88b4beda..88702cdae 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -243,6 +243,7 @@ typedef struct DistributedExecution * and "man 3 siglongjmp" for more context. */ WaitEventSet *waitEventSet; + WaitEvent *events; /* * The number of connections we aim to open per worker. @@ -680,9 +681,9 @@ static void CheckConnectionTimeout(WorkerPool *workerPool); static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); -static WaitEventSet * BuildWaitEventSet(List *sessionList); +static WaitEventSet * BuildWaitEventSet(List *sessionList, bool s); static void AddSessionToWaitEventSet(WorkerSession *session, - WaitEventSet *waitEventSet); + WaitEventSet *waitEventSet, bool s); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); @@ -720,12 +721,12 @@ static int GetEventSetSize(List *sessionList); static bool ProcessSessionsWithFailedWaitEventSetOperations( DistributedExecution *execution); static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); -static int RebuildWaitEventSet(DistributedExecution *execution); +static int RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly); static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, bool *cancellationReceived); #if PG_VERSION_NUM >= PG_VERSION_15 -static bool ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount); +static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount); #endif static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime); @@ -2492,6 +2493,7 @@ RemoteSocketClosedForNewSession(WorkerSession *session) { bool socketClosed = false; + return false; #if PG_VERSION_NUM >= PG_VERSION_15 if (!WaitEventSetCanReportClosed()) @@ -2516,7 +2518,7 @@ RemoteSocketClosedForNewSession(WorkerSession *session) WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); - AddSessionToWaitEventSet(session, waitEventSet); + AddSessionToWaitEventSet(session, waitEventSet, true); /* always good to wait for postmaster death */ CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, @@ -2524,8 +2526,7 @@ RemoteSocketClosedForNewSession(WorkerSession *session) int eventCount = WaitEventSetWait(waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); - socketClosed = - ProcessWaitEventsForSocketClosed(events, eventCount); + ProcessWaitEventsForSocketClosed(events, eventCount); /* we can at most receive a single event, which is WL_SOCKET_CLOSED */ Assert(eventCount <= 1); @@ -2632,8 +2633,6 @@ SequentialRunDistributedExecution(DistributedExecution *execution) void RunDistributedExecution(DistributedExecution *execution) { - WaitEvent *events = NULL; - AssignTasksToConnectionsOrWorkerPool(execution); PG_TRY(); @@ -2690,17 +2689,16 @@ RunDistributedExecution(DistributedExecution *execution) } else if (execution->rebuildWaitEventSet) { - if (events != NULL) + if (execution->events != NULL) { /* * The execution might take a while, so explicitly free at this point * because we don't need anymore. */ - pfree(events); - events = NULL; + pfree(execution->events); + execution->events = NULL; } - eventSetSize = RebuildWaitEventSet(execution); - events = palloc0(eventSetSize * sizeof(WaitEvent)); + eventSetSize = RebuildWaitEventSet(execution, false); skipWaitEvents = ProcessSessionsWithFailedWaitEventSetOperations(execution); @@ -2727,14 +2725,14 @@ RunDistributedExecution(DistributedExecution *execution) /* wait for I/O events */ long timeout = NextEventTimeout(execution); - int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, + int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events, eventSetSize, WAIT_EVENT_CLIENT_READ); - ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); + ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived); } - if (events != NULL) + if (execution->events != NULL) { - pfree(events); + pfree(execution->events); } if (execution->waitEventSet != NULL) @@ -2844,7 +2842,7 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution) * It returns the new event set size. */ static int -RebuildWaitEventSet(DistributedExecution *execution) +RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly) { if (execution->waitEventSet != NULL) { @@ -2852,7 +2850,8 @@ RebuildWaitEventSet(DistributedExecution *execution) execution->waitEventSet = NULL; } - execution->waitEventSet = BuildWaitEventSet(execution->sessionList); + execution->waitEventSet = BuildWaitEventSet(execution->sessionList, socketClosedOnly); + execution->events = palloc0(GetEventSetSize(execution->sessionList) * sizeof(WaitEvent)); execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; @@ -2916,7 +2915,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC * If WL_SOCKET_CLOSED is found, the function sets the underlying connection's * state as MULTI_CONNECTION_LOST. */ -static bool +static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount) { int eventIndex = 0; @@ -2936,11 +2935,11 @@ ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount) if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) { - return true; + session->connection->connectionState = MULTI_CONNECTION_LOST; } } - return false; + return ; } @@ -3024,7 +3023,6 @@ ManageWorkerPool(WorkerPool *workerPool) } INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); - execution->rebuildWaitEventSet = true; } @@ -3347,6 +3345,7 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount, workerPool->nodeName, workerPool->nodePort))); + List *newConnectionsList = NIL; for (int connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) { /* experimental: just to see the perf benefits of caching connections */ @@ -3406,6 +3405,22 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, /* create a session for the connection */ WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection); + newConnectionsList = lappend(newConnectionsList, session); + } + + bool socketClosedOnly = true; + int size = RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly); + + long timeout = 0;/* don't wait */ + + int eventCount = WaitEventSetWait(workerPool->distributedExecution->waitEventSet, timeout, workerPool->distributedExecution->events, size, WAIT_EVENT_CLIENT_READ); + ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, eventCount); + + workerPool->distributedExecution->waitFlagsChanged = true; + + WorkerSession *session = NULL; + foreach_ptr(session, workerPool->sessionList) + { /* immediately run the state machine to handle potential failure */ ConnectionStateMachine(session); } @@ -3666,6 +3681,13 @@ ConnectionStateMachine(WorkerSession *session) MultiConnection *connection = session->connection; MultiConnectionState currentState; +#if PG_VERSION_NUM >= PG_VERSION_15 + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0) + { + connection->connectionState = MULTI_CONNECTION_LOST; + } +#endif + do { currentState = connection->connectionState; @@ -5518,7 +5540,7 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution) * write-ready. */ static WaitEventSet * -BuildWaitEventSet(List *sessionList) +BuildWaitEventSet(List *sessionList, bool socketClosedOnly) { /* additional 2 is for postmaster and latch */ int eventSetSize = GetEventSetSize(sessionList); @@ -5529,7 +5551,7 @@ BuildWaitEventSet(List *sessionList) WorkerSession *session = NULL; foreach_ptr(session, sessionList) { - AddSessionToWaitEventSet(session, waitEventSet); + AddSessionToWaitEventSet(session, waitEventSet, socketClosedOnly); } CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, @@ -5547,7 +5569,7 @@ BuildWaitEventSet(List *sessionList) * to the waitEventSet. */ static void -AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet) +AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, bool socketClosedOnly) { MultiConnection *connection = session->connection; @@ -5557,7 +5579,7 @@ AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet) return; } - if (connection->waitFlags == 0) + if (connection->waitFlags == 0 && !socketClosedOnly) { /* not currently waiting for this connection */ return; @@ -5570,9 +5592,19 @@ AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet) return; } - int waitEventSetIndex = - CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock, + int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED; + + if (socketClosedOnly) + { + waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED , sock, NULL, (void *) session); + } + else + { + waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags , sock, + NULL, (void *) session); + } + session->waitEventSetIndex = waitEventSetIndex; /*