mirror of https://github.com/citusdata/citus.git
Add GUC to set maximum connection lifetime
parent
3b12556401
commit
1646fca445
|
@ -32,6 +32,7 @@
|
||||||
#include "distributed/shared_connection_stats.h"
|
#include "distributed/shared_connection_stats.h"
|
||||||
#include "distributed/cancel_utils.h"
|
#include "distributed/cancel_utils.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/time_constants.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_log_messages.h"
|
#include "distributed/worker_log_messages.h"
|
||||||
#include "mb/pg_wchar.h"
|
#include "mb/pg_wchar.h"
|
||||||
|
@ -43,6 +44,7 @@
|
||||||
|
|
||||||
int NodeConnectionTimeout = 30000;
|
int NodeConnectionTimeout = 30000;
|
||||||
int MaxCachedConnectionsPerWorker = 1;
|
int MaxCachedConnectionsPerWorker = 1;
|
||||||
|
int MaxCachedConnectionLifetime = 10 * MS_PER_MINUTE;
|
||||||
|
|
||||||
HTAB *ConnectionHash = NULL;
|
HTAB *ConnectionHash = NULL;
|
||||||
HTAB *ConnParamsHash = NULL;
|
HTAB *ConnParamsHash = NULL;
|
||||||
|
@ -1288,6 +1290,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
* - Connection is forced to close at the end of transaction
|
* - Connection is forced to close at the end of transaction
|
||||||
* - Connection is not in OK state
|
* - Connection is not in OK state
|
||||||
* - A transaction is still in progress (usually because we are cancelling a distributed transaction)
|
* - A transaction is still in progress (usually because we are cancelling a distributed transaction)
|
||||||
|
* - A connection reached its maximum lifetime
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount)
|
ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount)
|
||||||
|
@ -1303,7 +1306,10 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
|
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
|
||||||
connection->forceCloseAtTransactionEnd ||
|
connection->forceCloseAtTransactionEnd ||
|
||||||
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
||||||
!RemoteTransactionIdle(connection);
|
!RemoteTransactionIdle(connection) ||
|
||||||
|
(MaxCachedConnectionLifetime >= 0 &&
|
||||||
|
TimestampDifferenceExceeds(connection->connectionStart, GetCurrentTimestamp(),
|
||||||
|
MaxCachedConnectionLifetime));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1238,6 +1238,16 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_STANDARD,
|
GUC_STANDARD,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.max_cached_connection_lifetime",
|
||||||
|
gettext_noop("Sets the maximum lifetime of cached connections to other nodes."),
|
||||||
|
NULL,
|
||||||
|
&MaxCachedConnectionLifetime,
|
||||||
|
10 * MS_PER_MINUTE, -1, INT_MAX,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_UNIT_MS | GUC_STANDARD,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable(
|
DefineCustomIntVariable(
|
||||||
"citus.repartition_join_bucket_count_per_node",
|
"citus.repartition_join_bucket_count_per_node",
|
||||||
gettext_noop("Sets the bucket size for repartition joins per node"),
|
gettext_noop("Sets the bucket size for repartition joins per node"),
|
||||||
|
|
|
@ -200,6 +200,9 @@ extern int NodeConnectionTimeout;
|
||||||
/* maximum number of connections to cache per worker per session */
|
/* maximum number of connections to cache per worker per session */
|
||||||
extern int MaxCachedConnectionsPerWorker;
|
extern int MaxCachedConnectionsPerWorker;
|
||||||
|
|
||||||
|
/* maximum lifetime of connections in miliseconds */
|
||||||
|
extern int MaxCachedConnectionLifetime;
|
||||||
|
|
||||||
/* parameters used for outbound connections */
|
/* parameters used for outbound connections */
|
||||||
extern char *NodeConninfo;
|
extern char *NodeConninfo;
|
||||||
|
|
||||||
|
|
|
@ -807,6 +807,7 @@ SELECT pg_sleep(0.1);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- cache connections to the nodes
|
-- cache connections to the nodes
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*) FROM test;
|
||||||
count
|
count
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -823,6 +824,28 @@ BEGIN;
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
-- should close all connections
|
||||||
|
SET citus.max_cached_connection_lifetime TO '0s';
|
||||||
|
SELECT count(*) FROM test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
155
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- show that no connections are cached
|
||||||
|
SELECT
|
||||||
|
connection_count_to_node
|
||||||
|
FROM
|
||||||
|
citus_remote_connection_stats()
|
||||||
|
WHERE
|
||||||
|
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
|
||||||
|
database_name = 'regression'
|
||||||
|
ORDER BY
|
||||||
|
hostname, port;
|
||||||
|
connection_count_to_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
-- in case other tests relies on these setting, reset them
|
-- in case other tests relies on these setting, reset them
|
||||||
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
||||||
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
||||||
|
|
|
@ -511,6 +511,7 @@ SELECT pg_reload_conf();
|
||||||
SELECT pg_sleep(0.1);
|
SELECT pg_sleep(0.1);
|
||||||
|
|
||||||
-- cache connections to the nodes
|
-- cache connections to the nodes
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*) FROM test;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- we should not have any reserved connections
|
-- we should not have any reserved connections
|
||||||
|
@ -519,6 +520,21 @@ BEGIN;
|
||||||
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
|
SELECT * FROM citus_reserved_connection_stats() ORDER BY 1,2;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
-- should close all connections
|
||||||
|
SET citus.max_cached_connection_lifetime TO '0s';
|
||||||
|
SELECT count(*) FROM test;
|
||||||
|
|
||||||
|
-- show that no connections are cached
|
||||||
|
SELECT
|
||||||
|
connection_count_to_node
|
||||||
|
FROM
|
||||||
|
citus_remote_connection_stats()
|
||||||
|
WHERE
|
||||||
|
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
|
||||||
|
database_name = 'regression'
|
||||||
|
ORDER BY
|
||||||
|
hostname, port;
|
||||||
|
|
||||||
-- in case other tests relies on these setting, reset them
|
-- in case other tests relies on these setting, reset them
|
||||||
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
||||||
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
||||||
|
|
Loading…
Reference in New Issue