-- Regression transcript for Citus shared connection accounting: each statement
-- below is followed by the psql output it is expected to produce.
-- NOTE(review): this appears to be a pg_regress-style expected-output file; if
-- so, statements and comments are echoed from a companion input script and any
-- text change here (including this header) must be mirrored there -- confirm.
CREATE SCHEMA shared_connection_stats;
SET search_path TO shared_connection_stats;
-- set the cached connections to zero
-- and execute a distributed query so that
-- we end up with zero cached connections afterwards
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

-- disable deadlock detection and re-trigger 2PC recovery
-- once more when citus.max_cached_conns_per_worker is zero
-- so that we can be sure that the connections established for
-- the maintenance daemon are closed properly.
-- this is to prevent random failures in the tests (otherwise, we
-- might see connections established for these operations)
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

SELECT pg_sleep(0.1);
 pg_sleep
---------------------------------------------------------------------
 
(1 row)

-- now that last 2PC recovery is done, we're good to disable it
ALTER SYSTEM SET citus.recover_2pc_interval TO '1h';
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test (a int);
SELECT create_distributed_table('test', 'a');
 create_distributed_table
---------------------------------------------------------------------
 
(1 row)

INSERT INTO test SELECT i FROM generate_series(0,100)i;
-- show that no connections are cached
SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        0
                        0
(2 rows)

-- single shard queries require single connection per node
BEGIN;
SELECT count(*) FROM test WHERE a = 1;
 count
---------------------------------------------------------------------
     1
(1 row)

SELECT count(*) FROM test WHERE a = 2;
 count
---------------------------------------------------------------------
     1
(1 row)

SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        1
                        1
(2 rows)

COMMIT;
-- show that no connections are cached
SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        0
                        0
(2 rows)

-- executor is only allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.max_adaptive_executor_pool_size TO 1;
SELECT count(*) FROM test;
 count
---------------------------------------------------------------------
   101
(1 row)

SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        1
                        1
(2 rows)

COMMIT;
-- show that no connections are cached
SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        0
                        0
(2 rows)

-- sequential mode is allowed to establish a single connection per node
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM test;
 count
---------------------------------------------------------------------
   101
(1 row)

SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        1
                        1
(2 rows)

COMMIT;
-- show that no connections are cached
SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        0
                        0
(2 rows)

-- now, decrease the shared pool size, and prevent
-- establishing all the required connections
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

SELECT pg_sleep(0.1);
 pg_sleep
---------------------------------------------------------------------
 
(1 row)

BEGIN;
SET LOCAL citus.node_connection_timeout TO 1000;
SET LOCAL citus.connection_retry_timeout TO 2000;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
ERROR:  citus.max_shared_pool_size number of connections are already established to the node localhost:xxxxx,so cannot establish any more connections
HINT:  consider increasing citus.max_shared_pool_size or citus.connection_retry_timeout
COMMIT;
-- pg_sleep forces almost 1 connection per placement
-- now, some of the optional connections would be skipped,
-- and only 5 connections are used per node
BEGIN;
SELECT count(*), pg_sleep(0.1) FROM test;
 count | pg_sleep
---------------------------------------------------------------------
   101 |
(1 row)

SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                        5
                        5
(2 rows)

COMMIT;
SHOW citus.max_shared_pool_size;
 citus.max_shared_pool_size
---------------------------------------------------------------------
 5
(1 row)

-- by default max_shared_pool_size equals max_connections
ALTER SYSTEM RESET citus.max_shared_pool_size;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

SELECT pg_sleep(0.1);
 pg_sleep
---------------------------------------------------------------------
 
(1 row)

SHOW citus.max_shared_pool_size;
 citus.max_shared_pool_size
---------------------------------------------------------------------
 100
(1 row)

SHOW max_connections;
 max_connections
---------------------------------------------------------------------
 100
(1 row)

-- now, each node gets 16 connections as we force 1 connection per placement
-- (citus.shard_count is 32 and the stats query reports two worker nodes)
BEGIN;
SET LOCAL citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM test;
 count
---------------------------------------------------------------------
   101
(1 row)

SELECT
    connection_count_to_node
FROM
    citus_remote_connection_stats()
WHERE
    port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
    database_name = 'regression'
ORDER BY
    hostname, port;
 connection_count_to_node
---------------------------------------------------------------------
                       16
                       16
(2 rows)

COMMIT;
-- connection_retry_timeout cannot be smaller than node_connection_timeout
SET citus.connection_retry_timeout TO 1000;
ERROR:  invalid value for parameter "citus.connection_retry_timeout": 1000
DETAIL:  citus.connection_retry_timeout cannot be smaller than citus.node_connection_timeout.
-- in case other tests rely on these settings, reset them
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
 pg_reload_conf
---------------------------------------------------------------------
 t
(1 row)

-- drop the test schema; CASCADE also removes the table it contains
DROP SCHEMA shared_connection_stats CASCADE;
NOTICE:  drop cascades to table test