Support for maintenance quota

pull/7286/head
ivyazmitinov 2023-10-13 22:37:20 +07:00
parent 0bb5876a87
commit 4d775ab361
4 changed files with 63 additions and 42 deletions

View File

@ -327,7 +327,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
*/
ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found || !entry->isValid)
if (!(found && entry->isValid))
{
/*
* We are just building hash entry or previously it was left in an
@ -377,11 +377,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
/* these two flags are by nature cannot happen at the same time */
Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));
int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION)
? MAINTENANCE_CONNECTION
: 0;
if (flags & WAIT_FOR_CONNECTION)
{
int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION)
? MAINTENANCE_CONNECTION_POOL
: 0;
WaitLoopForSharedConnection(sharedCounterFlags, hostname, port);
}
else if (flags & OPTIONAL_CONNECTION)
@ -392,7 +392,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* cannot reserve the right to establish a connection, we prefer to
* error out.
*/
int sharedCounterFlags = 0;
if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port))
{
/* do not track the connection anymore */
@ -413,7 +412,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
*
* Still, we keep track of the connection counter.
*/
int sharedCounterFlags = 0;
IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port);
}

View File

@ -35,7 +35,7 @@
#include "distributed/worker_manager.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_manager.h"
#include "math.h"
#define REMOTE_CONNECTION_STATS_COLUMNS 4
@ -104,9 +104,11 @@ typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry
int MaxSharedPoolSize = 0;
/*
*
* Controlled via a GUC, never access directly, use GetSharedPoolSizeMaintenanceQuota().
* Percent of MaxSharedPoolSize reserved for maintenance operations.
* "0" effectively means that regular and maintenance connection will compete over the common pool
*/
float SharedPoolSizeMaintenancePercent = 10.0f;
double SharedPoolSizeMaintenanceQuota = 0.1;
/*
* Controlled via a GUC, never access directly, use GetLocalSharedPoolSize().
@ -140,7 +142,7 @@ static int SharedConnectionHashCompare(const void *a, const void *b, Size keysiz
static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize);
static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize);
static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey);
static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey,
static bool isConnectionSlotAvailable(uint32 flags, SharedWorkerNodeConnStatsHashKey *connKey,
const SharedWorkerNodeConnStatsHashEntry *connectionEntry);
static bool
IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port,
@ -256,10 +258,10 @@ GetMaxSharedPoolSize(void)
return MaxSharedPoolSize;
}
float
GetSharedPoolSizeMaintenancePercent(void)
double
GetSharedPoolSizeMaintenanceQuota(void)
{
return SharedPoolSizeMaintenancePercent;
return SharedPoolSizeMaintenanceQuota;
}
@ -352,29 +354,6 @@ IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
IncrementSharedConnectionCounterInternal(flags, false, hostname, port, MyDatabaseId);
}
static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port)
{
SharedWorkerNodeConnStatsHashKey key;
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
key.port = port;
return key;
}
static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname,
int port,
Oid database)
{
SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey;
workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
workerNodeDatabaseKey.database = database;
return workerNodeDatabaseKey;
}
static bool
IncrementSharedConnectionCounterInternal(uint32 externalFlags,
@ -437,10 +416,13 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
workerNodeDatabaseEntry->count = 0;
}
/* Increment counter if possible */
/* Increment counter if a slot available */
bool connectionSlotAvailable = true;
connectionSlotAvailable = !checkLimits ||
isConnectionSlotAvailable(&workerNodeKey, workerNodeConnectionEntry);
connectionSlotAvailable =
!checkLimits ||
isConnectionSlotAvailable(externalFlags,
&workerNodeKey,
workerNodeConnectionEntry);
if (connectionSlotAvailable)
{
@ -460,11 +442,19 @@ static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey)
}
static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey,
static bool isConnectionSlotAvailable(uint32 flags,
SharedWorkerNodeConnStatsHashKey *connKey,
const SharedWorkerNodeConnStatsHashEntry *connectionEntry)
{
bool connectionSlotAvailable = true;
bool connectionToLocalNode = IsConnectionToLocalNode(connKey);
/*
* Use full capacity for maintenance connections,
*/
int maintenanceConnectionsQuota =
(flags & MAINTENANCE_CONNECTION)
? 0
: (int) floor((double) GetMaxSharedPoolSize() * GetSharedPoolSizeMaintenanceQuota());
if (connectionToLocalNode)
{
bool remoteConnectionsForLocalQueriesDisabled =
@ -489,7 +479,7 @@ static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey,
connectionSlotAvailable = false;
}
}
else if (connectionEntry->count + 1 > GetMaxSharedPoolSize())
else if (connectionEntry->count + 1 > (GetMaxSharedPoolSize() - maintenanceConnectionsQuota))
{
connectionSlotAvailable = false;
}
@ -862,6 +852,30 @@ ShouldWaitForConnection(int currentConnectionCount)
return false;
}
static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port)
{
SharedWorkerNodeConnStatsHashKey key;
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
key.port = port;
return key;
}
static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname,
int port,
Oid database)
{
SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey;
workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
workerNodeDatabaseKey.database = database;
return workerNodeDatabaseKey;
}
static uint32
SharedConnectionHashHash(const void *key, Size keysize)

View File

@ -125,6 +125,11 @@ enum MultiConnectionMode
*/
REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8,
/*
* This flag specifies that connection is required for maintenance operations, e.g.
* transaction recovery, distributed deadlock detection. Such connections may have
* special treatment, like dedicated share of pool, etc.
*/
REQUIRE_MAINTENANCE_CONNECTION = 1 << 9
};

View File

@ -18,7 +18,10 @@
enum SharedPoolCounterMode
{
MAINTENANCE_CONNECTION_POOL = 1 << 0
/*
* Use this flag to reserve a connection from a maintenance quota
*/
MAINTENANCE_CONNECTION = 1 << 0
};
extern int MaxSharedPoolSize;
@ -34,6 +37,7 @@ extern size_t SharedConnectionStatsShmemSize(void);
extern void SharedConnectionStatsShmemInit(void);
extern int GetMaxClientConnections(void);
extern int GetMaxSharedPoolSize(void);
extern double GetSharedPoolSizeMaintenanceQuota(void);
extern int GetLocalSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port);