From c281a819e56feadbd94132cc7d71ffe52e2bb3db Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 30 Aug 2022 09:19:49 +0200 Subject: [PATCH] Error early --- .../connection/connection_management.c | 126 ++++++++++++++---- 1 file changed, 102 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 685e0e435..2696ce94d 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -57,8 +57,10 @@ static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static void StartConnectionEstablishment(MultiConnection *connectionn, ConnectionHashKey *key); -static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); -static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections); +static MultiConnection * FindAvailableConnection(List *connections, uint32 flags); +static List * EnsureAndGetHealthyConnections(dlist_head *connections); +static bool RemoteSocketClosed(MultiConnection *connection); +static void ErrorIfMultipleMetadataConnectionExists(List *connections); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int @@ -66,7 +68,6 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int static void ResetConnection(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); -static bool RemoteConnectionClosed(MultiConnection *connection); /* types for async connection management */ enum MultiConnectionPhase @@ -334,8 +335,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, /* if desired, check whether there's a usable connection */ if (!(flags & FORCE_NEW_CONNECTION)) { + List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections); + /* check connection cache for a connection that's not already in use */ - MultiConnection *connection = FindAvailableConnection(entry->connections, flags); + MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); if (connection) { return connection; @@ -427,16 +430,13 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * If no connection is available, FindAvailableConnection returns NULL. */ static MultiConnection * -FindAvailableConnection(dlist_head *connections, uint32 flags) +FindAvailableConnection(List *connections, uint32 flags) { List *metadataConnectionCandidateList = NIL; - dlist_iter iter; - dlist_foreach(iter, connections) + MultiConnection *connection = NULL; + foreach_ptr(connection, connections) { - MultiConnection *connection = - dlist_container(MultiConnection, connectionNode, iter.cur); - if (flags & OUTSIDE_TRANSACTION) { /* don't return connections that are used in transactions */ @@ -492,11 +492,6 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) continue; } - if (RemoteConnectionClosed(connection)) - { - continue; - } - return connection; } @@ -531,8 +526,92 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) } +/* + * EnsureAndGetHealthyConnections is a helper function that goes over the + * input connections, and: + * - Errors for the connections that are not healthy but still part of a + * remote transaction + * - Skips connections that are not healthy, marks them to be closed + * at the end of the transaction such that they do not linger around + * + * And, finally returns the list (unless already errored out) of connections + * that can be reused. + */ +static List * +EnsureAndGetHealthyConnections(dlist_head *connections) +{ + List *healthyConnections = NIL; + + dlist_iter iter; + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + bool healthyConnection = true; + + if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 || + PQstatus(connection->pgConn) != CONNECTION_OK || + connection->connectionState != MULTI_CONNECTION_CONNECTED || + RemoteSocketClosed(connection)) + { + /* connection is not healthy */ + healthyConnection = false; + } + + RemoteTransaction *transaction = &connection->remoteTransaction; + bool inRemoteTrasaction = + transaction->transactionState != REMOTE_TRANS_NOT_STARTED; + + if (inRemoteTrasaction) + { + PGTransactionStatusType status = PQtransactionStatus(connection->pgConn); + + /* if the connection is in a bad state, so is the transaction's state */ + if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN) + { + transaction->transactionFailed = true; + } + } + + if (healthyConnection) + { + healthyConnections = lappend(healthyConnections, connection); + } + else if (inRemoteTrasaction && + (transaction->transactionCritical || + !dlist_is_empty(&connection->referencedPlacements))) + { + ReportConnectionError(connection, ERROR); + + /* + * If a connection is executing a critical transaction or accessed any + * placements, we should not continue the execution. + */ + ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d", + connection->hostname, connection->port))); + } + else + { + /* + * If a connection is not usable and we do not need to throw an error, + * close at the end of the transaction. + */ + connection->forceCloseAtTransactionEnd = true; + } + } + + return healthyConnections; +} + + +/* + * RemoteSocketClosed returns true if the remote socket is closed. + * + * Note that we rely on WL_SOCKET_CLOSED, which is only available + * for PG 15+ and for kernels that support the feature. + */ static bool -RemoteConnectionClosed(MultiConnection *connection) +RemoteSocketClosed(MultiConnection *connection) { bool socketClosed = false; #if PG_VERSION_NUM >= PG_VERSION_15 @@ -569,15 +648,12 @@ RemoteConnectionClosed(MultiConnection *connection) * input connection dlist contains more than one metadata connections. */ static void -ErrorIfMultipleMetadataConnectionExists(dlist_head *connections) +ErrorIfMultipleMetadataConnectionExists(List *connections) { bool foundMetadataConnection = false; - dlist_iter iter; - dlist_foreach(iter, connections) + MultiConnection *connection = NULL; + foreach_ptr(connection, connections) { - MultiConnection *connection = - dlist_container(MultiConnection, connectionNode, iter.cur); - if (connection->useForMetadataOperations) { if (foundMetadataConnection) @@ -655,7 +731,9 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName, } int flags = 0; - MultiConnection *connection = FindAvailableConnection(entry->connections, flags); + List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections); + + MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); return connection; } @@ -1517,7 +1595,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection (MaxCachedConnectionLifetime >= 0 && MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0) || - RemoteConnectionClosed(connection); + RemoteSocketClosed(connection); }