From 65816467f27bb83de1842f7ac9d379d9864fa486 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 29 Aug 2022 09:49:08 +0200 Subject: [PATCH] Check WL_SOCKET_CLOSED --- .../connection/connection_management.c | 43 ++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 577378803..685e0e435 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -66,7 +66,7 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int static void ResetConnection(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); - +static bool RemoteConnectionClosed(MultiConnection *connection); /* types for async connection management */ enum MultiConnectionPhase @@ -492,6 +492,11 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) continue; } + if (RemoteConnectionClosed(connection)) + { + continue; + } + return connection; } @@ -526,6 +531,39 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) } +static bool +RemoteConnectionClosed(MultiConnection *connection) +{ + bool socketClosed = false; +#if PG_VERSION_NUM >= PG_VERSION_15 + + WaitEventSet *waitEventSet = + CreateWaitEventSet(CurrentMemoryContext, 1); + int sock = PQsocket(connection->pgConn); + WaitEvent *events = palloc0(sizeof(WaitEvent)); + + /* TODO: process return value of CitusAddWaitEventSetToSet */ + CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock, + NULL, (void *) connection); + + long timeout = 0; /* do not wait at all */ + int eventCount = WaitEventSetWait(waitEventSet, timeout, events, + 1, WAIT_EVENT_CLIENT_READ); + if (eventCount > 0) + { + Assert(eventCount == 1); + + WaitEvent *event = &events[0]; + socketClosed = (event->events & WL_SOCKET_CLOSED); + } + + FreeWaitEventSet(waitEventSet); +#endif + + return socketClosed; +} + + /* * ErrorIfMultipleMetadataConnectionExists throws an error if the * input connection dlist contains more than one metadata connections. @@ -1478,7 +1516,8 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection connection->requiresReplication || (MaxCachedConnectionLifetime >= 0 && MillisecondsToTimeout(connection->connectionEstablishmentStart, - MaxCachedConnectionLifetime) <= 0); + MaxCachedConnectionLifetime) <= 0) || + RemoteConnectionClosed(connection); }