diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c
index 29578e8a6..e35925fbc 100644
--- a/src/backend/distributed/connection/shared_connection_stats.c
+++ b/src/backend/distributed/connection/shared_connection_stats.c
@@ -337,7 +337,7 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
 		counterIncremented = true;
 	}
-	else if (connectionEntry->connectionCount + 1 >= GetMaxSharedPoolSize())
+	else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize())
 	{
 		/* there is no space left for this connection */
 		counterIncremented = false;
diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out
new file mode 100644
index 000000000..3986af298
--- /dev/null
+++ b/src/test/regress/expected/shared_connection_stats.out
@@ -0,0 +1,316 @@
+CREATE SCHEMA shared_connection_stats;
+SET search_path TO shared_connection_stats;
+-- set the cached connections to zero
+-- and execute a distributed query so that
+-- we end up with zero cached connections afterwards
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- disable deadlock detection and re-trigger 2PC recovery
+-- once more when citus.max_cached_conns_per_worker is zero
+-- so that we can be sure that the connections established for
+-- the maintenance daemon are closed properly.
+-- this is to prevent random failures in the tests (otherwise, we
+-- might see connections established for these operations)
+ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+-- now that the last 2PC recovery is done, we're good to disable it
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1h';
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SET citus.shard_count TO 32;
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE test (a int);
+SELECT create_distributed_table('test', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO test SELECT i FROM generate_series(0,100)i;
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 0
+ 0
+(2 rows)
+
+-- single shard queries require a single connection per node
+BEGIN;
+ SELECT count(*) FROM test WHERE a = 1;
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+ SELECT count(*) FROM test WHERE a = 2;
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 1
+ 1
+(2 rows)
+
+COMMIT;
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 0
+ 0
+(2 rows)
+
+-- the executor is only allowed to establish a single connection per node
+BEGIN;
+ SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
+ SELECT count(*) FROM test;
+ count
+---------------------------------------------------------------------
+ 101
+(1 row)
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 1
+ 1
+(2 rows)
+
+COMMIT;
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 0
+ 0
+(2 rows)
+
+-- sequential mode is allowed to establish a single connection per node
+BEGIN;
+ SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+ SELECT count(*) FROM test;
+ count
+---------------------------------------------------------------------
+ 101
+(1 row)
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 1
+ 1
+(2 rows)
+
+COMMIT;
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 0
+ 0
+(2 rows)
+
+-- now, decrease the shared pool size, and prevent
+-- establishing all the required connections
+ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+BEGIN;
+ SET LOCAL citus.node_connection_timeout TO 1000;
+ SET LOCAL citus.connection_retry_timeout TO 2000;
+ SET LOCAL citus.force_max_query_parallelization TO ON;
+ SELECT count(*) FROM test;
+ERROR: citus.max_shared_pool_size number of connections are already established to the node localhost:xxxxx,so cannot establish any more connections
+HINT: consider increasing citus.max_shared_pool_size or citus.connection_retry_timeout
+COMMIT;
+-- pg_sleep forces almost 1 connection per placement
+-- now, some of the optional connections would be skipped,
+-- and only 5 connections are used per node
+BEGIN;
+ SELECT count(*), pg_sleep(0.1) FROM test;
+ count | pg_sleep
+---------------------------------------------------------------------
+ 101 |
+(1 row)
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 5
+ 5
+(2 rows)
+
+COMMIT;
+SHOW citus.max_shared_pool_size;
+ citus.max_shared_pool_size
+---------------------------------------------------------------------
+ 5
+(1 row)
+
+-- by default, max_shared_pool_size equals max_connections
+ALTER SYSTEM RESET citus.max_shared_pool_size;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+SHOW citus.max_shared_pool_size;
+ citus.max_shared_pool_size
+---------------------------------------------------------------------
+ 100
+(1 row)
+
+SHOW max_connections;
+ max_connections
+---------------------------------------------------------------------
+ 100
+(1 row)
+
+-- now, each node gets 16 connections as we force 1 connection per placement
+BEGIN;
+ SET LOCAL citus.force_max_query_parallelization TO ON;
+ SELECT count(*) FROM test;
+ count
+---------------------------------------------------------------------
+ 101
+(1 row)
+
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+ connection_count_to_node
+---------------------------------------------------------------------
+ 16
+ 16
+(2 rows)
+
+COMMIT;
+-- connection_retry_timeout cannot be smaller than node_connection_timeout
+SET citus.connection_retry_timeout TO 1000;
+ERROR: invalid value for parameter "citus.connection_retry_timeout": 1000
+DETAIL: citus.connection_retry_timeout cannot be smaller than citus.node_connection_timeout.
+-- in case other tests rely on these settings, reset them
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM RESET citus.recover_2pc_interval;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP SCHEMA shared_connection_stats CASCADE;
+NOTICE: drop cascades to table test
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index fba9c3986..be64d7859 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -340,6 +340,12 @@ test: distributed_procedure
 # ---------
 test: multi_deparse_function multi_deparse_procedure
 
+# --------
+# cannot be run in parallel with any other tests as it checks
+# statistics across sessions
+# --------
+test: shared_connection_stats
+
 # ---------
 # test that no tests leaked intermediate results. This should always be last
 # ---------
diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql
new file mode 100644
index 000000000..7213537d2
--- /dev/null
+++ b/src/test/regress/sql/shared_connection_stats.sql
@@ -0,0 +1,184 @@
+CREATE SCHEMA shared_connection_stats;
+SET search_path TO shared_connection_stats;
+
+-- set the cached connections to zero
+-- and execute a distributed query so that
+-- we end up with zero cached connections afterwards
+ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
+SELECT pg_reload_conf();
+
+-- disable deadlock detection and re-trigger 2PC recovery
+-- once more when citus.max_cached_conns_per_worker is zero
+-- so that we can be sure that the connections established for
+-- the maintenance daemon are closed properly.
+-- this is to prevent random failures in the tests (otherwise, we
+-- might see connections established for these operations)
+ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+-- now that the last 2PC recovery is done, we're good to disable it
+ALTER SYSTEM SET citus.recover_2pc_interval TO '1h';
+SELECT pg_reload_conf();
+
+SET citus.shard_count TO 32;
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE test (a int);
+SELECT create_distributed_table('test', 'a');
+INSERT INTO test SELECT i FROM generate_series(0,100)i;
+
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+
+-- single shard queries require a single connection per node
+BEGIN;
+ SELECT count(*) FROM test WHERE a = 1;
+ SELECT count(*) FROM test WHERE a = 2;
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+COMMIT;
+
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+
+-- the executor is only allowed to establish a single connection per node
+BEGIN;
+ SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
+ SELECT count(*) FROM test;
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+COMMIT;
+
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+
+-- sequential mode is allowed to establish a single connection per node
+BEGIN;
+ SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+ SELECT count(*) FROM test;
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+COMMIT;
+
+-- show that no connections are cached
+SELECT
+ connection_count_to_node
+FROM
+ citus_remote_connection_stats()
+WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ORDER BY
+ hostname, port;
+
+-- now, decrease the shared pool size, and prevent
+-- establishing all the required connections
+ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+BEGIN;
+ SET LOCAL citus.node_connection_timeout TO 1000;
+ SET LOCAL citus.connection_retry_timeout TO 2000;
+ SET LOCAL citus.force_max_query_parallelization TO ON;
+ SELECT count(*) FROM test;
+COMMIT;
+
+-- pg_sleep forces almost 1 connection per placement
+-- now, some of the optional connections would be skipped,
+-- and only 5 connections are used per node
+BEGIN;
+ SELECT count(*), pg_sleep(0.1) FROM test;
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+COMMIT;
+
+
+SHOW citus.max_shared_pool_size;
+
+-- by default, max_shared_pool_size equals max_connections
+ALTER SYSTEM RESET citus.max_shared_pool_size;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+
+SHOW citus.max_shared_pool_size;
+SHOW max_connections;
+
+-- now, each node gets 16 connections as we force 1 connection per placement
+BEGIN;
+ SET LOCAL citus.force_max_query_parallelization TO ON;
+ SELECT count(*) FROM test;
+ SELECT
+ connection_count_to_node
+ FROM
+ citus_remote_connection_stats()
+ WHERE
+ port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
+ database_name = 'regression'
+ ORDER BY
+ hostname, port;
+COMMIT;
+
+-- connection_retry_timeout cannot be smaller than node_connection_timeout
+SET citus.connection_retry_timeout TO 1000;
+
+-- in case other tests rely on these settings, reset them
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM RESET citus.recover_2pc_interval;
+ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
+SELECT pg_reload_conf();
+
+DROP SCHEMA shared_connection_stats CASCADE;
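
Note on the one-character change in `TryToIncrementSharedConnectionCounter()`: with the old `connectionCount + 1 >= GetMaxSharedPoolSize()` check, the increment was refused as soon as the counter would reach the maximum, so a pool size of 5 admitted only 4 connections per node. The new `>` check refuses the increment only when it would exceed the maximum, which is exactly what the `citus.max_shared_pool_size TO 5` test above asserts (5 connections per node). Below is a minimal standalone sketch of the two behaviors; `MAX_POOL_SIZE`, `TryIncrementOld`, and `TryIncrementNew` are hypothetical names for illustration, not the Citus implementation:

```c
#include <stdio.h>

/* stand-in for GetMaxSharedPoolSize(); the real value comes from a GUC */
#define MAX_POOL_SIZE 5

/* old check: refuses once count + 1 == max, so the counter caps at max - 1 */
static int
TryIncrementOld(int *count)
{
	if (*count + 1 >= MAX_POOL_SIZE)
	{
		return 0; /* no space left */
	}
	(*count)++;
	return 1;
}

/* new check: refuses only when count + 1 > max, so the counter caps at max */
static int
TryIncrementNew(int *count)
{
	if (*count + 1 > MAX_POOL_SIZE)
	{
		return 0; /* no space left */
	}
	(*count)++;
	return 1;
}

int
main(void)
{
	int oldCount = 0;
	int newCount = 0;

	/* increment each counter until its check refuses */
	while (TryIncrementOld(&oldCount))
	{
	}
	while (TryIncrementNew(&newCount))
	{
	}

	/* prints "old: 4, new: 5" */
	printf("old: %d, new: %d\n", oldCount, newCount);
	return 0;
}
```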