Preliminary add "citus.maintenance_connection_pool_timeout"

pull/7286/head
ivyazmitinov 2024-05-09 16:20:33 +02:00
parent 46ec3fe4ca
commit 725bdce5bd
8 changed files with 85 additions and 15 deletions

View File

@ -51,7 +51,8 @@ typedef struct ConnectionStatsSharedData
char *sharedConnectionHashTrancheName;
LWLock sharedConnectionHashLock;
ConditionVariable waitersConditionVariable;
ConditionVariable regularConnectionWaitersConditionVariable;
ConditionVariable maintenanceConnectionWaitersConditionVariable;
} ConnectionStatsSharedData;
/*
@ -108,7 +109,8 @@ int MaxSharedPoolSize = 0;
* Pool size for maintenance connections exclusively
* "0" or "-1" means do not apply connection throttling
*/
int MaxMaintenanceSharedPoolSize = 5;
int MaxMaintenanceSharedPoolSize = -1;
int MaintenanceConnectionPoolTimeout = 30 * MS_PER_SECOND;
/*
* Controlled via a GUC, never access directly, use GetLocalSharedPoolSize().
@ -306,7 +308,7 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
{
CHECK_FOR_INTERRUPTS();
WaitForSharedConnection();
WaitForSharedConnection(flags);
}
ConditionVariableCancelSleep();
@ -537,7 +539,7 @@ DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int
DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId);
UnLockConnectionSharedMemory();
WakeupWaiterBackendsForSharedConnection();
WakeupWaiterBackendsForSharedConnection(externalFlags);
}
@ -671,9 +673,18 @@ UnLockConnectionSharedMemory(void)
* this function.
*/
void
WakeupWaiterBackendsForSharedConnection(void)
WakeupWaiterBackendsForSharedConnection(uint32 flags)
{
ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
if (flags & MAINTENANCE_CONNECTION)
{
ConditionVariableBroadcast(
&ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable);
}
else
{
ConditionVariableBroadcast(
&ConnectionStatsSharedState->regularConnectionWaitersConditionVariable);
}
}
@ -684,10 +695,28 @@ WakeupWaiterBackendsForSharedConnection(void)
* WakeupWaiterBackendsForSharedConnection().
*/
void
WaitForSharedConnection(void)
WaitForSharedConnection(uint32 flags)
{
ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable,
PG_WAIT_EXTENSION);
if (flags & MAINTENANCE_CONNECTION)
{
bool connectionSlotNotAcquired = ConditionVariableTimedSleep(
&ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable,
MaintenanceConnectionPoolTimeout,
PG_WAIT_EXTENSION);
if (connectionSlotNotAcquired)
{
ereport(ERROR, (errmsg("Failed to acquire maintenance connection for %i ms",
MaintenanceConnectionPoolTimeout),
errhint(
"Try to increase citus.maintenance_connection_pool_timeout")));
}
}
else
{
ConditionVariableSleep(
&ConnectionStatsSharedState->regularConnectionWaitersConditionVariable,
PG_WAIT_EXTENSION);
}
}
@ -774,7 +803,10 @@ SharedConnectionStatsShmemInit(void)
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
ConditionVariableInit(
&ConnectionStatsSharedState->regularConnectionWaitersConditionVariable);
ConditionVariableInit(
&ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable);
}
/* allocate hash tables */

View File

@ -1844,6 +1844,19 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.maintenance_connection_pool_timeout",
gettext_noop(
"Timeout for acquiring a connection from a maintenance shared pool size. "
"Applicable only when the maintenance pool is enabled via citus.max_maintenance_shared_pool_size. "
"Setting it to 0 or -1 disables the timeout"),
NULL,
&MaintenanceConnectionPoolTimeout,
30 * MS_PER_SECOND, -1, MS_PER_HOUR,
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_adaptive_executor_pool_size",
gettext_noop("Sets the maximum number of connections per worker node used by "
@ -1975,11 +1988,11 @@ RegisterCitusConfigVariables(void)
DefineCustomIntVariable(
"citus.max_maintenance_shared_pool_size",
gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections "
"for maintenance operations only."
"for maintenance operations only. "
"Setting it to 0 or -1 disables maintenance connection throttling."),
NULL,
&MaxMaintenanceSharedPoolSize,
5, -1, INT_MAX,
-1, -1, INT_MAX,
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);

View File

@ -33,7 +33,8 @@ PG_FUNCTION_INFO_V1(set_max_shared_pool_size);
Datum
wake_up_connection_pool_waiters(PG_FUNCTION_ARGS)
{
WakeupWaiterBackendsForSharedConnection();
WakeupWaiterBackendsForSharedConnection(0);
WakeupWaiterBackendsForSharedConnection(MAINTENANCE_CONNECTION);
PG_RETURN_VOID();
}

View File

@ -26,13 +26,14 @@ enum SharedPoolCounterMode
extern int MaxSharedPoolSize;
extern int MaxMaintenanceSharedPoolSize;
extern int MaintenanceConnectionPoolTimeout;
extern int LocalSharedPoolSize;
extern int MaxClientConnections;
extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void);
extern void WaitForSharedConnection(uint32);
extern void WakeupWaiterBackendsForSharedConnection(uint32);
extern size_t SharedConnectionStatsShmemSize(void);
extern void SharedConnectionStatsShmemInit(void);
extern int GetMaxClientConnections(void);

View File

@ -9,6 +9,7 @@ SELECT $definition$
\gset
SELECT $deinition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();
$deinition$ AS turn_on_maintenance
@ -374,6 +375,7 @@ $$;
SELECT $definition$
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size;
SELECT pg_reload_conf();
DO

View File

@ -221,6 +221,7 @@ test: multi_create_shards
test: multi_transaction_recovery
test: multi_transaction_recovery_multiple_databases
test: multi_maintenance_multiple_databases
#test: maintenance_connection_timeout
test: local_dist_join_modifications
test: local_table_join

View File

@ -0,0 +1,18 @@
-- Create a new database
ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 1;
SELECT pg_reload_conf();
set citus.enable_create_database_propagation to on;
CREATE DATABASE role_operations_test_db;
SET citus.superuser TO 'postgres';
-- Connect to the new database
\c role_operations_test_db
CREATE ROLE test_role1;
\c regression - - :master_port
-- Clean up: drop the database
set citus.enable_create_database_propagation to on;
DROP DATABASE role_operations_test_db;
reset citus.enable_create_database_propagation;
ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 1;
SELECT pg_reload_conf();

View File

@ -11,6 +11,7 @@ SELECT $definition$
SELECT $deinition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();
$deinition$ AS turn_on_maintenance
@ -327,6 +328,7 @@ $$;
SELECT $definition$
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size;
SELECT pg_reload_conf();
DO