From 9311b4b95397ff809fccec3e73e2df77241b8a78 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 7 Apr 2020 13:52:24 +0200 Subject: [PATCH] WIP handle node operations --- .../distributed/commands/utility_hook.c | 3 +- .../connection/connection_management.c | 2 +- .../connection/shared_connection_stats.c | 30 +++++++++---------- .../distributed/metadata/node_metadata.c | 12 -------- .../distributed/shared_connection_stats.h | 1 + 5 files changed, 19 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 2dfd4386f..a054b7a6b 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -63,7 +63,7 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/syscache.h" - +#include "distributed/shared_connection_stats.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ @@ -454,6 +454,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, if (IsDropCitusStmt(parsetree)) { StopMaintenanceDaemon(MyDatabaseId); + RemoveAllSharedConnectionEntries(); } pstmt->utilityStmt = parsetree; diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index a20222a32..80212b461 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -372,7 +372,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } else { - CitusPQFinish(connection); + CitusPQFinish((MultiConnection *) connection); } PG_RE_THROW(); diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index d39ddd4ca..d44946b82 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -23,6 +23,7 @@ #include "commands/dbcommands.h" #include "distributed/cancel_utils.h" #include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/shared_connection_stats.h" @@ -176,6 +177,20 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) } +/* + * RemoveAllSharedConnectionEntries removes all the entries in SharedConnStatsHash. + */ +void +RemoveAllSharedConnectionEntries(void) +{ + /* we're reading all shared connections, prevent any changes */ + LockConnectionSharedMemory(LW_EXCLUSIVE); + + hash_delete_all(SharedConnStatsHash); + + UnLockConnectionSharedMemory(); +} + /* * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash @@ -184,7 +199,6 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) void RemoveInactiveNodesFromSharedConnections(void) { - /* we're reading all shared connections, prevent any changes */ LockConnectionSharedMemory(LW_EXCLUSIVE); @@ -200,8 +214,6 @@ RemoveInactiveNodesFromSharedConnections(void) if (workerNode == NULL || !workerNode->isActive) { hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); - if (connectionEntry->connectionCount < 0) - elog(INFO, "removing %s %d count: %d", connectionKey.hostname, connectionKey.port, connectionEntry->connectionCount); } } @@ -382,10 +394,6 @@ IncrementSharedConnectionCounter(const char *hostname, int port) connectionEntry->connectionCount = 0; } - /* we should never have a negative entry */ - if (connectionEntry->connectionCount < 0 ) - elog(WARNING,"IncrementSharedConnectionCounter: connectionEntry->connectionCount %s %d: %d",hostname, port, connectionEntry->connectionCount); - connectionEntry->connectionCount += 1; UnLockConnectionSharedMemory(); @@ -432,17 +440,9 @@ DecrementSharedConnectionCounter(const char *hostname, int port) /* wake up any waiters in case any backend is waiting for this node */ WakeupWaiterBackendsForSharedConnection(); - /* make sure we don't have any lingering session lifespan connections */ - CloseNodeConnectionsAfterTransaction(hostname, port); - return; } - /* we should never decrement a counter that has not been incremented */ - if (connectionEntry->connectionCount < 1) - { - elog(WARNING, "DecrementSharedConnectionCounter %s %d connectionEntry->connectionCount:%d",hostname, port, connectionEntry->connectionCount); - } connectionEntry->connectionCount -= 1; UnLockConnectionSharedMemory(); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 1e33b3cae..4903edf5b 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -284,9 +284,6 @@ master_disable_node(PG_FUNCTION_ARGS) /* remove the shared connection counters to have some space */ RemoveInactiveNodesFromSharedConnections(); - /* make sure we don't have any lingering session lifespan connections */ - //CloseNodeConnectionsAfterTransaction(nodeName, nodePort); - PG_TRY(); { if (NodeIsPrimary(workerNode)) @@ -706,12 +703,6 @@ master_update_node(PG_FUNCTION_ARGS) LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); } - char *oldWorkerName = pstrdup(workerNode->workerName); - int oldWorkerPort = workerNode->workerPort; - - /* make sure we don't have any lingering session lifespan connections */ - //CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort); - UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); /* we should be able to find the new node from the metadata */ @@ -1036,9 +1027,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); - /* make sure we don't have any lingering session lifespan connections */ - //CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort); - SendCommandToWorkersWithMetadata(nodeDeleteCommand); } diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 8ff2c64f4..dc9487b34 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -19,6 +19,7 @@ extern void InitializeSharedConnectionStats(void); extern void WaitForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void); extern void RemoveInactiveNodesFromSharedConnections(void); +extern void RemoveAllSharedConnectionEntries(void); extern int GetMaxSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern void WaitOrErrorForSharedConnection(const char *hostname, int port);