From a919f09c9622ca0e91eb4d5fc0ecb2fb250bb8a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Fri, 17 Apr 2020 16:14:58 +0200 Subject: [PATCH] Remove the entries from the shared connection counter hash when no connections remain (#3775) We initially considered removing entries just before any change to pg_dist_node. However, that ended-up being very complex and making MX even more complex. Instead, we're switching to a simpler solution, where we remove entries when the counter gets to 0. With certain workloads, this may have some performance penalty. But, two notes on that: - When counter == 0, it implies that the cluster is not busy - With cached connections, that's not possible --- .../connection/shared_connection_stats.c | 79 +++---------------- .../distributed/metadata/node_metadata.c | 12 --- .../distributed/shared_connection_stats.h | 1 - .../ensure_no_shared_connection_leak.out | 4 +- .../expected/shared_connection_stats.out | 16 +--- 5 files changed, 17 insertions(+), 95 deletions(-) diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index ffcd3ca30..09b1b9322 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -177,73 +177,6 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri } -/* - * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash - * and removes the inactive entries. - */ -void -RemoveInactiveNodesFromSharedConnections(void) -{ - /* we're modifying connections, prevent any changes */ - LockConnectionSharedMemory(LW_EXCLUSIVE); - - HASH_SEQ_STATUS status; - SharedConnStatsHashEntry *connectionEntry = NULL; - - /* - * In the first iteration, try to remove worker nodes that doesn't have any active - * conections and the node does not exits in the metadata anymore. - */ - hash_seq_init(&status, SharedConnStatsHash); - while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) - { - SharedConnStatsHashKey connectionKey = connectionEntry->key; - WorkerNode *workerNode = - FindWorkerNode(connectionKey.hostname, connectionKey.port); - - if (connectionEntry->connectionCount == 0 && - (workerNode == NULL || !workerNode->isActive)) - { - hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); - } - } - - int entryCount = hash_get_num_entries(SharedConnStatsHash); - if (entryCount + 1 < MaxWorkerNodesTracked) - { - /* we're good, we have at least one more space for a new worker */ - UnLockConnectionSharedMemory(); - - return; - } - - /* - * We aimed to remove nodes that don't have any open connections. If we - * failed to find one, we have to be more aggressive and remove at least - * one of the inactive ones. - */ - hash_seq_init(&status, SharedConnStatsHash); - while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) - { - SharedConnStatsHashKey connectionKey = connectionEntry->key; - WorkerNode *workerNode = - FindWorkerNode(connectionKey.hostname, connectionKey.port); - - if (workerNode == NULL || !workerNode->isActive) - { - hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); - - hash_seq_term(&status); - - break; - } - } - - - UnLockConnectionSharedMemory(); -} - - /* * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled * via a GUC. @@ -478,6 +411,18 @@ DecrementSharedConnectionCounter(const char *hostname, int port) connectionEntry->connectionCount -= 1; + if (connectionEntry->connectionCount == 0) + { + /* + * We don't have to remove at this point as the node might be still active + * and will have new connections open to it. Still, this seems like a convenient + * place to remove the entry, as connectionCount == 0 implies that the server is + * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, + * we're unlikely to trigger this often. + */ + hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound); + } + UnLockConnectionSharedMemory(); WakeupWaiterBackendsForSharedConnection(); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 58073775b..8d372be9e 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -295,9 +295,6 @@ master_disable_node(PG_FUNCTION_ARGS) bool onlyConsiderActivePlacements = false; MemoryContext savedContext = CurrentMemoryContext; - /* remove the shared connection counters to have some space */ - RemoveInactiveNodesFromSharedConnections(); - PG_TRY(); { if (NodeIsPrimary(workerNode)) @@ -634,9 +631,6 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; - /* remove the shared connection counters to have some space */ - RemoveInactiveNodesFromSharedConnections(); - /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -678,9 +672,6 @@ master_update_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); - /* remove the shared connection counters to have some space */ - RemoveInactiveNodesFromSharedConnections(); - WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort); if (workerNodeWithSameAddress != NULL) @@ -1049,9 +1040,6 @@ ReadDistNode(bool includeNodesFromOtherClusters) static void RemoveNodeFromCluster(char *nodeName, int32 nodePort) { - /* remove the shared connection counters to have some space */ - RemoveInactiveNodesFromSharedConnections(); - WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); if (NodeIsPrimary(workerNode)) { diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 1964d50b7..29e050114 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -17,7 +17,6 @@ extern int MaxSharedPoolSize; extern void InitializeSharedConnectionStats(void); extern void WaitForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void); -extern void RemoveInactiveNodesFromSharedConnections(void); extern int GetMaxSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern void WaitLoopForSharedConnection(const char *hostname, int port); diff --git a/src/test/regress/expected/ensure_no_shared_connection_leak.out b/src/test/regress/expected/ensure_no_shared_connection_leak.out index 060313c60..257ecd2ed 100644 --- a/src/test/regress/expected/ensure_no_shared_connection_leak.out +++ b/src/test/regress/expected/ensure_no_shared_connection_leak.out @@ -129,9 +129,7 @@ WHERE ORDER BY 1; no_connection_to_node --------------------------------------------------------------------- - t - t -(2 rows) +(0 rows) -- now, ensure this from the workers perspective -- we should only see the connection/backend that is running the command below diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 229a00aff..262c8a6b6 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -60,9 +60,7 @@ ORDER BY hostname, port; connection_count_to_node --------------------------------------------------------------------- - 0 - 0 -(2 rows) +(0 rows) -- single shard queries require single connection per node BEGIN; @@ -106,9 +104,7 @@ ORDER BY hostname, port; connection_count_to_node --------------------------------------------------------------------- - 0 - 0 -(2 rows) +(0 rows) -- executor is only allowed to establish a single connection per node BEGIN; @@ -147,9 +143,7 @@ ORDER BY hostname, port; connection_count_to_node --------------------------------------------------------------------- - 0 - 0 -(2 rows) +(0 rows) -- sequential mode is allowed to establish a single connection per node BEGIN; @@ -188,9 +182,7 @@ ORDER BY hostname, port; connection_count_to_node --------------------------------------------------------------------- - 0 - 0 -(2 rows) +(0 rows) -- now, decrease the shared pool size, and still force -- one connection per placement