From b6b47370dc424d078b61d3abc664911eba3eac3e Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 7 Apr 2020 10:20:08 +0200 Subject: [PATCH] WIP on update/delete --- .../connection/connection_management.c | 29 +++++-- .../connection/shared_connection_stats.c | 84 +++++++++++++------ .../distributed/metadata/node_metadata.c | 26 ++++-- .../transaction/transaction_management.c | 1 + .../distributed/shared_connection_stats.h | 2 +- 5 files changed, 97 insertions(+), 45 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index af2c6ba9c..a20222a32 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -259,7 +259,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database) { ConnectionHashKey key; - MultiConnection *connection; bool found; /* do some minimal input checks */ @@ -312,7 +311,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, if (!(flags & FORCE_NEW_CONNECTION)) { /* check connection cache for a connection that's not already in use */ - connection = FindAvailableConnection(entry->connections, flags); + MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { return connection; @@ -355,6 +354,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * Either no caching desired, or no pre-established, non-claimed, * connection present. Initiate connection establishment. */ + volatile MultiConnection *connection = NULL; PG_TRY(); { connection = StartConnectionEstablishment(&key); @@ -366,17 +366,28 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * here otherwise we'd leak the increment we have done for this * connection attempt. 
*/ - DecrementSharedConnectionCounter(hostname, port); + if (connection->pgConn == NULL) + { + DecrementSharedConnectionCounter(hostname, port); + } + else + { + CitusPQFinish(connection); + } PG_RE_THROW(); } PG_END_TRY(); - dlist_push_tail(entry->connections, &connection->connectionNode); - ResetShardPlacementAssociation(connection); + MultiConnection *newConnection = (MultiConnection *) connection; - return connection; + dlist_push_tail(entry->connections, &newConnection->connectionNode); + + ResetShardPlacementAssociation(newConnection); + GivePurposeToConnection(newConnection, flags); + + return newConnection; } @@ -963,10 +974,10 @@ CitusPQFinish(MultiConnection *connection) if (connection->pgConn != NULL) { DecrementSharedConnectionCounter(connection->hostname, connection->port); - } - PQfinish(connection->pgConn); - connection->pgConn = NULL; + PQfinish(connection->pgConn); + connection->pgConn = NULL; + } } diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index b2bf4ef7e..d39ddd4ca 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -23,6 +23,7 @@ #include "commands/dbcommands.h" #include "distributed/cancel_utils.h" #include "distributed/connection_management.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/shared_connection_stats.h" #include "distributed/time_constants.h" @@ -87,6 +88,7 @@ typedef struct SharedConnStatsHashEntry */ int MaxSharedPoolSize = 0; + /* the following two structs are used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; @@ -174,24 +176,34 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) } + /* - * RemoveAllSharedConnectionEntriesForNode gets a nodename and nodeport, and removes - * 
the corresponding entry in the shared memory for the current database. + * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash + * and removes the inactive entries. */ void -RemoveAllSharedConnectionEntriesForNode(char *hostname, int port) +RemoveInactiveNodesFromSharedConnections(void) { - SharedConnStatsHashKey connKey; - connKey.databaseOid = MyDatabaseId; - connKey.port = port; - strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); - - /* we're modifying the hashmap, prevent any concurrent access */ + /* we're reading all shared connections, prevent any changes */ LockConnectionSharedMemory(LW_EXCLUSIVE); - bool entryFound = false; - hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound); + HASH_SEQ_STATUS status; + SharedConnStatsHashEntry *connectionEntry = NULL; + + hash_seq_init(&status, SharedConnStatsHash); + while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + { + SharedConnStatsHashKey connectionKey = connectionEntry->key; + WorkerNode *workerNode = FindWorkerNode(connectionKey.hostname, connectionKey.port); + + if (workerNode == NULL || !workerNode->isActive) + { + hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL); + if (connectionEntry->connectionCount < 0) + elog(INFO, "removing %s %d count: %d", connectionKey.hostname, connectionKey.port, connectionEntry->connectionCount); + } + } UnLockConnectionSharedMemory(); } @@ -272,8 +284,9 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) /* * As the hash map is allocated in shared memory, it doesn't rely on palloc for - * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer - * continuing the execution instead of throwing an error. + * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no + * space in the shared memory. That's why we prefer continuing the execution + * instead of throwing an error. 
*/ bool entryFound = false; SharedConnStatsHashEntry *connectionEntry = @@ -287,7 +300,6 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) if (!connectionEntry) { UnLockConnectionSharedMemory(); - return true; } @@ -343,18 +355,37 @@ IncrementSharedConnectionCounter(const char *hostname, int port) LockConnectionSharedMemory(LW_EXCLUSIVE); + /* + * As the hash map is allocated in shared memory, it doesn't rely on palloc for + * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer + * continuing the execution instead of throwing an error. + */ bool entryFound = false; SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); + hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); - /* this worker node is removed or updated */ - if (!entryFound) + /* + * It is possible to throw an error at this point, but that doesn't help us in any way. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. 
+ */ + if (!connectionEntry) { UnLockConnectionSharedMemory(); return; } + if (!entryFound) + { + /* we successfully allocated the entry for the first time, so initialize it */ + connectionEntry->connectionCount = 0; + } + + /* we should never have a negative entry */ + if (connectionEntry->connectionCount < 0 ) + elog(WARNING,"IncrementSharedConnectionCounter: connectionEntry->connectionCount %s %d: %d",hostname, port, connectionEntry->connectionCount); + connectionEntry->connectionCount += 1; UnLockConnectionSharedMemory(); @@ -393,24 +424,25 @@ DecrementSharedConnectionCounter(const char *hostname, int port) SharedConnStatsHashEntry *connectionEntry = hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); - /* this worker node is removed or updated */ + /* this worker node is removed or updated, no need to care */ if (!entryFound) { UnLockConnectionSharedMemory(); + /* wake up any waiters in case any backend is waiting for this node */ + WakeupWaiterBackendsForSharedConnection(); + + /* make sure we don't have any lingering session lifespan connections */ + CloseNodeConnectionsAfterTransaction(hostname, port); + return; } - /* - * We might have some connection lingering for worker nodes - * that are removed/updated. 
- */ - if (connectionEntry->connectionCount <= 0) + /* we should never decrement a counter that has not been incremented */ + if (connectionEntry->connectionCount < 1) { - UnLockConnectionSharedMemory(); - return; + elog(WARNING, "DecrementSharedConnectionCounter %s %d connectionEntry->connectionCount:%d",hostname, port, connectionEntry->connectionCount); } - connectionEntry->connectionCount -= 1; UnLockConnectionSharedMemory(); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index ec0f8e2c2..1e33b3cae 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -281,6 +281,12 @@ master_disable_node(PG_FUNCTION_ARGS) bool onlyConsiderActivePlacements = false; MemoryContext savedContext = CurrentMemoryContext; + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + + /* make sure we don't have any lingering session lifespan connections */ + //CloseNodeConnectionsAfterTransaction(nodeName, nodePort); + PG_TRY(); { if (NodeIsPrimary(workerNode)) @@ -584,6 +590,9 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -626,6 +635,9 @@ master_update_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, newNodePort); if (workerNodeWithSameAddress != NULL) @@ -698,13 +710,7 @@ master_update_node(PG_FUNCTION_ARGS) int oldWorkerPort = workerNode->workerPort; /* make sure we don't have any lingering session lifespan connections */ - 
CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort); - - /* - * We're not sure that there is no concurrent queries going on, so remove all - * entries in shared connection stats for the current database. - */ - RemoveAllSharedConnectionEntriesForNode(oldWorkerName, oldWorkerPort); + //CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort); UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); @@ -1004,8 +1010,10 @@ ReadDistNode(bool includeNodesFromOtherClusters) static void RemoveNodeFromCluster(char *nodeName, int32 nodePort) { - WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); + /* remove the shared connection counters to have some space */ + RemoveInactiveNodesFromSharedConnections(); + WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); if (NodeIsPrimary(workerNode)) { bool onlyConsiderActivePlacements = false; @@ -1029,7 +1037,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); /* make sure we don't have any lingering session lifespan connections */ - CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort); + //CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort); SendCommandToWorkersWithMetadata(nodeDeleteCommand); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index f28a43571..6c2403852 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -30,6 +30,7 @@ #include "distributed/multi_executor.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "distributed/shared_connection_stats.h" #include "distributed/subplan_execution.h" #include "distributed/version_compat.h" #include "utils/hsearch.h" diff --git a/src/include/distributed/shared_connection_stats.h 
b/src/include/distributed/shared_connection_stats.h index 717797053..8ff2c64f4 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -18,7 +18,7 @@ extern int ConnectionRetryTimout; extern void InitializeSharedConnectionStats(void); extern void WaitForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void); -extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port); +extern void RemoveInactiveNodesFromSharedConnections(void); extern int GetMaxSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern void WaitOrErrorForSharedConnection(const char *hostname, int port);