diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 7ee2a0e77..30b2e981e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -589,6 +589,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); +static bool CanUseOptionalConnection(WorkerPool *workerPool); static void CheckConnectionTimeout(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); @@ -2386,7 +2387,7 @@ ManageWorkerPool(WorkerPool *workerPool) connectionFlags |= OUTSIDE_TRANSACTION; } - if (list_length(workerPool->sessionList) > 0 && !UseConnectionPerPlacement()) + if (CanUseOptionalConnection(workerPool)) { /* * The executor can finish the execution with a single connection, @@ -2433,6 +2434,45 @@ ManageWorkerPool(WorkerPool *workerPool) } +/* + * CanUseOptionalConnection contains the logic to decide whether a new connection + * can be considered as optional or not. When the function returns true, the connection + * can be established with the optional flag, else it should not be an optional connection. + */ +static bool +CanUseOptionalConnection(WorkerPool *workerPool) +{ + if (list_length(workerPool->sessionList) == 0) + { + /* + * We definitely need at least 1 connection to finish the execution. + * All single shard queries hit here with the default settings. + */ + return false; + } + else if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker) + { + /* + * Until this session caches MaxCachedConnectionsPerWorker connections, + * this might lead some optional connections to be considered as non-optional + * when MaxCachedConnectionsPerWorker > 1. 
+ * + * However, once the session caches MaxCachedConnectionsPerWorker (which is + * the second transaction executed in the session), Citus would utilize the + * cached connections as much as possible. + */ + return false; + } + else if (UseConnectionPerPlacement()) + { + /* user wants one connection per placement, so no throttling is desired */ + return false; + } + + return true; +} + + /* * CheckConnectionTimeout makes sure that the execution enforces the connection * establishment timeout defined by the user (NodeConnectionTimeout). diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 3986af298..998709279 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -297,6 +297,55 @@ BEGIN; 16 (2 rows) +COMMIT; +-- now show that when max_cached_conns_per_worker > 1 +-- Citus forces the first execution to open at least 2 +-- connections that are cached. Later, that 2 cached +-- connections are used +BEGIN; + SET LOCAL citus.max_cached_conns_per_worker TO 2; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + ?column? +--------------------------------------------------------------------- + t + t +(2 rows) + + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + ?column? 
+--------------------------------------------------------------------- + t + t +(2 rows) + COMMIT; -- connection_retry_timeout cannot be smaller than node_connection_timeout SET citus.connection_retry_timeout TO 1000; diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql index 7213537d2..581698809 100644 --- a/src/test/regress/sql/shared_connection_stats.sql +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -172,6 +172,34 @@ BEGIN; hostname, port; COMMIT; +-- now show that when max_cached_conns_per_worker > 1 +-- Citus forces the first execution to open at least 2 +-- connections that are cached. Later, that 2 cached +-- connections are used +BEGIN; + SET LOCAL citus.max_cached_conns_per_worker TO 2; + SELECT count(*) FROM test; + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; + SELECT count(*) FROM test; + SELECT + connection_count_to_node >= 2 + FROM + citus_remote_connection_stats() + WHERE + port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND + database_name = 'regression' + ORDER BY + hostname, port; +COMMIT; + -- connection_retry_timeout cannot be smaller than node_connection_timeout SET citus.connection_retry_timeout TO 1000;