diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index f66c3b653..7e5b8a54f 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -32,6 +32,7 @@ #include "distributed/shared_connection_stats.h" #include "distributed/cancel_utils.h" #include "distributed/remote_commands.h" +#include "distributed/time_constants.h" #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" #include "mb/pg_wchar.h" @@ -43,6 +44,7 @@ int NodeConnectionTimeout = 30000; int MaxCachedConnectionsPerWorker = 1; +int MaxCachedConnectionLifetime = 10 * MS_PER_MINUTE; HTAB *ConnectionHash = NULL; HTAB *ConnParamsHash = NULL; @@ -1288,6 +1290,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) * - Connection is forced to close at the end of transaction * - Connection is not in OK state * - A transaction is still in progress (usually because we are cancelling a distributed transaction) + * - A connection reached its maximum lifetime */ static bool ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount) @@ -1303,7 +1306,10 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection cachedConnectionCount >= MaxCachedConnectionsPerWorker || connection->forceCloseAtTransactionEnd || PQstatus(connection->pgConn) != CONNECTION_OK || - !RemoteTransactionIdle(connection); + !RemoteTransactionIdle(connection) || + (MaxCachedConnectionLifetime >= 0 && + TimestampDifferenceExceeds(connection->connectionStart, GetCurrentTimestamp(), + MaxCachedConnectionLifetime)); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e5de6c689..efa467801 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1238,6 +1238,16 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_cached_connection_lifetime", + gettext_noop("Sets the maximum lifetime of cached connections to other nodes."), + NULL, + &MaxCachedConnectionLifetime, + 10 * MS_PER_MINUTE, -1, INT_MAX, + PGC_USERSET, + GUC_UNIT_MS | GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.repartition_join_bucket_count_per_node", gettext_noop("Sets the bucket size for repartition joins per node"), diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 29408d78c..710292dc2 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -200,6 +200,9 @@ extern int NodeConnectionTimeout; /* maximum number of connections to cache per worker per session */ extern int MaxCachedConnectionsPerWorker; +/* maximum lifetime of connections in miliseconds */ +extern int MaxCachedConnectionLifetime; + /* parameters used for outbound connections */ extern char *NodeConninfo; diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 7f9c84da5..0ce22548f 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -807,6 +807,7 @@ SELECT pg_sleep(0.1); (1 row) -- cache connections to the nodes +SET citus.force_max_query_parallelization TO ON; SELECT count(*) FROM test; count --------------------------------------------------------------------- @@ -823,6 +824,28 @@ BEGIN; (0 rows) COMMIT; +-- should close all connections +SET citus.max_cached_connection_lifetime TO '0s'; +SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 155 +(1 row) + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + connection_count_to_node +--------------------------------------------------------------------- +(0 rows) + -- in case other tests relies on these setting, reset them ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.recover_2pc_interval; diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql index dbb988315..7c653e788 100644 --- a/src/test/regress/sql/shared_connection_stats.sql +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -511,6 +511,7 @@ SELECT pg_reload_conf(); SELECT pg_sleep(0.1); -- cache connections to the nodes +SET citus.force_max_query_parallelization TO ON; SELECT count(*) FROM test; BEGIN; -- we should not have any reserved connections @@ -519,6 +520,21 @@ BEGIN; SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2; COMMIT; +-- should close all connections +SET citus.max_cached_connection_lifetime TO '0s'; +SELECT count(*) FROM test; + +-- show that no connections are cached +SELECT + connection_count_to_node +FROM + citus_remote_connection_stats() +WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' +ORDER BY + hostname, port; + -- in case other tests relies on these setting, reset them ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.recover_2pc_interval;