WIP handle node operations

preventConflictingFlags
Onder Kalaci 2020-04-07 13:52:24 +02:00
parent b6b47370dc
commit 9311b4b953
5 changed files with 19 additions and 29 deletions

View File

@ -63,7 +63,7 @@
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "distributed/shared_connection_stats.h"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */ bool EnableDDLPropagation = true; /* ddl propagation is enabled */
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
@ -454,6 +454,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
if (IsDropCitusStmt(parsetree)) if (IsDropCitusStmt(parsetree))
{ {
StopMaintenanceDaemon(MyDatabaseId); StopMaintenanceDaemon(MyDatabaseId);
RemoveAllSharedConnectionEntries();
} }
pstmt->utilityStmt = parsetree; pstmt->utilityStmt = parsetree;

View File

@ -372,7 +372,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
} }
else else
{ {
CitusPQFinish(connection); CitusPQFinish((MultiConnection *) connection);
} }
PG_RE_THROW(); PG_RE_THROW();

View File

@ -23,6 +23,7 @@
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/cancel_utils.h" #include "distributed/cancel_utils.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
@ -176,6 +177,20 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
} }
/*
* RemoveAllSharedConnectionEntries removes all the entries in SharedConnStatsHash.
*/
void
RemoveAllSharedConnectionEntries(void)
{
/* we're reading all shared connections, prevent any changes */
LockConnectionSharedMemory(LW_EXCLUSIVE);
hash_delete_all(SharedConnStatsHash);
UnLockConnectionSharedMemory();
}
/* /*
* RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash
@ -184,7 +199,6 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
void void
RemoveInactiveNodesFromSharedConnections(void) RemoveInactiveNodesFromSharedConnections(void)
{ {
/* we're reading all shared connections, prevent any changes */ /* we're reading all shared connections, prevent any changes */
LockConnectionSharedMemory(LW_EXCLUSIVE); LockConnectionSharedMemory(LW_EXCLUSIVE);
@ -200,8 +214,6 @@ RemoveInactiveNodesFromSharedConnections(void)
if (workerNode == NULL || !workerNode->isActive) if (workerNode == NULL || !workerNode->isActive)
{ {
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
if (connectionEntry->connectionCount < 0)
elog(INFO, "removing %s %d count: %d", connectionKey.hostname, connectionKey.port, connectionEntry->connectionCount);
} }
} }
@ -382,10 +394,6 @@ IncrementSharedConnectionCounter(const char *hostname, int port)
connectionEntry->connectionCount = 0; connectionEntry->connectionCount = 0;
} }
/* we should never have a negative entry */
if (connectionEntry->connectionCount < 0 )
elog(WARNING,"IncrementSharedConnectionCounter: connectionEntry->connectionCount %s %d: %d",hostname, port, connectionEntry->connectionCount);
connectionEntry->connectionCount += 1; connectionEntry->connectionCount += 1;
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
@ -432,17 +440,9 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
/* wake up any waiters in case any backend is waiting for this node */ /* wake up any waiters in case any backend is waiting for this node */
WakeupWaiterBackendsForSharedConnection(); WakeupWaiterBackendsForSharedConnection();
/* make sure we don't have any lingering session lifespan connections */
CloseNodeConnectionsAfterTransaction(hostname, port);
return; return;
} }
/* we should never decrement a counter that has not been incremented */
if (connectionEntry->connectionCount < 1)
{
elog(WARNING, "DecrementSharedConnectionCounter %s %d connectionEntry->connectionCount:%d",hostname, port, connectionEntry->connectionCount);
}
connectionEntry->connectionCount -= 1; connectionEntry->connectionCount -= 1;
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();

View File

@ -284,9 +284,6 @@ master_disable_node(PG_FUNCTION_ARGS)
/* remove the shared connection counters to have some space */ /* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections(); RemoveInactiveNodesFromSharedConnections();
/* make sure we don't have any lingering session lifespan connections */
//CloseNodeConnectionsAfterTransaction(nodeName, nodePort);
PG_TRY(); PG_TRY();
{ {
if (NodeIsPrimary(workerNode)) if (NodeIsPrimary(workerNode))
@ -706,12 +703,6 @@ master_update_node(PG_FUNCTION_ARGS)
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
} }
char *oldWorkerName = pstrdup(workerNode->workerName);
int oldWorkerPort = workerNode->workerPort;
/* make sure we don't have any lingering session lifespan connections */
//CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort);
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
/* we should be able to find the new node from the metadata */ /* we should be able to find the new node from the metadata */
@ -1036,9 +1027,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
/* make sure we don't have any lingering session lifespan connections */
//CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
SendCommandToWorkersWithMetadata(nodeDeleteCommand); SendCommandToWorkersWithMetadata(nodeDeleteCommand);
} }

View File

@ -19,6 +19,7 @@ extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void); extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void);
extern void RemoveInactiveNodesFromSharedConnections(void); extern void RemoveInactiveNodesFromSharedConnections(void);
extern void RemoveAllSharedConnectionEntries(void);
extern int GetMaxSharedPoolSize(void); extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitOrErrorForSharedConnection(const char *hostname, int port); extern void WaitOrErrorForSharedConnection(const char *hostname, int port);