mirror of https://github.com/citusdata/citus.git
Remove the entries from the shared connection counter hash when no connections remain (#3775)
We initially considered removing entries just before any change to pg_dist_node. However, that ended up being very complex and made MX even more complex. Instead, we're switching to a simpler solution: remove an entry when its counter drops to 0. With certain workloads this may carry some performance penalty, but two notes on that:
- When the counter is 0, the cluster is not busy anyway.
- With cached connections, the counter cannot reach 0 in the first place.
parent 79f6f3c02c
commit a919f09c96
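For context, the pattern this commit moves to can be sketched as follows. This is a minimal, hypothetical illustration written against PostgreSQL's dynahash API (utils/hsearch.h), not the Citus source: the hash table, key, and entry layout below are simplified stand-ins for the real SharedConnStatsHash structures, which the hunks below show in full.

#include "postgres.h"
#include "utils/hsearch.h"

/* simplified stand-ins for the real Citus key/entry structs */
typedef struct SketchKey
{
    char hostname[256];
    int32 port;
} SketchKey;

typedef struct SketchEntry
{
    SketchKey key;          /* hash key: worker address */
    int connectionCount;    /* connections currently tracked for this worker */
} SketchEntry;

static HTAB *sketchHash = NULL;    /* assumed to be created with ShmemInitHash() */

static void
DecrementSketchCounter(SketchKey key)
{
    bool found = false;
    SketchEntry *entry =
        (SketchEntry *) hash_search(sketchHash, &key, HASH_FIND, &found);

    if (!found)
    {
        return;
    }

    entry->connectionCount -= 1;

    /*
     * A counter of 0 means no backend currently holds a connection to this
     * worker, so the entry can be dropped safely; the next connection simply
     * re-creates it with HASH_ENTER.
     */
    if (entry->connectionCount == 0)
    {
        hash_search(sketchHash, &key, HASH_REMOVE, &found);
    }
}

In the real code the decrement and removal happen under LockConnectionSharedMemory(LW_EXCLUSIVE), and waiting backends are woken afterwards, as the DecrementSharedConnectionCounter hunk shows.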
@@ -177,73 +177,6 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri
 }
 
 
-/*
- * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash
- * and removes the inactive entries.
- */
-void
-RemoveInactiveNodesFromSharedConnections(void)
-{
-    /* we're modifying connections, prevent any changes */
-    LockConnectionSharedMemory(LW_EXCLUSIVE);
-
-    HASH_SEQ_STATUS status;
-    SharedConnStatsHashEntry *connectionEntry = NULL;
-
-    /*
-     * In the first iteration, try to remove worker nodes that don't have any
-     * active connections and that no longer exist in the metadata.
-     */
-    hash_seq_init(&status, SharedConnStatsHash);
-    while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
-    {
-        SharedConnStatsHashKey connectionKey = connectionEntry->key;
-        WorkerNode *workerNode =
-            FindWorkerNode(connectionKey.hostname, connectionKey.port);
-
-        if (connectionEntry->connectionCount == 0 &&
-            (workerNode == NULL || !workerNode->isActive))
-        {
-            hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
-        }
-    }
-
-    int entryCount = hash_get_num_entries(SharedConnStatsHash);
-    if (entryCount + 1 < MaxWorkerNodesTracked)
-    {
-        /* we're good, we have at least one more space for a new worker */
-        UnLockConnectionSharedMemory();
-
-        return;
-    }
-
-    /*
-     * We aimed to remove nodes that don't have any open connections. If we
-     * failed to find one, we have to be more aggressive and remove at least
-     * one of the inactive ones.
-     */
-    hash_seq_init(&status, SharedConnStatsHash);
-    while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
-    {
-        SharedConnStatsHashKey connectionKey = connectionEntry->key;
-        WorkerNode *workerNode =
-            FindWorkerNode(connectionKey.hostname, connectionKey.port);
-
-        if (workerNode == NULL || !workerNode->isActive)
-        {
-            hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
-
-            hash_seq_term(&status);
-
-            break;
-        }
-    }
-
-
-    UnLockConnectionSharedMemory();
-}
-
-
 /*
  * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
  * via a GUC.
@@ -478,6 +411,18 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
 
     connectionEntry->connectionCount -= 1;
 
+    if (connectionEntry->connectionCount == 0)
+    {
+        /*
+         * We don't have to remove the entry at this point, as the node might
+         * still be active and open new connections. Still, this is a convenient
+         * place to remove it: connectionCount == 0 implies that the server is
+         * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
+         * we're unlikely to hit this path often.
+         */
+        hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound);
+    }
+
     UnLockConnectionSharedMemory();
 
     WakeupWaiterBackendsForSharedConnection();
@@ -295,9 +295,6 @@ master_disable_node(PG_FUNCTION_ARGS)
     bool onlyConsiderActivePlacements = false;
     MemoryContext savedContext = CurrentMemoryContext;
 
-    /* remove the shared connection counters to have some space */
-    RemoveInactiveNodesFromSharedConnections();
-
     PG_TRY();
     {
         if (NodeIsPrimary(workerNode))
@@ -634,9 +631,6 @@ ActivateNode(char *nodeName, int nodePort)
 {
     bool isActive = true;
 
-    /* remove the shared connection counters to have some space */
-    RemoveInactiveNodesFromSharedConnections();
-
     /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
     LockRelationOid(DistNodeRelationId(), ExclusiveLock);
 
@@ -678,9 +672,6 @@ master_update_node(PG_FUNCTION_ARGS)
 
     CheckCitusVersion(ERROR);
 
-    /* remove the shared connection counters to have some space */
-    RemoveInactiveNodesFromSharedConnections();
-
     WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
                                                                      newNodePort);
     if (workerNodeWithSameAddress != NULL)
@@ -1049,9 +1040,6 @@ ReadDistNode(bool includeNodesFromOtherClusters)
 static void
 RemoveNodeFromCluster(char *nodeName, int32 nodePort)
 {
-    /* remove the shared connection counters to have some space */
-    RemoveInactiveNodesFromSharedConnections();
-
     WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
     if (NodeIsPrimary(workerNode))
     {
@@ -17,7 +17,6 @@ extern int MaxSharedPoolSize;
 extern void InitializeSharedConnectionStats(void);
 extern void WaitForSharedConnection(void);
 extern void WakeupWaiterBackendsForSharedConnection(void);
-extern void RemoveInactiveNodesFromSharedConnections(void);
 extern int GetMaxSharedPoolSize(void);
 extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
 extern void WaitLoopForSharedConnection(const char *hostname, int port);
@@ -129,9 +129,7 @@ WHERE
 ORDER BY 1;
  no_connection_to_node
 ---------------------------------------------------------------------
- t
- t
-(2 rows)
+(0 rows)
 
 -- now, ensure this from the workers perspective
 -- we should only see the connection/backend that is running the command below
@@ -60,9 +60,7 @@ ORDER BY
     hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
- 0
- 0
-(2 rows)
+(0 rows)
 
 -- single shard queries require single connection per node
 BEGIN;
@@ -106,9 +104,7 @@ ORDER BY
     hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
- 0
- 0
-(2 rows)
+(0 rows)
 
 -- executor is only allowed to establish a single connection per node
 BEGIN;
@@ -147,9 +143,7 @@ ORDER BY
     hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
- 0
- 0
-(2 rows)
+(0 rows)
 
 -- sequential mode is allowed to establish a single connection per node
 BEGIN;
@@ -188,9 +182,7 @@ ORDER BY
     hostname, port;
  connection_count_to_node
 ---------------------------------------------------------------------
- 0
- 0
-(2 rows)
+(0 rows)
 
 -- now, decrease the shared pool size, and still force
 -- one connection per placement