mirror of https://github.com/citusdata/citus.git
Remove nodes that do not have any connections
parent
a7254db8fd
commit
cb97957717
|
@ -184,20 +184,44 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
||||||
void
|
void
|
||||||
RemoveInactiveNodesFromSharedConnections(void)
|
RemoveInactiveNodesFromSharedConnections(void)
|
||||||
{
|
{
|
||||||
/* we're reading all shared connections, prevent any changes */
|
/* we're modifying connections, prevent any changes */
|
||||||
LockConnectionSharedMemory(LW_EXCLUSIVE);
|
LockConnectionSharedMemory(LW_EXCLUSIVE);
|
||||||
|
|
||||||
HASH_SEQ_STATUS status;
|
HASH_SEQ_STATUS status;
|
||||||
SharedConnStatsHashEntry *connectionEntry = NULL;
|
SharedConnStatsHashEntry *connectionEntry = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In the first iteration, try to remove worker nodes that doesn't have any active
|
||||||
|
* conections and the node does not exits in the metadata anymore.
|
||||||
|
*/
|
||||||
|
hash_seq_init(&status, SharedConnStatsHash);
|
||||||
|
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
|
||||||
|
{
|
||||||
|
SharedConnStatsHashKey connectionKey = connectionEntry->key;
|
||||||
|
WorkerNode *workerNode =
|
||||||
|
FindWorkerNode(connectionKey.hostname, connectionKey.port);
|
||||||
|
|
||||||
|
if (connectionEntry->connectionCount == 0 &&
|
||||||
|
(workerNode == NULL || !workerNode->isActive))
|
||||||
|
{
|
||||||
|
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int entryCount = hash_get_num_entries(SharedConnStatsHash);
|
int entryCount = hash_get_num_entries(SharedConnStatsHash);
|
||||||
if (entryCount + 1 < MaxWorkerNodesTracked)
|
if (entryCount + 1 < MaxWorkerNodesTracked)
|
||||||
{
|
{
|
||||||
|
/* we're good, we have at least one more space for a new worker */
|
||||||
UnLockConnectionSharedMemory();
|
UnLockConnectionSharedMemory();
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We aimed to remove nodes that don't have any open connections. If we
|
||||||
|
* failed to find one, we have to be more aggressive and remove at least
|
||||||
|
* one of the inactive ones.
|
||||||
|
*/
|
||||||
hash_seq_init(&status, SharedConnStatsHash);
|
hash_seq_init(&status, SharedConnStatsHash);
|
||||||
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
|
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
|
||||||
{
|
{
|
||||||
|
@ -208,9 +232,14 @@ RemoveInactiveNodesFromSharedConnections(void)
|
||||||
if (workerNode == NULL || !workerNode->isActive)
|
if (workerNode == NULL || !workerNode->isActive)
|
||||||
{
|
{
|
||||||
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
|
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
|
||||||
|
|
||||||
|
hash_seq_term(&status);
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
UnLockConnectionSharedMemory();
|
UnLockConnectionSharedMemory();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,7 +467,7 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we should never go below 0 */
|
/* we should never go below 0 */
|
||||||
Assert (connectionEntry->connectionCount > 0);
|
Assert(connectionEntry->connectionCount > 0);
|
||||||
|
|
||||||
connectionEntry->connectionCount -= 1;
|
connectionEntry->connectionCount -= 1;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue