From d377dce85249f4743a60076c8f9e198b58fd555b Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 29 Aug 2022 09:49:08 +0200 Subject: [PATCH] Check WL_SOCKET_CLOSED --- .../connection/connection_management.c | 152 ++++++++++++++++-- 1 file changed, 135 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 577378803..97210e496 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -57,8 +57,10 @@ static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static void StartConnectionEstablishment(MultiConnection *connectionn, ConnectionHashKey *key); -static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); -static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections); +static MultiConnection * FindAvailableConnection(List *connections, uint32 flags); +static List * EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags); +static bool RemoteSocketClosed(MultiConnection *connection); +static void ErrorIfMultipleMetadataConnectionExists(List *connections); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int @@ -67,7 +69,6 @@ static void ResetConnection(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); - /* types for async connection management */ enum MultiConnectionPhase { @@ -334,8 +335,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, /* if desired, check whether there's a usable connection */ if (!(flags & FORCE_NEW_CONNECTION)) { + List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections, + flags); + /* check connection cache for a connection that's not already in use */ - MultiConnection *connection = FindAvailableConnection(entry->connections, flags); + MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); if (connection) { return connection; @@ -427,16 +431,13 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * If no connection is available, FindAvailableConnection returns NULL. */ static MultiConnection * -FindAvailableConnection(dlist_head *connections, uint32 flags) +FindAvailableConnection(List *connections, uint32 flags) { List *metadataConnectionCandidateList = NIL; - dlist_iter iter; - dlist_foreach(iter, connections) + MultiConnection *connection = NULL; + foreach_ptr(connection, connections) { - MultiConnection *connection = - dlist_container(MultiConnection, connectionNode, iter.cur); - if (flags & OUTSIDE_TRANSACTION) { /* don't return connections that are used in transactions */ @@ -527,19 +528,133 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) /* - * ErrorIfMultipleMetadataConnectionExists throws an error if the - * input connection dlist contains more than one metadata connections. + * EnsureAndGetHealthyConnections is a helper function that goes over the + * input connections, and: + * - Errors for the connections that are not healthy but still part of a + * remote transaction + * - Skips connections that are not healthy, marks them to be closed + * at the end of the transaction such that they do not linger around + * + * And, finally returns the list (unless already errored out) of connections + * that can be reused. */ -static void -ErrorIfMultipleMetadataConnectionExists(dlist_head *connections) +static List * +EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags) { - bool foundMetadataConnection = false; + List *healthyConnections = NIL; + dlist_iter iter; dlist_foreach(iter, connections) { MultiConnection *connection = dlist_container(MultiConnection, connectionNode, iter.cur); + bool healthyConnection = true; + if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 || + PQstatus(connection->pgConn) != CONNECTION_OK || + connection->connectionState != MULTI_CONNECTION_CONNECTED || + RemoteSocketClosed(connection)) + { + /* connection is not healthy */ + healthyConnection = false; + } + + RemoteTransaction *transaction = &connection->remoteTransaction; + bool inRemoteTrasaction = + transaction->transactionState != REMOTE_TRANS_NOT_STARTED; + + if (inRemoteTrasaction) + { + PGTransactionStatusType status = PQtransactionStatus(connection->pgConn); + + /* if the connection is in a bad state, so is the transaction's state */ + if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN) + { + transaction->transactionFailed = true; + } + } + + if (healthyConnection) + { + healthyConnections = lappend(healthyConnections, connection); + } + else if ((inRemoteTrasaction && !(flags & OUTSIDE_TRANSACTION)) && + (transaction->transactionCritical || + !dlist_is_empty(&connection->referencedPlacements))) + { + ReportConnectionError(connection, ERROR); + + /* + * If a connection is executing a critical transaction or accessed any + * placements, we should not continue the execution. + */ + ereport(WARNING, (errmsg("failure on connection marked as essential: %s:%d", + connection->hostname, connection->port))); + } + else + { + /* + * If a connection is not usable and we do not need to throw an error, + * close at the end of the transaction. + */ + connection->forceCloseAtTransactionEnd = true; + } + } + + return healthyConnections; +} + + +/* + * RemoteSocketClosed returns true if the remote socket is closed. + * + * Note that we rely on WL_SOCKET_CLOSED, which is only available + * for PG 15+ and for kernels that support the feature. + */ +static bool +RemoteSocketClosed(MultiConnection *connection) +{ + bool socketClosed = false; +#if PG_VERSION_NUM >= PG_VERSION_15 + + WaitEventSet *waitEventSet = + CreateWaitEventSet(CurrentMemoryContext, 1); + int sock = PQsocket(connection->pgConn); + WaitEvent *events = palloc0(sizeof(WaitEvent)); + + /* TODO: process return value of CitusAddWaitEventSetToSet */ + CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock, + NULL, (void *) connection); + + long timeout = 0; /* do not wait at all */ + int eventCount = WaitEventSetWait(waitEventSet, timeout, events, + 1, WAIT_EVENT_CLIENT_READ); + if (eventCount > 0) + { + Assert(eventCount == 1); + + WaitEvent *event = &events[0]; + socketClosed = (event->events & WL_SOCKET_CLOSED); + } + + FreeWaitEventSet(waitEventSet); +#endif + + return socketClosed; +} + + +/* + * ErrorIfMultipleMetadataConnectionExists throws an error if the + * input connection dlist contains more than one metadata connections. + */ +static void +ErrorIfMultipleMetadataConnectionExists(List *connections) +{ + bool foundMetadataConnection = false; + MultiConnection *connection = NULL; + foreach_ptr(connection, connections) + { if (connection->useForMetadataOperations) { if (foundMetadataConnection) @@ -617,7 +732,9 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName, } int flags = 0; - MultiConnection *connection = FindAvailableConnection(entry->connections, flags); + List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections, flags); + + MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); return connection; } @@ -1478,7 +1595,8 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection connection->requiresReplication || (MaxCachedConnectionLifetime >= 0 && MillisecondsToTimeout(connection->connectionEstablishmentStart, - MaxCachedConnectionLifetime) <= 0); + MaxCachedConnectionLifetime) <= 0) || + RemoteSocketClosed(connection); }