From 4d775ab3616d3357ae07bb30c292f4d0a6ee1a70 Mon Sep 17 00:00:00 2001 From: ivyazmitinov Date: Fri, 13 Oct 2023 22:37:20 +0700 Subject: [PATCH] Support for maintenance quota --- .../connection/connection_management.c | 10 +-- .../connection/shared_connection_stats.c | 84 +++++++++++-------- .../distributed/connection_management.h | 5 ++ .../distributed/shared_connection_stats.h | 6 +- 4 files changed, 63 insertions(+), 42 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 96e750059..6066ef4bf 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -327,7 +327,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, */ ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); - if (!found || !entry->isValid) + if (!(found && entry->isValid)) { /* * We are just building hash entry or previously it was left in an @@ -377,11 +377,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, /* these two flags are by nature cannot happen at the same time */ Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION))); + int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION) + ? MAINTENANCE_CONNECTION + : 0; if (flags & WAIT_FOR_CONNECTION) { - int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION) - ? MAINTENANCE_CONNECTION_POOL - : 0; WaitLoopForSharedConnection(sharedCounterFlags, hostname, port); } else if (flags & OPTIONAL_CONNECTION) @@ -392,7 +392,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * cannot reserve the right to establish a connection, we prefer to * error out. */ - int sharedCounterFlags = 0; if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port)) { /* do not track the connection anymore */ @@ -413,7 +412,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * * Still, we keep track of the connection counter. */ - int sharedCounterFlags = 0; IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port); } diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 359d76b5a..4ea94abdb 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -35,7 +35,7 @@ #include "distributed/worker_manager.h" #include "distributed/tuplestore.h" #include "distributed/worker_manager.h" - +#include "math.h" #define REMOTE_CONNECTION_STATS_COLUMNS 4 @@ -104,9 +104,11 @@ typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry int MaxSharedPoolSize = 0; /* - * + * Controlled via a GUC, never access directly, use GetSharedPoolSizeMaintenanceQuota(). + * Percent of MaxSharedPoolSize reserved for maintenance operations. + * "0" effectively means that regular and maintenance connection will compete over the common pool */ -float SharedPoolSizeMaintenancePercent = 10.0f; +double SharedPoolSizeMaintenanceQuota = 0.1; /* * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize(). @@ -140,7 +142,7 @@ static int SharedConnectionHashCompare(const void *a, const void *b, Size keysiz static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize); static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize); static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey); -static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey, +static bool isConnectionSlotAvailable(uint32 flags, SharedWorkerNodeConnStatsHashKey *connKey, const SharedWorkerNodeConnStatsHashEntry *connectionEntry); static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port, @@ -256,10 +258,10 @@ GetMaxSharedPoolSize(void) return MaxSharedPoolSize; } -float -GetSharedPoolSizeMaintenancePercent(void) +double +GetSharedPoolSizeMaintenanceQuota(void) { - return SharedPoolSizeMaintenancePercent; + return SharedPoolSizeMaintenanceQuota; } @@ -352,29 +354,6 @@ IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) IncrementSharedConnectionCounterInternal(flags, false, hostname, port, MyDatabaseId); } -static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port) -{ - SharedWorkerNodeConnStatsHashKey key; - strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } - key.port = port; - return key; -} - -static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname, - int port, - Oid database) -{ - SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey; - workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); - workerNodeDatabaseKey.database = database; - return workerNodeDatabaseKey; -} static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, @@ -437,10 +416,13 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags, workerNodeDatabaseEntry->count = 0; } - /* Increment counter if possible */ + /* Increment counter if a slot available */ bool connectionSlotAvailable = true; - connectionSlotAvailable = !checkLimits || - isConnectionSlotAvailable(&workerNodeKey, workerNodeConnectionEntry); + connectionSlotAvailable = + !checkLimits || + isConnectionSlotAvailable(externalFlags, + &workerNodeKey, + workerNodeConnectionEntry); if (connectionSlotAvailable) { @@ -460,11 +442,19 @@ static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey) } -static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey, +static bool isConnectionSlotAvailable(uint32 flags, + SharedWorkerNodeConnStatsHashKey *connKey, const SharedWorkerNodeConnStatsHashEntry *connectionEntry) { bool connectionSlotAvailable = true; bool connectionToLocalNode = IsConnectionToLocalNode(connKey); + /* + * Use full capacity for maintenance connections, + */ + int maintenanceConnectionsQuota = + (flags & MAINTENANCE_CONNECTION) + ? 0 + : (int) floor((double) GetMaxSharedPoolSize() * GetSharedPoolSizeMaintenanceQuota()); if (connectionToLocalNode) { bool remoteConnectionsForLocalQueriesDisabled = @@ -489,7 +479,7 @@ static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey, connectionSlotAvailable = false; } } - else if (connectionEntry->count + 1 > GetMaxSharedPoolSize()) + else if (connectionEntry->count + 1 > (GetMaxSharedPoolSize() - maintenanceConnectionsQuota)) { connectionSlotAvailable = false; } @@ -862,6 +852,30 @@ ShouldWaitForConnection(int currentConnectionCount) return false; } +static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port) +{ + SharedWorkerNodeConnStatsHashKey key; + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + key.port = port; + return key; +} + +static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname, + int port, + Oid database) +{ + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey; + workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); + workerNodeDatabaseKey.database = database; + return workerNodeDatabaseKey; +} + static uint32 SharedConnectionHashHash(const void *key, Size keysize) diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 4f6c3d82f..487b5f12f 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -125,6 +125,11 @@ enum MultiConnectionMode */ REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8, + /* + * This flag specifies that connection is required for maintenance operations, e.g. + * transaction recovery, distributed deadlock detection. Such connections may have + * special treatment, like dedicated share of pool, etc. + */ REQUIRE_MAINTENANCE_CONNECTION = 1 << 9 }; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index b23dee905..33e1e31b4 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -18,7 +18,10 @@ enum SharedPoolCounterMode { - MAINTENANCE_CONNECTION_POOL = 1 << 0 + /* + * Use this flag to reserve a connection from a maintenance quota + */ + MAINTENANCE_CONNECTION = 1 << 0 }; extern int MaxSharedPoolSize; @@ -34,6 +37,7 @@ extern size_t SharedConnectionStatsShmemSize(void); extern void SharedConnectionStatsShmemInit(void); extern int GetMaxClientConnections(void); extern int GetMaxSharedPoolSize(void); +extern double GetSharedPoolSizeMaintenanceQuota(void); extern int GetLocalSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port); extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port);