From 23e6b36e23012d66e67cd351f25a25cddff0df45 Mon Sep 17 00:00:00 2001 From: ivyazmitinov Date: Fri, 13 Oct 2023 21:34:55 +0700 Subject: [PATCH] - Separate HTAB for connection management and statistics. - Refactoring --- .../connection/connection_management.c | 11 +- .../locally_reserved_shared_connections.c | 11 +- .../connection/shared_connection_stats.c | 644 ++++++++++-------- .../operations/worker_node_manager.c | 1 + .../distributed/connection_management.h | 4 +- .../distributed/shared_connection_stats.h | 11 +- src/include/distributed/worker_manager.h | 1 + 7 files changed, 392 insertions(+), 291 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index f8e4816ed..96e750059 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -379,7 +379,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, if (flags & WAIT_FOR_CONNECTION) { - WaitLoopForSharedConnection(hostname, port); + int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION) + ? MAINTENANCE_CONNECTION_POOL + : 0; + WaitLoopForSharedConnection(sharedCounterFlags, hostname, port); } else if (flags & OPTIONAL_CONNECTION) { @@ -389,7 +392,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * cannot reserve the right to establish a connection, we prefer to * error out. */ - if (!TryToIncrementSharedConnectionCounter(hostname, port)) + int sharedCounterFlags = 0; + if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port)) { /* do not track the connection anymore */ dlist_delete(&connection->connectionNode); @@ -409,7 +413,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * * Still, we keep track of the connection counter. */ - IncrementSharedConnectionCounter(hostname, port); + int sharedCounterFlags = 0; + IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port); } diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index a64930b32..a2d2fac24 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -439,13 +439,16 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio * Increment the shared counter, we may need to wait if there are * no space left. */ - WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort); + int sharedCounterFlags = 0; + WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName, workerNode->workerPort); } else { - bool incremented = - TryToIncrementSharedConnectionCounter(workerNode->workerName, - workerNode->workerPort); + int sharedCounterFlags = 0; + bool incremented = TryToIncrementSharedConnectionCounter( + sharedCounterFlags, + workerNode->workerName, + workerNode->workerPort); if (!incremented) { /* diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index b748fdf02..40d74ca11 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -18,7 +18,6 @@ #include "access/hash.h" #include "access/htup_details.h" -#include "catalog/pg_authid.h" #include "commands/dbcommands.h" #include "common/hashfn.h" #include "storage/ipc.h" @@ -27,15 +26,13 @@ #include "pg_version_constants.h" #include "distributed/backend_data.h" -#include "distributed/cancel_utils.h" #include "distributed/connection_management.h" -#include "distributed/listutils.h" #include "distributed/locally_reserved_shared_connections.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_executor.h" #include "distributed/placement_connection.h" #include "distributed/shared_connection_stats.h" #include "distributed/time_constants.h" +#include "distributed/worker_manager.h" #include "distributed/tuplestore.h" #include "distributed/worker_manager.h" @@ -58,8 +55,14 @@ typedef struct ConnectionStatsSharedData ConditionVariable waitersConditionVariable; } ConnectionStatsSharedData; - -typedef struct SharedConnStatsHashKey +/* + * There are two hash tables: + * + * 1. The first one tracks the connection count per worker node and used for the connection throttling + * 2. The second one tracks the connection count per database on a worker node and used for statistics + * + */ +typedef struct SharedWorkerNodeConnStatsHashKey { /* * We keep the entries in the shared memory even after master_update_node() @@ -68,16 +71,28 @@ typedef struct SharedConnStatsHashKey */ char hostname[MAX_NODE_LENGTH]; int32 port; -} SharedConnStatsHashKey; +} SharedWorkerNodeConnStatsHashKey; + +typedef struct SharedWorkerNodeDatabaseConnStatsHashKey +{ + SharedWorkerNodeConnStatsHashKey workerNodeKey; + Oid database; +} SharedWorkerNodeDatabaseConnStatsHashKey; /* hash entry for per worker stats */ -typedef struct SharedConnStatsHashEntry +typedef struct SharedWorkerNodeConnStatsHashEntry { - SharedConnStatsHashKey key; - Oid databaseOid; + SharedWorkerNodeConnStatsHashKey key; - int connectionCount; -} SharedConnStatsHashEntry; + int count; +} SharedWorkerNodeConnStatsHashEntry; + +/* hash entry for per database on worker stats */ +typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry +{ + SharedWorkerNodeDatabaseConnStatsHashKey key; + int count; +} SharedWorkerNodeDatabaseConnStatsHashEntry; /* @@ -88,6 +103,11 @@ typedef struct SharedConnStatsHashEntry */ int MaxSharedPoolSize = 0; +/* + * + */ +float SharedPoolSizeMaintenancePercent = 10.0f; + /* * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize(). * "0" means adjust LocalSharedPoolSize automatically by using MaxConnections. @@ -101,7 +121,8 @@ int MaxClientConnections = ALLOW_ALL_EXTERNAL_CONNECTIONS; /* the following two structs are used for accessing shared memory */ -static HTAB *SharedConnStatsHash = NULL; +static HTAB *SharedWorkerNodeConnStatsHash = NULL; +static HTAB *SharedWorkerNodeDatabaseConnStatsHash = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; @@ -116,10 +137,25 @@ static void UnLockConnectionSharedMemory(void); static bool ShouldWaitForConnection(int currentConnectionCount); static uint32 SharedConnectionHashHash(const void *key, Size keysize); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); +static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize); +static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize); +static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey); +static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey, + const SharedWorkerNodeConnStatsHashEntry *connectionEntry); +static bool +IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port, + Oid database); +static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port); +static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname, + int port, + Oid database); +static void +DecrementSharedConnectionCounterInternal(const char *hostname, int port); PG_FUNCTION_INFO_V1(citus_remote_connection_stats); + /* * citus_remote_connection_stats returns all the avaliable information about all * the remote connections (a.k.a., connections to remote nodes). @@ -155,26 +191,26 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri LockConnectionSharedMemory(LW_SHARED); HASH_SEQ_STATUS status; - SharedConnStatsHashEntry *connectionEntry = NULL; + SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL; - hash_seq_init(&status, SharedConnStatsHash); - while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash); + while ((connectionEntry = (SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search(&status)) != 0) { /* get ready for the next tuple */ memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - char *databaseName = get_database_name(connectionEntry->databaseOid); + char *databaseName = get_database_name(connectionEntry->key.database); if (databaseName == NULL) { /* database might have been dropped */ continue; } - values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname)); - values[1] = Int32GetDatum(connectionEntry->key.port); + values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.workerNodeKey.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port); values[2] = PointerGetDatum(cstring_to_text(databaseName)); - values[3] = Int32GetDatum(connectionEntry->connectionCount); + values[3] = Int32GetDatum(connectionEntry->count); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -220,6 +256,12 @@ GetMaxSharedPoolSize(void) return MaxSharedPoolSize; } +float +GetSharedPoolSizeMaintenancePercent(void) +{ + return SharedPoolSizeMaintenancePercent; +} + /* * GetLocalSharedPoolSize is a wrapper around LocalSharedPoolSize which is @@ -243,14 +285,14 @@ GetLocalSharedPoolSize(void) /* * WaitLoopForSharedConnection tries to increment the shared connection * counter for the given hostname/port and the current database in - * SharedConnStatsHash. + * SharedWorkerNodeConnStatsHash. * * The function implements a retry mechanism via a condition variable. */ void -WaitLoopForSharedConnection(const char *hostname, int port) +WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) { - while (!TryToIncrementSharedConnectionCounter(hostname, port)) + while (!TryToIncrementSharedConnectionCounter(flags, hostname, port)) { CHECK_FOR_INTERRUPTS(); @@ -260,149 +302,37 @@ WaitLoopForSharedConnection(const char *hostname, int port) ConditionVariableCancelSleep(); } - /* * TryToIncrementSharedConnectionCounter tries to increment the shared * connection counter for the given nodeId and the current database in - * SharedConnStatsHash. + * SharedWorkerNodeConnStatsHash. * * If the function returns true, the caller is allowed (and expected) * to establish a new connection to the given node. Else, the caller * is not allowed to establish a new connection. */ bool -TryToIncrementSharedConnectionCounter(const char *hostname, int port) +TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) { - if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) { /* connection throttling disabled */ return true; } - bool counterIncremented = false; - SharedConnStatsHashKey connKey; - - strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } - - /* + /* * The local session might already have some reserved connections to the given * node. In that case, we don't need to go through the shared memory. */ - Oid userId = GetUserId(); - if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId)) - { - MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId); + Oid userId = GetUserId(); + if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId)) + { + MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId); - return true; - } + return true; + } - connKey.port = port; - - /* - * Handle adaptive connection management for the local node slightly different - * as local node can failover to local execution. - */ - bool connectionToLocalNode = false; - int activeBackendCount = 0; - WorkerNode *workerNode = FindWorkerNode(hostname, port); - if (workerNode) - { - connectionToLocalNode = (workerNode->groupId == GetLocalGroupId()); - if (connectionToLocalNode && - GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES) - { - /* - * This early return is required as LocalNodeParallelExecutionFactor - * is ignored for the first connection below. This check makes the - * user experience is more accurate and also makes it easy for - * having regression tests which emulates the local node adaptive - * connection management. - */ - return false; - } - - activeBackendCount = GetExternalClientBackendCount(); - } - - LockConnectionSharedMemory(LW_EXCLUSIVE); - - /* - * As the hash map is allocated in shared memory, it doesn't rely on palloc for - * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no - * space in the shared memory. That's why we prefer continuing the execution - * instead of throwing an error. - */ - bool entryFound = false; - SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); - - /* - * It is possible to throw an error at this point, but that doesn't help us in anyway. - * Instead, we try our best, let the connection establishment continue by-passing the - * connection throttling. - */ - if (!connectionEntry) - { - UnLockConnectionSharedMemory(); - return true; - } - - if (!entryFound) - { - /* we successfully allocated the entry for the first time, so initialize it */ - connectionEntry->connectionCount = 1; - connectionEntry->databaseOid = MyDatabaseId; - - counterIncremented = true; - } - else if (connectionToLocalNode) - { - /* - * For local nodes, solely relying on citus.max_shared_pool_size or - * max_connections might not be sufficient. The former gives us - * a preview of the future (e.g., we let the new connections to establish, - * but they are not established yet). The latter gives us the close to - * precise view of the past (e.g., the active number of client backends). - * - * Overall, we want to limit both of the metrics. The former limit typically - * kicks in under regular loads, where the load of the database increases in - * a reasonable pace. The latter limit typically kicks in when the database - * is issued lots of concurrent sessions at the same time, such as benchmarks. - */ - if (activeBackendCount + 1 > GetLocalSharedPoolSize()) - { - counterIncremented = false; - } - else if (connectionEntry->connectionCount + 1 > GetLocalSharedPoolSize()) - { - counterIncremented = false; - } - else - { - connectionEntry->connectionCount++; - counterIncremented = true; - } - } - else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize()) - { - /* there is no space left for this connection */ - counterIncremented = false; - } - else - { - connectionEntry->connectionCount++; - counterIncremented = true; - } - - UnLockConnectionSharedMemory(); - - return counterIncremented; + return IncrementSharedConnectionCounterInternal(flags, true, hostname, port, MyDatabaseId); } @@ -411,64 +341,162 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) * for the given hostname and port. */ void -IncrementSharedConnectionCounter(const char *hostname, int port) +IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) { - SharedConnStatsHashKey connKey; + if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return; + } - if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING) - { - /* connection throttling disabled */ - return; - } - - strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } - - connKey.port = port; - - LockConnectionSharedMemory(LW_EXCLUSIVE); - - /* - * As the hash map is allocated in shared memory, it doesn't rely on palloc for - * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer - * continuing the execution instead of throwing an error. - */ - bool entryFound = false; - SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); - - /* - * It is possible to throw an error at this point, but that doesn't help us in anyway. - * Instead, we try our best, let the connection establishment continue by-passing the - * connection throttling. - */ - if (!connectionEntry) - { - UnLockConnectionSharedMemory(); - - ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing " - "connection counter", hostname, port))); - - return; - } - - if (!entryFound) - { - /* we successfully allocated the entry for the first time, so initialize it */ - connectionEntry->connectionCount = 0; - connectionEntry->databaseOid = MyDatabaseId; - } - - connectionEntry->connectionCount += 1; - - UnLockConnectionSharedMemory(); + IncrementSharedConnectionCounterInternal(flags, false, hostname, port, MyDatabaseId); } +static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port) +{ + SharedWorkerNodeConnStatsHashKey key; + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + key.port = port; + return key; +} + +static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname, + int port, + Oid database) +{ + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey; + workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); + workerNodeDatabaseKey.database = database; + return workerNodeDatabaseKey; +} + +static bool +IncrementSharedConnectionCounterInternal(uint32 externalFlags, + bool checkLimits, + const char *hostname, + int port, + Oid database) +{ + LockConnectionSharedMemory(LW_EXCLUSIVE); + + /* + * As the hash map is allocated in shared memory, it doesn't rely on palloc for + * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no + * space in the shared memory. That's why we prefer continuing the execution + * instead of throwing an error. + */ + SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); + bool workerNodeEntryFound = false; + SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = + hash_search(SharedWorkerNodeConnStatsHash, + &workerNodeKey, + HASH_ENTER_NULL, + &workerNodeEntryFound); + + /* + * It is possible to throw an error at this point, but that doesn't help us in anyway. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. + */ + if (!workerNodeConnectionEntry) + { + UnLockConnectionSharedMemory(); + return true; + } + + if (!workerNodeEntryFound) + { + /* we successfully allocated the entry for the first time, so initialize it */ + workerNodeConnectionEntry->count = 0; + } + + /* Initialized SharedWorkerNodeDatabaseConnStatsHash the same way */ + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = + PrepareWorkerNodeDatabaseHashKey(hostname, port, database); + bool workerNodeDatabaseEntryFound = false; + SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = + hash_search(SharedWorkerNodeDatabaseConnStatsHash, + &workerNodeDatabaseKey, + HASH_ENTER_NULL, + &workerNodeDatabaseEntryFound); + + if (!workerNodeDatabaseEntry) + { + UnLockConnectionSharedMemory(); + return true; + } + + if (!workerNodeDatabaseEntryFound) + { + workerNodeDatabaseEntry->count = 0; + } + + /* Increment counter if possible */ + bool connectionSlotAvailable = true; + connectionSlotAvailable = !checkLimits || + isConnectionSlotAvailable(&workerNodeKey, workerNodeConnectionEntry); + + if (connectionSlotAvailable) + { + workerNodeConnectionEntry->count += 1; + workerNodeDatabaseEntry->count += 1; + } + + UnLockConnectionSharedMemory(); + + return connectionSlotAvailable; +} + +static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey) +{ + WorkerNode *workerNode = FindWorkerNode(connKey->hostname, connKey->port); + return workerNode && (workerNode->groupId == GetLocalGroupId()); +} + + +static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey, + const SharedWorkerNodeConnStatsHashEntry *connectionEntry) +{ + bool connectionSlotAvailable = true; + bool connectionToLocalNode = IsConnectionToLocalNode(connKey); + if (connectionToLocalNode) + { + bool remoteConnectionsForLocalQueriesDisabled = + GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES; + /* + * For local nodes, solely relying on citus.max_shared_pool_size or + * max_connections might not be sufficient. The former gives us + * a preview of the future (e.g., we let the new connections to establish, + * but they are not established yet). The latter gives us the close to + * precise view of the past (e.g., the active number of client backends). + * + * Overall, we want to limit both of the metrics. The former limit typically + * kicks in under regular loads, where the load of the database increases in + * a reasonable pace. The latter limit typically kicks in when the database + * is issued lots of concurrent sessions at the same time, such as benchmarks. + */ + bool localConnectionLimitExceeded = + GetExternalClientBackendCount() + 1 > GetLocalSharedPoolSize() || + connectionEntry->count + 1 > GetLocalSharedPoolSize(); + if (remoteConnectionsForLocalQueriesDisabled || localConnectionLimitExceeded) + { + connectionSlotAvailable = false; + } + } + else if (connectionEntry->count + 1 > GetMaxSharedPoolSize()) + { + connectionSlotAvailable = false; + } + return connectionSlotAvailable; +} + + /* * DecrementSharedConnectionCounter decrements the shared counter @@ -477,79 +505,96 @@ IncrementSharedConnectionCounter(const char *hostname, int port) void DecrementSharedConnectionCounter(const char *hostname, int port) { - SharedConnStatsHashKey connKey; - /* - * Do not call GetMaxSharedPoolSize() here, since it may read from - * the catalog and we may be in the process exit handler. - */ - if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING) - { - /* connection throttling disabled */ - return; - } + DecrementSharedConnectionCounterInternal(hostname, port); + UnLockConnectionSharedMemory(); + WakeupWaiterBackendsForSharedConnection(); +} - strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } +static void +DecrementSharedConnectionCounterInternal(const char *hostname, int port) +{ + /* + * Do not call GetMaxSharedPoolSize() here, since it may read from + * the catalog and we may be in the process exit handler. + */ + if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING) + { + /* connection throttling disabled */ + return; + } - connKey.port = port; + LockConnectionSharedMemory(LW_EXCLUSIVE); - LockConnectionSharedMemory(LW_EXCLUSIVE); + bool workerNodeEntryFound = false; + SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); + SharedWorkerNodeConnStatsHashEntry *workerNodeEntry = + hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, &workerNodeEntryFound); - bool entryFound = false; - SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); + /* this worker node is removed or updated, no need to care */ + if (!workerNodeEntryFound) + { + ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " + "connection counter", hostname, port))); + return; + } - /* this worker node is removed or updated, no need to care */ - if (!entryFound) - { - UnLockConnectionSharedMemory(); + /* we should never go below 0 */ + Assert(workerNodeEntry->count > 0); - /* wake up any waiters in case any backend is waiting for this node */ - WakeupWaiterBackendsForSharedConnection(); - ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " - "connection counter", hostname, port))); + workerNodeEntry->count -= 1; - return; - } - /* we should never go below 0 */ - Assert(connectionEntry->connectionCount > 0); + /* + * We don't have to remove at this point as the node might be still active + * and will have new connections open to it. Still, this seems like a convenient + * place to remove the entry, as count == 0 implies that the server is + * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, + * we're unlikely to trigger this often. + */ + if (workerNodeEntry->count == 0) + { + hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL); + } - connectionEntry->connectionCount -= 1; + /* + * Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey + */ - if (connectionEntry->connectionCount == 0) - { - /* - * We don't have to remove at this point as the node might be still active - * and will have new connections open to it. Still, this seems like a convenient - * place to remove the entry, as connectionCount == 0 implies that the server is - * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, - * we're unlikely to trigger this often. - */ - hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound); - } + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = + PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId); + bool workerNodeDatabaseEntryFound = false; + SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = + hash_search(SharedWorkerNodeDatabaseConnStatsHash, + &workerNodeDatabaseKey, + HASH_FIND, + &workerNodeDatabaseEntryFound); - UnLockConnectionSharedMemory(); + if (!workerNodeDatabaseEntryFound) + { + return; + } - WakeupWaiterBackendsForSharedConnection(); + Assert(workerNodeDatabaseEntry->count > 0); + + workerNodeDatabaseEntry->count -= 1; + + if (workerNodeDatabaseEntry->count == 0) + { + hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey, HASH_REMOVE, NULL); + } } /* * LockConnectionSharedMemory is a utility function that should be used when - * accessing to the SharedConnStatsHash, which is in the shared memory. + * accessing to the SharedWorkerNodeConnStatsHash, which is in the shared memory. */ static void LockConnectionSharedMemory(LWLockMode lockMode) { - LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode); + LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode); } @@ -634,12 +679,17 @@ SharedConnectionStatsShmemSize(void) size = add_size(size, sizeof(ConnectionStatsSharedData)); - Size hashSize = hash_estimate_size(MaxWorkerNodesTracked, - sizeof(SharedConnStatsHashEntry)); + Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked, + sizeof(SharedWorkerNodeConnStatsHashEntry)); - size = add_size(size, hashSize); + size = add_size(size, workerNodeConnHashSize); - return size; + Size workerNodeDatabaseConnSize = hash_estimate_size(DatabasesPerWorker, + sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry)); + + size = add_size(size, workerNodeDatabaseConnSize); + + return size; } @@ -651,15 +701,6 @@ void SharedConnectionStatsShmemInit(void) { bool alreadyInitialized = false; - HASHCTL info; - - /* create (hostname, port, database) -> [counter] */ - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(SharedConnStatsHashKey); - info.entrysize = sizeof(SharedConnStatsHashEntry); - info.hash = SharedConnectionHashHash; - info.match = SharedConnectionHashCompare; - uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE); /* * Currently the lock isn't required because allocation only happens at @@ -688,14 +729,42 @@ SharedConnectionStatsShmemInit(void) ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); } - /* allocate hash table */ - SharedConnStatsHash = - ShmemInitHash("Shared Conn. Stats Hash", MaxWorkerNodesTracked, - MaxWorkerNodesTracked, &info, hashFlags); + /* allocate hash tables */ + + /* create (hostname, port) -> [counter] */ + HASHCTL sharedWorkerNodeConnStatsHashInfo; + memset(&sharedWorkerNodeConnStatsHashInfo, 0, sizeof(sharedWorkerNodeConnStatsHashInfo)); + sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey); + sharedWorkerNodeConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeConnStatsHashEntry); + sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash; + sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare; + SharedWorkerNodeConnStatsHash = + ShmemInitHash("Shared Conn. Stats Hash", + MaxWorkerNodesTracked, + MaxWorkerNodesTracked, + &sharedWorkerNodeConnStatsHashInfo, + (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); + + /* create (hostname, port, database) -> [counter] */ + HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo; + memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0, sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo)); + sharedWorkerNodeDatabaseConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashKey); + sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry); + sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash; + sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare; + + int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked * DatabasesPerWorker; + SharedWorkerNodeDatabaseConnStatsHash = + ShmemInitHash("Shared Conn Per Database. Stats Hash", + sharedWorkerNodeDatabaseConnStatsHashSize, + sharedWorkerNodeDatabaseConnStatsHashSize, + &sharedWorkerNodeDatabaseConnStatsHashInfo, + (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); LWLockRelease(AddinShmemInitLock); - Assert(SharedConnStatsHash != NULL); + Assert(SharedWorkerNodeConnStatsHash != NULL); + Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL); Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0); if (prev_shmem_startup_hook != NULL) @@ -797,7 +866,7 @@ ShouldWaitForConnection(int currentConnectionCount) static uint32 SharedConnectionHashHash(const void *key, Size keysize) { - SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key; + SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key; uint32 hash = string_hash(entry->hostname, NAMEDATALEN); hash = hash_combine(hash, hash_uint32(entry->port)); @@ -805,20 +874,35 @@ SharedConnectionHashHash(const void *key, Size keysize) return hash; } +static uint32 +SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize) +{ + SharedWorkerNodeDatabaseConnStatsHashKey *entry = (SharedWorkerNodeDatabaseConnStatsHashKey *) key; + uint32 hash = string_hash(entry->workerNodeKey.hostname, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->workerNodeKey.port)); + hash = hash_combine(hash, hash_uint32(entry->database)); + + return hash; +} + static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize) { - SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a; - SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b; + SharedWorkerNodeConnStatsHashKey *ca = (SharedWorkerNodeConnStatsHashKey *) a; + SharedWorkerNodeConnStatsHashKey *cb = (SharedWorkerNodeConnStatsHashKey *) b; - if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || - ca->port != cb->port) - { - return 1; - } - else - { - return 0; - } + return strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || + ca->port != cb->port; +} + +static int +SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize) +{ + SharedWorkerNodeDatabaseConnStatsHashKey *ca = (SharedWorkerNodeDatabaseConnStatsHashKey *) a; + SharedWorkerNodeDatabaseConnStatsHashKey *cb = (SharedWorkerNodeDatabaseConnStatsHashKey *) b; + + return strncmp(ca->workerNodeKey.hostname, cb->workerNodeKey.hostname, MAX_NODE_LENGTH) != 0 || + ca->workerNodeKey.port != cb->workerNodeKey.port || + ca->database != cb->database; } diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index ba622e4d7..594e9f23f 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -39,6 +39,7 @@ /* Config variables managed via guc.c */ char *WorkerListFileName; int MaxWorkerNodesTracked = 2048; /* determines worker node hash table size */ +int DatabasesPerWorker = 10; /* determine database per worker hash table size */ /* Local functions forward declarations */ diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index d93e4483a..4f6c3d82f 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -123,7 +123,9 @@ enum MultiConnectionMode * * This is need to run 'CREATE_REPLICATION_SLOT' command. */ - REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8 + REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8, + + REQUIRE_MAINTENANCE_CONNECTION = 1 << 9 }; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 007691e16..b23dee905 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -16,8 +16,13 @@ #define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1 #define ALLOW_ALL_EXTERNAL_CONNECTIONS -1 +enum SharedPoolCounterMode +{ + MAINTENANCE_CONNECTION_POOL = 1 << 0 +}; extern int MaxSharedPoolSize; +extern float MaintenanceSharedPoolSizePercent; extern int LocalSharedPoolSize; extern int MaxClientConnections; @@ -30,10 +35,10 @@ extern void SharedConnectionStatsShmemInit(void); extern int GetMaxClientConnections(void); extern int GetMaxSharedPoolSize(void); extern int GetLocalSharedPoolSize(void); -extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); -extern void WaitLoopForSharedConnection(const char *hostname, int port); +extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port); +extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port); extern void DecrementSharedConnectionCounter(const char *hostname, int port); -extern void IncrementSharedConnectionCounter(const char *hostname, int port); +extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port); extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int activeConnectionCount); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 02a43fe0b..aad2f157c 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -59,6 +59,7 @@ typedef struct WorkerNode /* Config variables managed via guc.c */ extern int MaxWorkerNodesTracked; +extern int DatabasesPerWorker; extern char *WorkerListFileName; extern char *CurrentCluster;