Add the GUC for MaxSharedPoolSize

preventConflictingFlags
Onder Kalaci 2020-03-26 12:29:45 +01:00
parent 34dec931f4
commit bed023c03d
3 changed files with 72 additions and 2 deletions

View File

@ -84,7 +84,14 @@ typedef struct SharedConnStatsHashEntry
} SharedConnStatsHashEntry;
static int MaxSharedPoolSize = 100;
/*
* Controlled via a GUC, never access directly, use GetMaxSharedPoolSize().
* "0" means adjust MaxSharedPoolSize automatically by using 2 * MaxConnections.
* "-1" means do not apply connection throttling
* Anything else means use that number
*/
int MaxSharedPoolSize = 0;
/* the following two structs used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL;
@ -173,6 +180,25 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
}
/*
* GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
* via a GUC.
* "0" means adjust MaxSharedPoolSize automatically by using 2 * MaxConnections
* "-1" means do not apply connection throttling
* Anything else means use that number
*/
int
GetMaxSharedPoolSize(void)
{
if (MaxSharedPoolSize == 0)
{
return 2 * MaxConnections;
}
return MaxSharedPoolSize;
}
/*
* WaitOrErrorForSharedConnection tries to increment the shared connection
* counter for the given hostname/port and the current database in
@ -286,7 +312,7 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
counterIncremented = true;
}
else if (connectionEntry->connectionCount + 1 >= MaxSharedPoolSize)
else if (connectionEntry->connectionCount + 1 >= GetMaxSharedPoolSize())
{
/* there is no space left for this connection */
counterIncremented = false;

View File

@ -98,6 +98,7 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source);
@ -929,6 +930,23 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_shared_pool_size",
gettext_noop("Sets the maximum number of connections allowed per worker node "
"across all the backends from this node. Setting to -1 disables "
"connections throttling. Setting to 0 makes it auto-adjust, meaning"
"2 * max_connections."),
gettext_noop("For single coordinator setups, the value should be adjusted "
"to match the max_connections on the remote nodes. For Citus MX, "
"the value should be tuned such that it is roughly "
"(max_connections / #workerNodes) so that each worker doesn't "
"become overwhelmed by too many incoming connections"),
&MaxSharedPoolSize,
0, -1, INT_MAX,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, MaxSharedPoolSizeGucShowHook);
DefineCustomIntVariable(
"citus.max_worker_nodes_tracked",
gettext_noop("Sets the maximum number of worker nodes that are tracked."),
@ -1516,6 +1534,28 @@ NodeConninfoGucAssignHook(const char *newval, void *extra)
}
/*
* MaxSharedPoolSizeGucShowHook overrides the value that is shown to the
* user when the default value has not been set.
*/
static const char *
MaxSharedPoolSizeGucShowHook(void)
{
StringInfo newvalue = makeStringInfo();
if (MaxSharedPoolSize == 0)
{
appendStringInfo(newvalue, "%d", GetMaxSharedPoolSize());
}
else
{
appendStringInfo(newvalue, "%d", MaxSharedPoolSize);
}
return (const char *) newvalue->data;
}
static bool
StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
{

View File

@ -11,7 +11,11 @@
#ifndef SHARED_CONNECTION_STATS_H
#define SHARED_CONNECTION_STATS_H
extern int MaxSharedPoolSize;
extern void InitializeSharedConnectionStats(void);
extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);