- Change the implementation from a percentage quota of the shared pool to a separate pool dedicated to maintenance connections

- Improve tests
pull/7286/head
ivyazmitinov 2023-11-29 17:34:38 +03:00
parent f447b39b84
commit 09917f846f
6 changed files with 74 additions and 43 deletions

View File

@ -481,6 +481,7 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
bool bool
IsReservationPossible(void) IsReservationPossible(void)
{ {
// TODO add check for maintenance connection
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
{ {
/* connection throttling disabled */ /* connection throttling disabled */

View File

@ -105,11 +105,11 @@ typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry
int MaxSharedPoolSize = 0; int MaxSharedPoolSize = 0;
/* /*
* Controlled via a GUC, never access directly, use GetSharedPoolSizeMaintenanceQuota(). * Controlled via a GUC, never access directly, use GetMaxMaintenanceSharedPoolSize().
* Percent of MaxSharedPoolSize reserved for maintenance operations. * Pool size for maintenance connections exclusively
* "0" effectively means that regular and maintenance connection will compete over the common pool * "0" or "-1" means do not apply connection throttling
*/ */
double SharedPoolSizeMaintenanceQuota = 0.1; int MaxMaintenanceSharedPoolSize = 5;
/* /*
* Controlled via a GUC, never access directly, use GetLocalSharedPoolSize(). * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize().
@ -142,7 +142,7 @@ static uint32 SharedConnectionHashHash(const void *key, Size keysize);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize); static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize);
static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize); static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize);
static bool isConnectionThrottlingDisabled(); static bool isConnectionThrottlingDisabled(uint32 externalFlags);
static bool static bool
IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port, IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port,
Oid database); Oid database);
@ -257,10 +257,10 @@ GetMaxSharedPoolSize(void)
return MaxSharedPoolSize; return MaxSharedPoolSize;
} }
double int
GetSharedPoolSizeMaintenanceQuota(void) GetMaxMaintenanceSharedPoolSize(void)
{ {
return SharedPoolSizeMaintenanceQuota; return MaxMaintenanceSharedPoolSize;
} }
@ -315,7 +315,7 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
bool bool
TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
{ {
if (isConnectionThrottlingDisabled()) if (isConnectionThrottlingDisabled(flags))
{ {
return true; return true;
} }
@ -347,7 +347,7 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po
void void
IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
{ {
if (isConnectionThrottlingDisabled()) if (isConnectionThrottlingDisabled(flags))
{ {
return; return;
} }
@ -430,29 +430,22 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
{ {
WorkerNode *workerNode = FindWorkerNode(hostname, port); WorkerNode *workerNode = FindWorkerNode(hostname, port);
bool connectionToLocalNode = workerNode && (workerNode->groupId == GetLocalGroupId()); bool connectionToLocalNode = workerNode && (workerNode->groupId == GetLocalGroupId());
int currentConnectionsLimit = connectionToLocalNode
? GetLocalSharedPoolSize() int currentConnectionsLimit;
: GetMaxSharedPoolSize();
int currentConnectionsCount; int currentConnectionsCount;
if (maintenanceConnection)
if (GetSharedPoolSizeMaintenanceQuota() > 0)
{ {
int maintenanceQuota = (int) ceil((double) currentConnectionsLimit * GetSharedPoolSizeMaintenanceQuota()); currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize();
/* Connections limit should never go below 1 */ currentConnectionsCount = workerNodeConnectionEntry->maintenanceConnectionsCount;
currentConnectionsLimit = Max(maintenanceConnection
? maintenanceQuota
: currentConnectionsLimit - maintenanceQuota, 1);
currentConnectionsCount = maintenanceConnection
? workerNodeConnectionEntry->maintenanceConnectionsCount
: workerNodeConnectionEntry->regularConnectionsCount;
} }
else else
{ {
/* When maintenance quota disabled, all connections treated equally*/ currentConnectionsLimit = connectionToLocalNode
currentConnectionsCount = (workerNodeConnectionEntry->maintenanceConnectionsCount + ? GetLocalSharedPoolSize()
workerNodeConnectionEntry->regularConnectionsCount); : GetMaxSharedPoolSize();
currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount;
} }
bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit; bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit;
/* /*
@ -519,7 +512,8 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
void void
DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port) DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port)
{ {
if (isConnectionThrottlingDisabled()) // TODO: possible bug, remove this check?
if (isConnectionThrottlingDisabled(externalFlags))
{ {
return; return;
} }
@ -964,11 +958,14 @@ SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize)
ca->database != cb->database; ca->database != cb->database;
} }
static bool isConnectionThrottlingDisabled() static bool isConnectionThrottlingDisabled(uint32 externalFlags)
{ {
bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
/* /*
* Do not call Get*PoolSize() functions here, since it may read from * Do not call Get*PoolSize() functions here, since it may read from
* the catalog and we may be in the process exit handler. * the catalog and we may be in the process exit handler.
*/ */
return MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING; return maintenanceConnection
? MaxMaintenanceSharedPoolSize <= 0
: MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING;
} }

View File

@ -1993,18 +1993,17 @@ RegisterCitusConfigVariables(void)
GUC_SUPERUSER_ONLY, GUC_SUPERUSER_ONLY,
NULL, NULL, MaxSharedPoolSizeGucShowHook); NULL, NULL, MaxSharedPoolSizeGucShowHook);
DefineCustomRealVariable( DefineCustomIntVariable(
"citus.shared_pool_size_maintenance_quota", "citus.max_maintenance_shared_pool_size",
gettext_noop("Sets the fraction of citus.max_shared_pool_size reserved " gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections "
"for maintenance operations only. " "for maintenance operations only."
"Setting it to 0 disables the quota. " "Setting it to 0 or -1 disables maintenance connection throttling."),
"This way the maintenance and regular connections will share the same pool"),
NULL, NULL,
&SharedPoolSizeMaintenanceQuota, &MaxMaintenanceSharedPoolSize,
0.1, 0, 1, 5, -1, INT_MAX,
PGC_SIGHUP, PGC_SIGHUP,
GUC_SUPERUSER_ONLY, GUC_SUPERUSER_ONLY,
NULL, NULL, MaxSharedPoolSizeGucShowHook); NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_worker_nodes_tracked", "citus.max_worker_nodes_tracked",

View File

@ -25,7 +25,7 @@ enum SharedPoolCounterMode
}; };
extern int MaxSharedPoolSize; extern int MaxSharedPoolSize;
extern double SharedPoolSizeMaintenanceQuota; extern int MaxMaintenanceSharedPoolSize;
extern int LocalSharedPoolSize; extern int LocalSharedPoolSize;
extern int MaxClientConnections; extern int MaxClientConnections;
@ -37,7 +37,7 @@ extern size_t SharedConnectionStatsShmemSize(void);
extern void SharedConnectionStatsShmemInit(void); extern void SharedConnectionStatsShmemInit(void);
extern int GetMaxClientConnections(void); extern int GetMaxClientConnections(void);
extern int GetMaxSharedPoolSize(void); extern int GetMaxSharedPoolSize(void);
extern double GetSharedPoolSizeMaintenanceQuota(void); extern int GetMaxMaintenanceSharedPoolSize(void);
extern int GetLocalSharedPoolSize(void); extern int GetLocalSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port); extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port); extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port);

View File

@ -263,6 +263,14 @@ SELECT pg_sleep_for('10 seconds'::interval);
(1 row) (1 row)
-- Verify maintenance result -- Verify maintenance result
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
too_many_clients_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test
FROM pg_database, FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
@ -289,6 +297,14 @@ WHERE state = 'idle'
(1 row) (1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
too_many_clients_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test
FROM pg_prepared_xacts FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%' WHERE gid LIKE 'citus_0_1234_4_0_%'
@ -308,6 +324,14 @@ WHERE state = 'idle'
(1 row) (1 row)
\c - - - :worker_2_port \c - - - :worker_2_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
too_many_clients_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test
FROM pg_prepared_xacts FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%' WHERE gid LIKE 'citus_0_1234_4_0_%'

View File

@ -218,8 +218,6 @@ WHERE state = 'idle'
:turn_on_maintenance :turn_on_maintenance
\c - - - :master_port \c - - - :master_port
-- Let maintenance do its work... -- Let maintenance do its work...
@ -228,6 +226,10 @@ SELECT pg_sleep_for('10 seconds'::interval);
-- Verify maintenance result -- Verify maintenance result
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test
FROM pg_database, FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
@ -247,6 +249,10 @@ WHERE state = 'idle'
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test
FROM pg_prepared_xacts FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%' WHERE gid LIKE 'citus_0_1234_4_0_%'
@ -259,6 +265,10 @@ WHERE state = 'idle'
\c - - - :worker_2_port \c - - - :worker_2_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test
FROM pg_prepared_xacts FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%' WHERE gid LIKE 'citus_0_1234_4_0_%'