diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index dc31ad080..023c9b7c0 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -481,6 +481,7 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio bool IsReservationPossible(void) { + /* TODO: add check for maintenance connection */ if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) { /* connection throttling disabled */ diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index a2849caec..1cab33306 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -105,11 +105,11 @@ typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry int MaxSharedPoolSize = 0; /* - * Controlled via a GUC, never access directly, use GetSharedPoolSizeMaintenanceQuota(). - * Percent of MaxSharedPoolSize reserved for maintenance operations. - * "0" effectively means that regular and maintenance connection will compete over the common pool + * Controlled via a GUC, never access directly, use GetMaxMaintenanceSharedPoolSize(). + * Pool size for maintenance connections exclusively + * "0" or "-1" means do not apply connection throttling */ -double SharedPoolSizeMaintenanceQuota = 0.1; +int MaxMaintenanceSharedPoolSize = 5; /* * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize(). 
@@ -142,7 +142,7 @@ static uint32 SharedConnectionHashHash(const void *key, Size keysize); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize); static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize); -static bool isConnectionThrottlingDisabled(); +static bool isConnectionThrottlingDisabled(uint32 externalFlags); static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port, Oid database); @@ -257,10 +257,10 @@ GetMaxSharedPoolSize(void) return MaxSharedPoolSize; } -double -GetSharedPoolSizeMaintenanceQuota(void) +int +GetMaxMaintenanceSharedPoolSize(void) { - return SharedPoolSizeMaintenanceQuota; + return MaxMaintenanceSharedPoolSize; } @@ -315,7 +315,7 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) { - if (isConnectionThrottlingDisabled()) + if (isConnectionThrottlingDisabled(flags)) { return true; } @@ -347,7 +347,7 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) { - if (isConnectionThrottlingDisabled()) + if (isConnectionThrottlingDisabled(flags)) { return; } @@ -430,29 +430,22 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags, { WorkerNode *workerNode = FindWorkerNode(hostname, port); bool connectionToLocalNode = workerNode && (workerNode->groupId == GetLocalGroupId()); - int currentConnectionsLimit = connectionToLocalNode - ? 
GetLocalSharedPoolSize() - : GetMaxSharedPoolSize(); + + int currentConnectionsLimit; int currentConnectionsCount; - - if (GetSharedPoolSizeMaintenanceQuota() > 0) + if (maintenanceConnection) { - int maintenanceQuota = (int) ceil((double) currentConnectionsLimit * GetSharedPoolSizeMaintenanceQuota()); - /* Connections limit should never go below 1 */ - currentConnectionsLimit = Max(maintenanceConnection - ? maintenanceQuota - : currentConnectionsLimit - maintenanceQuota, 1); - currentConnectionsCount = maintenanceConnection - ? workerNodeConnectionEntry->maintenanceConnectionsCount - : workerNodeConnectionEntry->regularConnectionsCount; - + currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize(); + currentConnectionsCount = workerNodeConnectionEntry->maintenanceConnectionsCount; } else { - /* When maintenance quota disabled, all connections treated equally*/ - currentConnectionsCount = (workerNodeConnectionEntry->maintenanceConnectionsCount + - workerNodeConnectionEntry->regularConnectionsCount); + currentConnectionsLimit = connectionToLocalNode + ? GetLocalSharedPoolSize() + : GetMaxSharedPoolSize(); + currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount; } + bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit; /* @@ -519,7 +512,8 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags, void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port) { - if (isConnectionThrottlingDisabled()) + /* TODO: possible bug, remove this check? */ 
+ if (isConnectionThrottlingDisabled(externalFlags)) { return; } @@ -964,11 +958,14 @@ SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize) ca->database != cb->database; } -static bool isConnectionThrottlingDisabled() +static bool isConnectionThrottlingDisabled(uint32 externalFlags) { + bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; /* * Do not call Get*PoolSize() functions here, since it may read from * the catalog and we may be in the process exit handler. */ - return MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING; + return maintenanceConnection + ? MaxMaintenanceSharedPoolSize <= 0 + : MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3c9b2d97c..d0f8dfbd6 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1993,18 +1993,17 @@ RegisterCitusConfigVariables(void) GUC_SUPERUSER_ONLY, NULL, NULL, MaxSharedPoolSizeGucShowHook); - DefineCustomRealVariable( - "citus.shared_pool_size_maintenance_quota", - gettext_noop("Sets the fraction of citus.max_shared_pool_size reserved " - "for maintenance operations only. " - "Setting it to 0 disables the quota. " - "This way the maintenance and regular connections will share the same pool"), + DefineCustomIntVariable( + "citus.max_maintenance_shared_pool_size", + gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections " + "for maintenance operations only." 
+ " Setting it to 0 or -1 disables maintenance connection throttling."), NULL, - &SharedPoolSizeMaintenanceQuota, - 0.1, 0, 1, + &MaxMaintenanceSharedPoolSize, + 5, -1, INT_MAX, PGC_SIGHUP, GUC_SUPERUSER_ONLY, - NULL, NULL, MaxSharedPoolSizeGucShowHook); + NULL, NULL, NULL); DefineCustomIntVariable( "citus.max_worker_nodes_tracked", diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 5a0be6ea2..d5901014a 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -25,7 +25,7 @@ enum SharedPoolCounterMode }; extern int MaxSharedPoolSize; -extern double SharedPoolSizeMaintenanceQuota; +extern int MaxMaintenanceSharedPoolSize; extern int LocalSharedPoolSize; extern int MaxClientConnections; @@ -37,7 +37,7 @@ extern size_t SharedConnectionStatsShmemSize(void); extern void SharedConnectionStatsShmemInit(void); extern int GetMaxClientConnections(void); extern int GetMaxSharedPoolSize(void); -extern double GetSharedPoolSizeMaintenanceQuota(void); +extern int GetMaxMaintenanceSharedPoolSize(void); extern int GetLocalSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port); extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port); diff --git a/src/test/regress/expected/multi_maintenance_multiple_databases.out b/src/test/regress/expected/multi_maintenance_multiple_databases.out index 854807508..5013c3a84 100644 --- a/src/test/regress/expected/multi_maintenance_multiple_databases.out +++ b/src/test/regress/expected/multi_maintenance_multiple_databases.out @@ -263,6 +263,14 @@ SELECT pg_sleep_for('10 seconds'::interval); (1 row) -- Verify maintenance result +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + 
too_many_clients_test +--------------------------------------------------------------------- + t +(1 row) + SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test FROM pg_database, dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, @@ -289,6 +297,14 @@ WHERE state = 'idle' (1 row) \c - - - :worker_1_port +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + too_many_clients_test +--------------------------------------------------------------------- + t +(1 row) + SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test FROM pg_prepared_xacts WHERE gid LIKE 'citus_0_1234_4_0_%' @@ -308,6 +324,14 @@ WHERE state = 'idle' (1 row) \c - - - :worker_2_port +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + too_many_clients_test +--------------------------------------------------------------------- + t +(1 row) + SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test FROM pg_prepared_xacts WHERE gid LIKE 'citus_0_1234_4_0_%' diff --git a/src/test/regress/sql/multi_maintenance_multiple_databases.sql b/src/test/regress/sql/multi_maintenance_multiple_databases.sql index 25c26c738..20a6aa132 100644 --- a/src/test/regress/sql/multi_maintenance_multiple_databases.sql +++ b/src/test/regress/sql/multi_maintenance_multiple_databases.sql @@ -218,8 +218,6 @@ WHERE state = 'idle' :turn_on_maintenance - - \c - - - :master_port -- Let maintenance do it's work... 
@@ -228,6 +226,10 @@ SELECT pg_sleep_for('10 seconds'::interval); -- Verify maintenance result +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test FROM pg_database, dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, @@ -247,6 +249,10 @@ WHERE state = 'idle' \c - - - :worker_1_port +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test FROM pg_prepared_xacts WHERE gid LIKE 'citus_0_1234_4_0_%' @@ -259,6 +265,10 @@ WHERE state = 'idle' \c - - - :worker_2_port +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test FROM pg_prepared_xacts WHERE gid LIKE 'citus_0_1234_4_0_%'