WIP on update/delete

preventConflictingFlags
Onder Kalaci 2020-04-07 10:20:08 +02:00
parent 7cad050b62
commit b6b47370dc
5 changed files with 97 additions and 45 deletions

View File

@ -259,7 +259,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
const char *user, const char *database) const char *user, const char *database)
{ {
ConnectionHashKey key; ConnectionHashKey key;
MultiConnection *connection;
bool found; bool found;
/* do some minimal input checks */ /* do some minimal input checks */
@ -312,7 +311,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
if (!(flags & FORCE_NEW_CONNECTION)) if (!(flags & FORCE_NEW_CONNECTION))
{ {
/* check connection cache for a connection that's not already in use */ /* check connection cache for a connection that's not already in use */
connection = FindAvailableConnection(entry->connections, flags); MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection) if (connection)
{ {
return connection; return connection;
@ -355,6 +354,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* Either no caching desired, or no pre-established, non-claimed, * Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment. * connection present. Initiate connection establishment.
*/ */
volatile MultiConnection *connection = NULL;
PG_TRY(); PG_TRY();
{ {
connection = StartConnectionEstablishment(&key); connection = StartConnectionEstablishment(&key);
@ -366,17 +366,28 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* here otherwise we'd leak the increment we have done for this * here otherwise we'd leak the increment we have done for this
* connection attempt. * connection attempt.
*/ */
DecrementSharedConnectionCounter(hostname, port); if (connection->pgConn == NULL)
{
DecrementSharedConnectionCounter(hostname, port);
}
else
{
CitusPQFinish(connection);
}
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
dlist_push_tail(entry->connections, &connection->connectionNode);
ResetShardPlacementAssociation(connection); MultiConnection *newConnection = (MultiConnection *) connection;
return connection; dlist_push_tail(entry->connections, &newConnection->connectionNode);
ResetShardPlacementAssociation(newConnection);
GivePurposeToConnection(newConnection, flags);
return newConnection;
} }
@ -963,10 +974,10 @@ CitusPQFinish(MultiConnection *connection)
if (connection->pgConn != NULL) if (connection->pgConn != NULL)
{ {
DecrementSharedConnectionCounter(connection->hostname, connection->port); DecrementSharedConnectionCounter(connection->hostname, connection->port);
}
PQfinish(connection->pgConn); PQfinish(connection->pgConn);
connection->pgConn = NULL; connection->pgConn = NULL;
}
} }

View File

@ -23,6 +23,7 @@
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/cancel_utils.h" #include "distributed/cancel_utils.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/time_constants.h" #include "distributed/time_constants.h"
@ -87,6 +88,7 @@ typedef struct SharedConnStatsHashEntry
*/ */
int MaxSharedPoolSize = 0; int MaxSharedPoolSize = 0;
/* the following two structs are used for accessing shared memory */ /* the following two structs are used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL; static HTAB *SharedConnStatsHash = NULL;
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
@ -174,24 +176,34 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
} }
/* /*
* RemoveAllSharedConnectionEntriesForNode gets a nodename and nodeport, and removes * RemoveInactiveNodesFromSharedConnections goes over the SharedConnStatsHash
* the corresponding entry in the shared memory for the current database. * and removes the inactive entries.
*/ */
void void
RemoveAllSharedConnectionEntriesForNode(char *hostname, int port) RemoveInactiveNodesFromSharedConnections(void)
{ {
SharedConnStatsHashKey connKey;
connKey.databaseOid = MyDatabaseId; /* we're reading all shared connections, prevent any changes */
connKey.port = port;
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
/* we're modifying the hashmap, prevent any concurrent access */
LockConnectionSharedMemory(LW_EXCLUSIVE); LockConnectionSharedMemory(LW_EXCLUSIVE);
bool entryFound = false; HASH_SEQ_STATUS status;
hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound); SharedConnStatsHashEntry *connectionEntry = NULL;
hash_seq_init(&status, SharedConnStatsHash);
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
{
SharedConnStatsHashKey connectionKey = connectionEntry->key;
WorkerNode *workerNode = FindWorkerNode(connectionKey.hostname, connectionKey.port);
if (workerNode == NULL || !workerNode->isActive)
{
hash_search(SharedConnStatsHash, &connectionKey, HASH_REMOVE, NULL);
if (connectionEntry->connectionCount < 0)
elog(INFO, "removing %s %d count: %d", connectionKey.hostname, connectionKey.port, connectionEntry->connectionCount);
}
}
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
} }
@ -272,8 +284,9 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
/* /*
* As the hash map is allocated in shared memory, it doesn't rely on palloc for * As the hash map is allocated in shared memory, it doesn't rely on palloc for
* memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no
* continuing the execution instead of throwing an error. * space in the shared memory. That's why we prefer continuing the execution
* instead of throwing an error.
*/ */
bool entryFound = false; bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry = SharedConnStatsHashEntry *connectionEntry =
@ -287,7 +300,6 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
if (!connectionEntry) if (!connectionEntry)
{ {
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
return true; return true;
} }
@ -343,18 +355,37 @@ IncrementSharedConnectionCounter(const char *hostname, int port)
LockConnectionSharedMemory(LW_EXCLUSIVE); LockConnectionSharedMemory(LW_EXCLUSIVE);
/*
* As the hash map is allocated in shared memory, it doesn't rely on palloc for
* memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer
* continuing the execution instead of throwing an error.
*/
bool entryFound = false; bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry = SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
/* this worker node is removed or updated */ /*
if (!entryFound) * It is possible to throw an error at this point, but that doesn't help us in anyway.
* Instead, we try our best, let the connection establishment continue by-passing the
* connection throttling.
*/
if (!connectionEntry)
{ {
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
return; return;
} }
if (!entryFound)
{
/* we successfully allocated the entry for the first time, so initialize it */
connectionEntry->connectionCount = 0;
}
/* we should never have a negative entry */
if (connectionEntry->connectionCount < 0 )
elog(WARNING,"IncrementSharedConnectionCounter: connectionEntry->connectionCount %s %d: %d",hostname, port, connectionEntry->connectionCount);
connectionEntry->connectionCount += 1; connectionEntry->connectionCount += 1;
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
@ -393,24 +424,25 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
SharedConnStatsHashEntry *connectionEntry = SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
/* this worker node is removed or updated */ /* this worker node is removed or updated, no need to care */
if (!entryFound) if (!entryFound)
{ {
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
/* wake up any waiters in case any backend is waiting for this node */
WakeupWaiterBackendsForSharedConnection();
/* make sure we don't have any lingering session lifespan connections */
CloseNodeConnectionsAfterTransaction(hostname, port);
return; return;
} }
/* /* we should never decrement a counter that has not been incremented */
* We might have some connection lingering for worker nodes if (connectionEntry->connectionCount < 1)
* that are removed/updated.
*/
if (connectionEntry->connectionCount <= 0)
{ {
UnLockConnectionSharedMemory(); elog(WARNING, "DecrementSharedConnectionCounter %s %d connectionEntry->connectionCount:%d",hostname, port, connectionEntry->connectionCount);
return;
} }
connectionEntry->connectionCount -= 1; connectionEntry->connectionCount -= 1;
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();

View File

@ -281,6 +281,12 @@ master_disable_node(PG_FUNCTION_ARGS)
bool onlyConsiderActivePlacements = false; bool onlyConsiderActivePlacements = false;
MemoryContext savedContext = CurrentMemoryContext; MemoryContext savedContext = CurrentMemoryContext;
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
/* make sure we don't have any lingering session lifespan connections */
//CloseNodeConnectionsAfterTransaction(nodeName, nodePort);
PG_TRY(); PG_TRY();
{ {
if (NodeIsPrimary(workerNode)) if (NodeIsPrimary(workerNode))
@ -584,6 +590,9 @@ ActivateNode(char *nodeName, int nodePort)
{ {
bool isActive = true; bool isActive = true;
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);
@ -626,6 +635,9 @@ master_update_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
/* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
newNodePort); newNodePort);
if (workerNodeWithSameAddress != NULL) if (workerNodeWithSameAddress != NULL)
@ -698,13 +710,7 @@ master_update_node(PG_FUNCTION_ARGS)
int oldWorkerPort = workerNode->workerPort; int oldWorkerPort = workerNode->workerPort;
/* make sure we don't have any lingering session lifespan connections */ /* make sure we don't have any lingering session lifespan connections */
CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort); //CloseNodeConnectionsAfterTransaction(oldWorkerName, oldWorkerPort);
/*
* We're not sure that there is no concurrent queries going on, so remove all
* entries in shared connection stats for the current database.
*/
RemoveAllSharedConnectionEntriesForNode(oldWorkerName, oldWorkerPort);
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
@ -1004,8 +1010,10 @@ ReadDistNode(bool includeNodesFromOtherClusters)
static void static void
RemoveNodeFromCluster(char *nodeName, int32 nodePort) RemoveNodeFromCluster(char *nodeName, int32 nodePort)
{ {
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); /* remove the shared connection counters to have some space */
RemoveInactiveNodesFromSharedConnections();
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
if (NodeIsPrimary(workerNode)) if (NodeIsPrimary(workerNode))
{ {
bool onlyConsiderActivePlacements = false; bool onlyConsiderActivePlacements = false;
@ -1029,7 +1037,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
/* make sure we don't have any lingering session lifespan connections */ /* make sure we don't have any lingering session lifespan connections */
CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort); //CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
SendCommandToWorkersWithMetadata(nodeDeleteCommand); SendCommandToWorkersWithMetadata(nodeDeleteCommand);
} }

View File

@ -30,6 +30,7 @@
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"

View File

@ -18,7 +18,7 @@ extern int ConnectionRetryTimout;
extern void InitializeSharedConnectionStats(void); extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void); extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void);
extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port); extern void RemoveInactiveNodesFromSharedConnections(void);
extern int GetMaxSharedPoolSize(void); extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitOrErrorForSharedConnection(const char *hostname, int port); extern void WaitOrErrorForSharedConnection(const char *hostname, int port);