diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 9679c064c..aaabbfda8 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -44,6 +44,9 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); +static bool ShouldShutdownConnection(MultiConnection *connection, const int + cachedConnectionCount); +static void ResetConnection(MultiConnection *connection); static void DefaultCitusNoticeProcessor(void *arg, const char *message); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static bool RemoteTransactionIdle(MultiConnection *connection); @@ -136,7 +139,7 @@ InvalidateConnParamsHashEntries(void) /* - * Perform connection management activity after the end of a transaction. Both + * AfterXactConnectionHandling performs connection management activity after the end of a transaction. Both * COMMIT and ABORT paths are handled here. * * This is called by Citus' global transaction callback. @@ -1039,7 +1042,7 @@ FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry) /* - * Close all remote connections if necessary anymore (i.e. not session + * AfterXactHostConnectionHandling closes all remote connections if not necessary anymore (i.e. not session * lifetime), or if in a failed state. */ static void @@ -1047,18 +1050,6 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) { dlist_mutable_iter iter; int cachedConnectionCount = 0; - bool isCitusInitiatedBackend = false; - - /* - * When we are in a backend that was created to service an internal connection - * from the coordinator or another worker, we disable connection caching to avoid - * escalating the number of cached connections. We can recognize such backends - * from their application name. - */ - if (application_name != NULL && strcmp(application_name, CITUS_APPLICATION_NAME) == 0) - { - isCitusInitiatedBackend = true; - } dlist_foreach_modify(iter, entry->connections) { @@ -1066,8 +1057,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) dlist_container(MultiConnection, connectionNode, iter.cur); /* - * To avoid code leaking connections we warn if connections are - * still claimed exclusively. We can only do so if the transaction + * To avoid leaking connections we warn if connections are + * still claimed exclusively. We can only do so if the transaction is * committed, as it's normal that code didn't have chance to clean * up after errors. */ @@ -1077,14 +1068,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) (errmsg("connection claimed exclusively at transaction commit"))); } - /* - * Preserve session lifespan connections if they are still healthy. - */ - if (isCitusInitiatedBackend || - cachedConnectionCount >= MaxCachedConnectionsPerWorker || - connection->forceCloseAtTransactionEnd || - PQstatus(connection->pgConn) != CONNECTION_OK || - !RemoteTransactionIdle(connection)) + + if (ShouldShutdownConnection(connection, cachedConnectionCount)) { ShutdownConnection(connection); @@ -1095,14 +1080,10 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } else { - /* reset per-transaction state */ - ResetRemoteTransaction(connection); - ResetShardPlacementAssociation(connection); - - /* reset copy state */ - connection->copyBytesWrittenSinceLastFlush = 0; - - UnclaimConnection(connection); + /* + * reset healthy session lifespan connections. + */ + ResetConnection(connection); cachedConnectionCount++; } @@ -1110,6 +1091,56 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } +/* + * ShouldShutdownConnection returns true if either one of the followings is true: + * - The connection is citus initiated. + * - Current cached connections is already at MaxCachedConnectionPerWorker + * - Connection is forced to close at the end of transaction + * - Connection is not in OK state + * - A transaction is still in progress (usually because we are cancelling a distributed transaction) + */ +static bool +ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount) +{ + bool isCitusInitiatedBackend = false; + + /* + * When we are in a backend that was created to serve an internal connection + * from the coordinator or another worker, we disable connection caching to avoid + * escalating the number of cached connections. We can recognize such backends + * from their application name. + */ + if (application_name != NULL && strcmp(application_name, CITUS_APPLICATION_NAME) == 0) + { + isCitusInitiatedBackend = true; + } + + return isCitusInitiatedBackend || + cachedConnectionCount >= MaxCachedConnectionsPerWorker || + connection->forceCloseAtTransactionEnd || + PQstatus(connection->pgConn) != CONNECTION_OK || + !RemoteTransactionIdle(connection); +} + + +/* + * ResetConnection preserves the given connection for later usage by + * resetting its states. + */ +static void +ResetConnection(MultiConnection *connection) +{ + /* reset per-transaction state */ + ResetRemoteTransaction(connection); + ResetShardPlacementAssociation(connection); + + /* reset copy state */ + connection->copyBytesWrittenSinceLastFlush = 0; + + UnclaimConnection(connection); +} + + /* * RemoteTransactionIdle function returns true if we manually * set flag on run_commands_on_session_level_connection_to_node to true to