mirror of https://github.com/citusdata/citus.git
Introduce CanUseOptionalConnection() to consider cached connections
parent
275d9d83f8
commit
531de5d848
|
@ -589,6 +589,7 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
|
||||||
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
|
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
|
||||||
MultiConnection *connection);
|
MultiConnection *connection);
|
||||||
static void ManageWorkerPool(WorkerPool *workerPool);
|
static void ManageWorkerPool(WorkerPool *workerPool);
|
||||||
|
static bool CanUseOptionalConnection(WorkerPool *workerPool);
|
||||||
static void CheckConnectionTimeout(WorkerPool *workerPool);
|
static void CheckConnectionTimeout(WorkerPool *workerPool);
|
||||||
static int UsableConnectionCount(WorkerPool *workerPool);
|
static int UsableConnectionCount(WorkerPool *workerPool);
|
||||||
static long NextEventTimeout(DistributedExecution *execution);
|
static long NextEventTimeout(DistributedExecution *execution);
|
||||||
|
@ -2386,7 +2387,7 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
connectionFlags |= OUTSIDE_TRANSACTION;
|
connectionFlags |= OUTSIDE_TRANSACTION;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (list_length(workerPool->sessionList) > 0 && !UseConnectionPerPlacement())
|
if (CanUseOptionalConnection(workerPool))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* The executor can finish the execution with a single connection,
|
* The executor can finish the execution with a single connection,
|
||||||
|
@ -2433,6 +2434,45 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ConnectionThrottlingRequired contains the logic to decide whether a new connection
|
||||||
|
* can be considered as optional or not. When the function return true, the connection
|
||||||
|
* can be established with optinal flag, else it should not be an optional connection.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
CanUseOptionalConnection(WorkerPool *workerPool)
|
||||||
|
{
|
||||||
|
if (list_length(workerPool->sessionList) == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We definitely need at least 1 connection to finish the execution.
|
||||||
|
* All single shard queries hit here with the default settings.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (list_length(workerPool->sessionList) < MaxCachedConnectionsPerWorker)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Until this session caches MaxCachedConnectionsPerWorker connections,
|
||||||
|
* this might lead some optional connections to be considered as non-optional
|
||||||
|
* when MaxCachedConnectionsPerWorker > 1.
|
||||||
|
*
|
||||||
|
* However, once the session caches MaxCachedConnectionsPerWorker (which is
|
||||||
|
* the second transaction executed in the session), Citus would utilize the
|
||||||
|
* cached connections as much as possible.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (UseConnectionPerPlacement())
|
||||||
|
{
|
||||||
|
/* user wants one connection per placement, so no throttling is desired */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CheckConnectionTimeout makes sure that the execution enforces the connection
|
* CheckConnectionTimeout makes sure that the execution enforces the connection
|
||||||
* establishment timeout defined by the user (NodeConnectionTimeout).
|
* establishment timeout defined by the user (NodeConnectionTimeout).
|
||||||
|
|
|
@ -297,6 +297,55 @@ BEGIN;
|
||||||
16
|
16
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- now show that when max_cached_conns_per_worker > 1
|
||||||
|
-- Citus forces the first execution to open at least 2
|
||||||
|
-- connections that are cached. Later, that 2 cached
|
||||||
|
-- connections are user
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_cached_conns_per_worker TO 2;
|
||||||
|
SELECT count(*) FROM test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
101
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
connection_count_to_node >= 2
|
||||||
|
FROM
|
||||||
|
citus_remote_connection_stats()
|
||||||
|
WHERE
|
||||||
|
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
|
||||||
|
database_name = 'regression'
|
||||||
|
ORDER BY
|
||||||
|
hostname, port;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
t
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM test;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
101
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
connection_count_to_node >= 2
|
||||||
|
FROM
|
||||||
|
citus_remote_connection_stats()
|
||||||
|
WHERE
|
||||||
|
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
|
||||||
|
database_name = 'regression'
|
||||||
|
ORDER BY
|
||||||
|
hostname, port;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
t
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- connection_retry_timeout cannot be smaller than node_connection_timeout
|
-- connection_retry_timeout cannot be smaller than node_connection_timeout
|
||||||
SET citus.connection_retry_timeout TO 1000;
|
SET citus.connection_retry_timeout TO 1000;
|
||||||
|
|
|
@ -172,6 +172,34 @@ BEGIN;
|
||||||
hostname, port;
|
hostname, port;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
-- now show that when max_cached_conns_per_worker > 1
|
||||||
|
-- Citus forces the first execution to open at least 2
|
||||||
|
-- connections that are cached. Later, that 2 cached
|
||||||
|
-- connections are user
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.max_cached_conns_per_worker TO 2;
|
||||||
|
SELECT count(*) FROM test;
|
||||||
|
SELECT
|
||||||
|
connection_count_to_node >= 2
|
||||||
|
FROM
|
||||||
|
citus_remote_connection_stats()
|
||||||
|
WHERE
|
||||||
|
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
|
||||||
|
database_name = 'regression'
|
||||||
|
ORDER BY
|
||||||
|
hostname, port;
|
||||||
|
SELECT count(*) FROM test;
|
||||||
|
SELECT
|
||||||
|
connection_count_to_node >= 2
|
||||||
|
FROM
|
||||||
|
citus_remote_connection_stats()
|
||||||
|
WHERE
|
||||||
|
port IN (SELECT node_port FROM master_get_active_worker_nodes()) AND
|
||||||
|
database_name = 'regression'
|
||||||
|
ORDER BY
|
||||||
|
hostname, port;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
-- connection_retry_timeout cannot be smaller than node_connection_timeout
|
-- connection_retry_timeout cannot be smaller than node_connection_timeout
|
||||||
SET citus.connection_retry_timeout TO 1000;
|
SET citus.connection_retry_timeout TO 1000;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue