From 725bdce5bdbbc4cf0dbff2898c4fb171ac3377bb Mon Sep 17 00:00:00 2001 From: ivyazmitinov Date: Thu, 9 May 2024 16:20:33 +0200 Subject: [PATCH] Preliminary add "citus.maintenance_connection_pool_timeout" --- .../connection/shared_connection_stats.c | 52 +++++++++++++++---- src/backend/distributed/shared_library_init.c | 17 +++++- .../test/shared_connection_counters.c | 3 +- .../distributed/shared_connection_stats.h | 5 +- .../multi_maintenance_multiple_databases.out | 2 + src/test/regress/multi_1_schedule | 1 + .../sql/maintenance_connection_timeout.sql | 18 +++++++ .../multi_maintenance_multiple_databases.sql | 2 + 8 files changed, 85 insertions(+), 15 deletions(-) create mode 100644 src/test/regress/sql/maintenance_connection_timeout.sql diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 4ad3f287e..8812f0e75 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -51,7 +51,8 @@ typedef struct ConnectionStatsSharedData char *sharedConnectionHashTrancheName; LWLock sharedConnectionHashLock; - ConditionVariable waitersConditionVariable; + ConditionVariable regularConnectionWaitersConditionVariable; + ConditionVariable maintenanceConnectionWaitersConditionVariable; } ConnectionStatsSharedData; /* @@ -108,7 +109,8 @@ int MaxSharedPoolSize = 0; * Pool size for maintenance connections exclusively * "0" or "-1" means do not apply connection throttling */ -int MaxMaintenanceSharedPoolSize = 5; +int MaxMaintenanceSharedPoolSize = -1; +int MaintenanceConnectionPoolTimeout = 30 * MS_PER_SECOND; /* * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize(). @@ -306,7 +308,7 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) { CHECK_FOR_INTERRUPTS(); - WaitForSharedConnection(); + WaitForSharedConnection(flags); } ConditionVariableCancelSleep(); @@ -537,7 +539,7 @@ DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId); UnLockConnectionSharedMemory(); - WakeupWaiterBackendsForSharedConnection(); + WakeupWaiterBackendsForSharedConnection(externalFlags); } @@ -671,9 +673,18 @@ UnLockConnectionSharedMemory(void) * this function. */ void -WakeupWaiterBackendsForSharedConnection(void) +WakeupWaiterBackendsForSharedConnection(uint32 flags) { - ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable); + if (flags & MAINTENANCE_CONNECTION) + { + ConditionVariableBroadcast( + &ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable); + } + else + { + ConditionVariableBroadcast( + &ConnectionStatsSharedState->regularConnectionWaitersConditionVariable); + } } @@ -684,10 +695,28 @@ WakeupWaiterBackendsForSharedConnection(void) * WakeupWaiterBackendsForSharedConnection(). */ void -WaitForSharedConnection(void) +WaitForSharedConnection(uint32 flags) { - ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, - PG_WAIT_EXTENSION); + if (flags & MAINTENANCE_CONNECTION) + { + bool connectionSlotNotAcquired = ConditionVariableTimedSleep( + &ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable, + MaintenanceConnectionPoolTimeout, + PG_WAIT_EXTENSION); + if (connectionSlotNotAcquired) + { + ereport(ERROR, (errmsg("Failed to acquire maintenance connection for %i ms", + MaintenanceConnectionPoolTimeout), + errhint( + "Try to increase citus.maintenance_connection_pool_timeout"))); + } + } + else + { + ConditionVariableSleep( + &ConnectionStatsSharedState->regularConnectionWaitersConditionVariable, + PG_WAIT_EXTENSION); + } } @@ -774,7 +803,10 @@ SharedConnectionStatsShmemInit(void) LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, ConnectionStatsSharedState->sharedConnectionHashTrancheId); - ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); + ConditionVariableInit( + &ConnectionStatsSharedState->regularConnectionWaitersConditionVariable); + ConditionVariableInit( + &ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable); } /* allocate hash tables */ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 2a9b73f20..0f1815a5c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1844,6 +1844,19 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.maintenance_connection_pool_timeout", + gettext_noop( + "Timeout for acquiring a connection from a maintenance shared pool size. " + "Applicable only when the maintenance pool is enabled via citus.max_maintenance_shared_pool_size. " + "Setting it to 0 or -1 disables the timeout"), + NULL, + &MaintenanceConnectionPoolTimeout, + 30 * MS_PER_SECOND, -1, MS_PER_HOUR, + PGC_SIGHUP, + GUC_SUPERUSER_ONLY, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_adaptive_executor_pool_size", gettext_noop("Sets the maximum number of connections per worker node used by " @@ -1975,11 +1988,11 @@ RegisterCitusConfigVariables(void) DefineCustomIntVariable( "citus.max_maintenance_shared_pool_size", gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections " - "for maintenance operations only." + "for maintenance operations only. " "Setting it to 0 or -1 disables maintenance connection throttling."), NULL, &MaxMaintenanceSharedPoolSize, - 5, -1, INT_MAX, + -1, -1, INT_MAX, PGC_SIGHUP, GUC_SUPERUSER_ONLY, NULL, NULL, NULL); diff --git a/src/backend/distributed/test/shared_connection_counters.c b/src/backend/distributed/test/shared_connection_counters.c index c59602887..917ffc19d 100644 --- a/src/backend/distributed/test/shared_connection_counters.c +++ b/src/backend/distributed/test/shared_connection_counters.c @@ -33,7 +33,8 @@ PG_FUNCTION_INFO_V1(set_max_shared_pool_size); Datum wake_up_connection_pool_waiters(PG_FUNCTION_ARGS) { - WakeupWaiterBackendsForSharedConnection(); + WakeupWaiterBackendsForSharedConnection(0); + WakeupWaiterBackendsForSharedConnection(MAINTENANCE_CONNECTION); PG_RETURN_VOID(); } diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index ea8346769..681e5bf5b 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -26,13 +26,14 @@ enum SharedPoolCounterMode extern int MaxSharedPoolSize; extern int MaxMaintenanceSharedPoolSize; +extern int MaintenanceConnectionPoolTimeout; extern int LocalSharedPoolSize; extern int MaxClientConnections; extern void InitializeSharedConnectionStats(void); -extern void WaitForSharedConnection(void); -extern void WakeupWaiterBackendsForSharedConnection(void); +extern void WaitForSharedConnection(uint32); +extern void WakeupWaiterBackendsForSharedConnection(uint32); extern size_t SharedConnectionStatsShmemSize(void); extern void SharedConnectionStatsShmemInit(void); extern int GetMaxClientConnections(void); diff --git a/src/test/regress/expected/multi_maintenance_multiple_databases.out b/src/test/regress/expected/multi_maintenance_multiple_databases.out index 04cb9c1a0..6c5a4c4d3 100644 --- a/src/test/regress/expected/multi_maintenance_multiple_databases.out +++ b/src/test/regress/expected/multi_maintenance_multiple_databases.out @@ -9,6 +9,7 @@ SELECT $definition$ \gset SELECT $deinition$ ALTER SYSTEM SET citus.recover_2pc_interval TO '5s'; +ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; SELECT pg_reload_conf(); $deinition$ AS turn_on_maintenance @@ -374,6 +375,7 @@ $$; SELECT $definition$ ALTER SYSTEM RESET citus.recover_2pc_interval; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; + ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size; SELECT pg_reload_conf(); DO diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index cbd8ecf2f..50ddd3e7a 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -221,6 +221,7 @@ test: multi_create_shards test: multi_transaction_recovery test: multi_transaction_recovery_multiple_databases test: multi_maintenance_multiple_databases +#test: maintenance_connection_timeout test: local_dist_join_modifications test: local_table_join diff --git a/src/test/regress/sql/maintenance_connection_timeout.sql b/src/test/regress/sql/maintenance_connection_timeout.sql new file mode 100644 index 000000000..b293c0b95 --- /dev/null +++ b/src/test/regress/sql/maintenance_connection_timeout.sql @@ -0,0 +1,18 @@ +-- Create a new database +ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 1; +SELECT pg_reload_conf(); +set citus.enable_create_database_propagation to on; +CREATE DATABASE role_operations_test_db; +SET citus.superuser TO 'postgres'; +-- Connect to the new database +\c role_operations_test_db + +CREATE ROLE test_role1; + +\c regression - - :master_port +-- Clean up: drop the database +set citus.enable_create_database_propagation to on; +DROP DATABASE role_operations_test_db; +reset citus.enable_create_database_propagation; +ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 1; +SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/multi_maintenance_multiple_databases.sql b/src/test/regress/sql/multi_maintenance_multiple_databases.sql index 4c29c6bc9..bfcf6d788 100644 --- a/src/test/regress/sql/multi_maintenance_multiple_databases.sql +++ b/src/test/regress/sql/multi_maintenance_multiple_databases.sql @@ -11,6 +11,7 @@ SELECT $definition$ SELECT $deinition$ ALTER SYSTEM SET citus.recover_2pc_interval TO '5s'; +ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; SELECT pg_reload_conf(); $deinition$ AS turn_on_maintenance @@ -327,6 +328,7 @@ $$; SELECT $definition$ ALTER SYSTEM RESET citus.recover_2pc_interval; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; + ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size; SELECT pg_reload_conf(); DO