From cb97957717022d5c0c1f1ca3be4c5cec2af63c6d Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 7 Apr 2020 18:04:39 +0200 Subject: [PATCH] Remove nodes that do not have any connections --- .../connection/shared_connection_stats.c | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index b856da36c..8db8f70e7 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -184,20 +184,44 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) void RemoveInactiveNodesFromSharedConnections(void) { - /* we're reading all shared connections, prevent any changes */ + /* we're modifying connections, prevent any changes */ LockConnectionSharedMemory(LW_EXCLUSIVE); HASH_SEQ_STATUS status; SharedConnStatsHashEntry *connectionEntry = NULL; + /* + * In the first iteration, try to remove worker nodes that don't have any active + * connections and that are inactive or do not exist in the metadata anymore. + */ + hash_seq_init(&status, SharedConnStatsHash); + while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + { + SharedConnStatsHashKey connectionKey = connectionEntry->key; + WorkerNode *workerNode = + FindWorkerNode(connectionKey.hostname, connectionKey.port); + + if (connectionEntry->connectionCount == 0 && + (workerNode == NULL || !workerNode->isActive)) + { + hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); + } + } + int entryCount = hash_get_num_entries(SharedConnStatsHash); if (entryCount + 1 < MaxWorkerNodesTracked) { + /* we're good, we have at least one more space for a new worker */ UnLockConnectionSharedMemory(); return; } + /* + * We aimed to remove nodes that don't have any open connections. 
If we + * failed to find one, we have to be more aggressive and remove at least + * one of the inactive ones. + */ hash_seq_init(&status, SharedConnStatsHash); while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) { @@ -208,9 +232,14 @@ RemoveInactiveNodesFromSharedConnections(void) if (workerNode == NULL || !workerNode->isActive) { hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); + + hash_seq_term(&status); + + break; } } + UnLockConnectionSharedMemory(); } @@ -438,7 +467,7 @@ DecrementSharedConnectionCounter(const char *hostname, int port) } /* we should never go below 0 */ - Assert (connectionEntry->connectionCount > 0); + Assert(connectionEntry->connectionCount > 0); connectionEntry->connectionCount -= 1;