mirror of https://github.com/citusdata/citus.git
Effectively do not remove any entries
parent
ffd7dcc88f
commit
310a0cd442
|
@ -53,7 +53,6 @@
|
||||||
#include "distributed/multi_explain.h"
|
#include "distributed/multi_explain.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shared_connection_stats.h"
|
|
||||||
#include "distributed/transmit.h"
|
#include "distributed/transmit.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
@ -454,7 +453,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
if (IsDropCitusStmt(parsetree))
|
if (IsDropCitusStmt(parsetree))
|
||||||
{
|
{
|
||||||
StopMaintenanceDaemon(MyDatabaseId);
|
StopMaintenanceDaemon(MyDatabaseId);
|
||||||
RemoveAllSharedConnectionEntries();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pstmt->utilityStmt = parsetree;
|
pstmt->utilityStmt = parsetree;
|
||||||
|
|
|
@ -177,21 +177,6 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RemoveAllSharedConnectionEntries removes all the entries in SharedConnStatsHash.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
RemoveAllSharedConnectionEntries(void)
|
|
||||||
{
|
|
||||||
/* we're reading all shared connections, prevent any changes */
|
|
||||||
LockConnectionSharedMemory(LW_EXCLUSIVE);
|
|
||||||
|
|
||||||
hash_delete_all(SharedConnStatsHash);
|
|
||||||
|
|
||||||
UnLockConnectionSharedMemory();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash
|
* RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash
|
||||||
* and removes the inactive entries.
|
* and removes the inactive entries.
|
||||||
|
@ -205,12 +190,20 @@ RemoveInactiveNodesFromSharedConnections(void)
|
||||||
HASH_SEQ_STATUS status;
|
HASH_SEQ_STATUS status;
|
||||||
SharedConnStatsHashEntry *connectionEntry = NULL;
|
SharedConnStatsHashEntry *connectionEntry = NULL;
|
||||||
|
|
||||||
|
int entryCount = hash_get_num_entries(SharedConnStatsHash);
|
||||||
|
if (entryCount + 1 < MaxWorkerNodesTracked)
|
||||||
|
{
|
||||||
|
UnLockConnectionSharedMemory();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
hash_seq_init(&status, SharedConnStatsHash);
|
hash_seq_init(&status, SharedConnStatsHash);
|
||||||
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
|
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
|
||||||
{
|
{
|
||||||
SharedConnStatsHashKey connectionKey = connectionEntry->key;
|
SharedConnStatsHashKey connectionKey = connectionEntry->key;
|
||||||
WorkerNode *workerNode = FindWorkerNode(connectionKey.hostname,
|
WorkerNode *workerNode =
|
||||||
connectionKey.port);
|
FindWorkerNode(connectionKey.hostname, connectionKey.port);
|
||||||
|
|
||||||
if (workerNode == NULL || !workerNode->isActive)
|
if (workerNode == NULL || !workerNode->isActive)
|
||||||
{
|
{
|
||||||
|
|
|
@ -19,7 +19,6 @@ extern void InitializeSharedConnectionStats(void);
|
||||||
extern void WaitForSharedConnection(void);
|
extern void WaitForSharedConnection(void);
|
||||||
extern void WakeupWaiterBackendsForSharedConnection(void);
|
extern void WakeupWaiterBackendsForSharedConnection(void);
|
||||||
extern void RemoveInactiveNodesFromSharedConnections(void);
|
extern void RemoveInactiveNodesFromSharedConnections(void);
|
||||||
extern void RemoveAllSharedConnectionEntries(void);
|
|
||||||
extern int GetMaxSharedPoolSize(void);
|
extern int GetMaxSharedPoolSize(void);
|
||||||
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
||||||
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
|
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
|
||||||
|
|
Loading…
Reference in New Issue