- Separate HTAB for connection management and statistics.

- Refactoring
pull/7286/head
ivyazmitinov 2023-10-13 21:34:55 +07:00
parent b2051bffd7
commit 23e6b36e23
7 changed files with 392 additions and 291 deletions

View File

@@ -379,7 +379,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	if (flags & WAIT_FOR_CONNECTION)
 	{
-		WaitLoopForSharedConnection(hostname, port);
+		int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION)
+								 ? MAINTENANCE_CONNECTION_POOL
+								 : 0;
+		WaitLoopForSharedConnection(sharedCounterFlags, hostname, port);
 	}
 	else if (flags & OPTIONAL_CONNECTION)
 	{
@@ -389,7 +392,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		 * cannot reserve the right to establish a connection, we prefer to
 		 * error out.
 		 */
-		if (!TryToIncrementSharedConnectionCounter(hostname, port))
+		int sharedCounterFlags = 0;
+		if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port))
 		{
 			/* do not track the connection anymore */
 			dlist_delete(&connection->connectionNode);
@@ -409,7 +413,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		 *
 		 * Still, we keep track of the connection counter.
 		 */
-		IncrementSharedConnectionCounter(hostname, port);
+		int sharedCounterFlags = 0;
+		IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port);
 	}
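These hunks introduce the flag plumbing between the connection API and the shared counters. As a rough sketch (not part of the commit; the hostname and port are placeholders), a caller that needs a maintenance connection combines the new REQUIRE_MAINTENANCE_CONNECTION flag with the usual connection flags, and the translation into MAINTENANCE_CONNECTION_POOL happens once, right before the counter is taken:

    /* illustrative caller sketch, built only from identifiers that appear in this diff */
    uint32 connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;

    /* translate connection-level flags into shared-counter flags, as the hunk above does */
    int sharedCounterFlags = (connectionFlags & REQUIRE_MAINTENANCE_CONNECTION)
                             ? MAINTENANCE_CONNECTION_POOL
                             : 0;

    /* blocks until a slot is granted for this worker */
    WaitLoopForSharedConnection(sharedCounterFlags, "worker-1", 5432);

    /* ... use the connection ... */

    /* releasing the slot is unchanged by this commit */
    DecrementSharedConnectionCounter("worker-1", 5432);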

View File

@@ -439,13 +439,16 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
 		 * Increment the shared counter, we may need to wait if there are
 		 * no space left.
 		 */
-		WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort);
+		int sharedCounterFlags = 0;
+		WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName, workerNode->workerPort);
 	}
 	else
 	{
-		bool incremented =
-			TryToIncrementSharedConnectionCounter(workerNode->workerName,
-												  workerNode->workerPort);
+		int sharedCounterFlags = 0;
+		bool incremented = TryToIncrementSharedConnectionCounter(
+			sharedCounterFlags,
+			workerNode->workerName,
+			workerNode->workerPort);
 		if (!incremented)
 		{
 			/*

View File

@@ -18,7 +18,6 @@
 #include "access/hash.h"
 #include "access/htup_details.h"
-#include "catalog/pg_authid.h"
 #include "commands/dbcommands.h"
 #include "common/hashfn.h"
 #include "storage/ipc.h"
@@ -27,15 +26,13 @@
 #include "pg_version_constants.h"
 
 #include "distributed/backend_data.h"
-#include "distributed/cancel_utils.h"
 #include "distributed/connection_management.h"
-#include "distributed/listutils.h"
 #include "distributed/locally_reserved_shared_connections.h"
 #include "distributed/metadata_cache.h"
-#include "distributed/multi_executor.h"
 #include "distributed/placement_connection.h"
 #include "distributed/shared_connection_stats.h"
 #include "distributed/time_constants.h"
-#include "distributed/worker_manager.h"
 #include "distributed/tuplestore.h"
 #include "distributed/worker_manager.h"
@@ -58,8 +55,14 @@ typedef struct ConnectionStatsSharedData
 	ConditionVariable waitersConditionVariable;
 } ConnectionStatsSharedData;
 
-typedef struct SharedConnStatsHashKey
+/*
+ * There are two hash tables:
+ *
+ * 1. The first one tracks the connection count per worker node and used for the connection throttling
+ * 2. The second one tracks the connection count per database on a worker node and used for statistics
+ *
+ */
+typedef struct SharedWorkerNodeConnStatsHashKey
 {
 	/*
 	 * We keep the entries in the shared memory even after master_update_node()
@@ -68,16 +71,28 @@ typedef struct SharedConnStatsHashKey
 	 */
 	char hostname[MAX_NODE_LENGTH];
 	int32 port;
-} SharedConnStatsHashKey;
+} SharedWorkerNodeConnStatsHashKey;
+
+typedef struct SharedWorkerNodeDatabaseConnStatsHashKey
+{
+	SharedWorkerNodeConnStatsHashKey workerNodeKey;
+	Oid database;
+} SharedWorkerNodeDatabaseConnStatsHashKey;
 
 /* hash entry for per worker stats */
-typedef struct SharedConnStatsHashEntry
+typedef struct SharedWorkerNodeConnStatsHashEntry
 {
-	SharedConnStatsHashKey key;
-	Oid databaseOid;
-	int connectionCount;
-} SharedConnStatsHashEntry;
+	SharedWorkerNodeConnStatsHashKey key;
+	int count;
+} SharedWorkerNodeConnStatsHashEntry;
+
+/* hash entry for per database on worker stats */
+typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey key;
+	int count;
+} SharedWorkerNodeDatabaseConnStatsHashEntry;
 
 
 /*
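The refactoring splits the old single key into two key types, and the per-database key embeds the per-node key, so every connection is counted in both tables. A minimal sketch of how the two keys relate for one connection (not part of the commit; the host is a placeholder):

    SharedWorkerNodeConnStatsHashKey workerKey;
    strlcpy(workerKey.hostname, "worker-1", MAX_NODE_LENGTH);  /* placeholder host */
    workerKey.port = 5432;

    SharedWorkerNodeDatabaseConnStatsHashKey workerDatabaseKey;
    workerDatabaseKey.workerNodeKey = workerKey;   /* nested per-node key */
    workerDatabaseKey.database = MyDatabaseId;     /* current database OID */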
@@ -88,6 +103,11 @@ typedef struct SharedConnStatsHashEntry
  */
 int MaxSharedPoolSize = 0;
 
+/*
+ *
+ */
+float SharedPoolSizeMaintenancePercent = 10.0f;
+
 /*
  * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize().
  * "0" means adjust LocalSharedPoolSize automatically by using MaxConnections.
@@ -101,7 +121,8 @@ int MaxClientConnections = ALLOW_ALL_EXTERNAL_CONNECTIONS;
 
 /* the following two structs are used for accessing shared memory */
-static HTAB *SharedConnStatsHash = NULL;
+static HTAB *SharedWorkerNodeConnStatsHash = NULL;
+static HTAB *SharedWorkerNodeDatabaseConnStatsHash = NULL;
 static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
@@ -116,10 +137,25 @@ static void UnLockConnectionSharedMemory(void);
 static bool ShouldWaitForConnection(int currentConnectionCount);
 static uint32 SharedConnectionHashHash(const void *key, Size keysize);
 static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
+static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize);
+static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize);
+static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey);
+static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey,
+                                      const SharedWorkerNodeConnStatsHashEntry *connectionEntry);
+static bool
+IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port,
+                                         Oid database);
+static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port);
+static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname,
+                                                                                 int port,
+                                                                                 Oid database);
+static void
+DecrementSharedConnectionCounterInternal(const char *hostname, int port);
+
 
 PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
 
 
 /*
  * citus_remote_connection_stats returns all the avaliable information about all
  * the remote connections (a.k.a., connections to remote nodes).
@@ -155,26 +191,26 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri
 	LockConnectionSharedMemory(LW_SHARED);
 
 	HASH_SEQ_STATUS status;
-	SharedConnStatsHashEntry *connectionEntry = NULL;
+	SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL;
 
-	hash_seq_init(&status, SharedConnStatsHash);
-	while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
+	hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash);
+	while ((connectionEntry = (SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search(&status)) != 0)
 	{
 		/* get ready for the next tuple */
 		memset(values, 0, sizeof(values));
 		memset(isNulls, false, sizeof(isNulls));
 
-		char *databaseName = get_database_name(connectionEntry->databaseOid);
+		char *databaseName = get_database_name(connectionEntry->key.database);
 		if (databaseName == NULL)
 		{
 			/* database might have been dropped */
 			continue;
 		}
 
-		values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname));
-		values[1] = Int32GetDatum(connectionEntry->key.port);
+		values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.workerNodeKey.hostname));
+		values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port);
 		values[2] = PointerGetDatum(cstring_to_text(databaseName));
-		values[3] = Int32GetDatum(connectionEntry->connectionCount);
+		values[3] = Int32GetDatum(connectionEntry->count);
 
 		tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
 	}
@@ -220,6 +256,12 @@ GetMaxSharedPoolSize(void)
 	return MaxSharedPoolSize;
 }
 
 
+float
+GetSharedPoolSizeMaintenancePercent(void)
+{
+	return SharedPoolSizeMaintenancePercent;
+}
+
 
 /*
  * GetLocalSharedPoolSize is a wrapper around LocalSharedPoolSize which is
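GetSharedPoolSizeMaintenancePercent() only exposes the new setting here; this commit does not yet show a consumer of it. One plausible use, stated purely as an assumption for illustration, is carving a maintenance quota out of the shared pool:

    /* hypothetical consumer of the new accessor; not part of this commit */
    int maintenanceQuota = (int) (GetMaxSharedPoolSize() *
                                  GetSharedPoolSizeMaintenancePercent() / 100.0);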
@@ -243,14 +285,14 @@ GetLocalSharedPoolSize(void)
 /*
  * WaitLoopForSharedConnection tries to increment the shared connection
  * counter for the given hostname/port and the current database in
- * SharedConnStatsHash.
+ * SharedWorkerNodeConnStatsHash.
  *
  * The function implements a retry mechanism via a condition variable.
  */
 void
-WaitLoopForSharedConnection(const char *hostname, int port)
+WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
 {
-	while (!TryToIncrementSharedConnectionCounter(hostname, port))
+	while (!TryToIncrementSharedConnectionCounter(flags, hostname, port))
 	{
 		CHECK_FOR_INTERRUPTS();
@@ -260,149 +302,37 @@ WaitLoopForSharedConnection(const char *hostname, int port)
 	ConditionVariableCancelSleep();
 }
 
 
 /*
  * TryToIncrementSharedConnectionCounter tries to increment the shared
  * connection counter for the given nodeId and the current database in
- * SharedConnStatsHash.
+ * SharedWorkerNodeConnStatsHash.
  *
  * If the function returns true, the caller is allowed (and expected)
  * to establish a new connection to the given node. Else, the caller
  * is not allowed to establish a new connection.
 */
 bool
-TryToIncrementSharedConnectionCounter(const char *hostname, int port)
+TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
 {
 	if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
 	{
 		/* connection throttling disabled */
 		return true;
 	}
 
-	bool counterIncremented = false;
-	SharedConnStatsHashKey connKey;
-
-	strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
-	if (strlen(hostname) > MAX_NODE_LENGTH)
-	{
-		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						errmsg("hostname exceeds the maximum length of %d",
-							   MAX_NODE_LENGTH)));
-	}
-
 	/*
 	 * The local session might already have some reserved connections to the given
 	 * node. In that case, we don't need to go through the shared memory.
 	 */
 	Oid userId = GetUserId();
 	if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId))
 	{
 		MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId);
 
 		return true;
 	}
 
-	connKey.port = port;
-
-	/*
-	 * Handle adaptive connection management for the local node slightly different
-	 * as local node can failover to local execution.
-	 */
-	bool connectionToLocalNode = false;
-	int activeBackendCount = 0;
-	WorkerNode *workerNode = FindWorkerNode(hostname, port);
-	if (workerNode)
-	{
-		connectionToLocalNode = (workerNode->groupId == GetLocalGroupId());
-		if (connectionToLocalNode &&
-			GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES)
-		{
-			/*
-			 * This early return is required as LocalNodeParallelExecutionFactor
-			 * is ignored for the first connection below. This check makes the
-			 * user experience is more accurate and also makes it easy for
-			 * having regression tests which emulates the local node adaptive
-			 * connection management.
-			 */
-			return false;
-		}
-
-		activeBackendCount = GetExternalClientBackendCount();
-	}
-
-	LockConnectionSharedMemory(LW_EXCLUSIVE);
-
-	/*
-	 * As the hash map is allocated in shared memory, it doesn't rely on palloc for
-	 * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no
-	 * space in the shared memory. That's why we prefer continuing the execution
-	 * instead of throwing an error.
-	 */
-	bool entryFound = false;
-	SharedConnStatsHashEntry *connectionEntry =
-		hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
-
-	/*
-	 * It is possible to throw an error at this point, but that doesn't help us in anyway.
-	 * Instead, we try our best, let the connection establishment continue by-passing the
-	 * connection throttling.
-	 */
-	if (!connectionEntry)
-	{
-		UnLockConnectionSharedMemory();
-		return true;
-	}
-
-	if (!entryFound)
-	{
-		/* we successfully allocated the entry for the first time, so initialize it */
-		connectionEntry->connectionCount = 1;
-		connectionEntry->databaseOid = MyDatabaseId;
-
-		counterIncremented = true;
-	}
-	else if (connectionToLocalNode)
-	{
-		/*
-		 * For local nodes, solely relying on citus.max_shared_pool_size or
-		 * max_connections might not be sufficient. The former gives us
-		 * a preview of the future (e.g., we let the new connections to establish,
-		 * but they are not established yet). The latter gives us the close to
-		 * precise view of the past (e.g., the active number of client backends).
-		 *
-		 * Overall, we want to limit both of the metrics. The former limit typically
-		 * kicks in under regular loads, where the load of the database increases in
-		 * a reasonable pace. The latter limit typically kicks in when the database
-		 * is issued lots of concurrent sessions at the same time, such as benchmarks.
-		 */
-		if (activeBackendCount + 1 > GetLocalSharedPoolSize())
-		{
-			counterIncremented = false;
-		}
-		else if (connectionEntry->connectionCount + 1 > GetLocalSharedPoolSize())
-		{
-			counterIncremented = false;
-		}
-		else
-		{
-			connectionEntry->connectionCount++;
-			counterIncremented = true;
-		}
-	}
-	else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize())
-	{
-		/* there is no space left for this connection */
-		counterIncremented = false;
-	}
-	else
-	{
-		connectionEntry->connectionCount++;
-		counterIncremented = true;
-	}
-
-	UnLockConnectionSharedMemory();
-
-	return counterIncremented;
+	return IncrementSharedConnectionCounterInternal(flags, true, hostname, port, MyDatabaseId);
 }
@@ -411,64 +341,162 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
  * for the given hostname and port.
  */
 void
-IncrementSharedConnectionCounter(const char *hostname, int port)
+IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
 {
-	SharedConnStatsHashKey connKey;
-
 	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
 	{
 		/* connection throttling disabled */
 		return;
 	}
 
-	strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
-	if (strlen(hostname) > MAX_NODE_LENGTH)
-	{
-		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						errmsg("hostname exceeds the maximum length of %d",
-							   MAX_NODE_LENGTH)));
-	}
-
-	connKey.port = port;
-
-	LockConnectionSharedMemory(LW_EXCLUSIVE);
-
-	/*
-	 * As the hash map is allocated in shared memory, it doesn't rely on palloc for
-	 * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer
-	 * continuing the execution instead of throwing an error.
-	 */
-	bool entryFound = false;
-	SharedConnStatsHashEntry *connectionEntry =
-		hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
-
-	/*
-	 * It is possible to throw an error at this point, but that doesn't help us in anyway.
-	 * Instead, we try our best, let the connection establishment continue by-passing the
-	 * connection throttling.
-	 */
-	if (!connectionEntry)
-	{
-		UnLockConnectionSharedMemory();
-
-		ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing "
-								"connection counter", hostname, port)));
-
-		return;
-	}
-
-	if (!entryFound)
-	{
-		/* we successfully allocated the entry for the first time, so initialize it */
-		connectionEntry->connectionCount = 0;
-		connectionEntry->databaseOid = MyDatabaseId;
-	}
-
-	connectionEntry->connectionCount += 1;
-
-	UnLockConnectionSharedMemory();
+	IncrementSharedConnectionCounterInternal(flags, false, hostname, port, MyDatabaseId);
 }
+
+
+static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port)
+{
+	SharedWorkerNodeConnStatsHashKey key;
+	strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
+	if (strlen(hostname) > MAX_NODE_LENGTH)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("hostname exceeds the maximum length of %d",
+							   MAX_NODE_LENGTH)));
+	}
+	key.port = port;
+	return key;
+}
+
+
+static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname,
+																				 int port,
+																				 Oid database)
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey;
+	workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
+	workerNodeDatabaseKey.database = database;
+	return workerNodeDatabaseKey;
+}
+
+
+static bool
+IncrementSharedConnectionCounterInternal(uint32 externalFlags,
+										 bool checkLimits,
+										 const char *hostname,
+										 int port,
+										 Oid database)
+{
+	LockConnectionSharedMemory(LW_EXCLUSIVE);
+
+	/*
+	 * As the hash map is allocated in shared memory, it doesn't rely on palloc for
+	 * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no
+	 * space in the shared memory. That's why we prefer continuing the execution
+	 * instead of throwing an error.
+	 */
+	SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
+	bool workerNodeEntryFound = false;
+	SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry =
+		hash_search(SharedWorkerNodeConnStatsHash,
+					&workerNodeKey,
+					HASH_ENTER_NULL,
+					&workerNodeEntryFound);
+
+	/*
+	 * It is possible to throw an error at this point, but that doesn't help us in anyway.
+	 * Instead, we try our best, let the connection establishment continue by-passing the
+	 * connection throttling.
+	 */
+	if (!workerNodeConnectionEntry)
+	{
+		UnLockConnectionSharedMemory();
+		return true;
+	}
+
+	if (!workerNodeEntryFound)
+	{
+		/* we successfully allocated the entry for the first time, so initialize it */
+		workerNodeConnectionEntry->count = 0;
+	}
+
+	/* Initialized SharedWorkerNodeDatabaseConnStatsHash the same way */
+	SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey =
+		PrepareWorkerNodeDatabaseHashKey(hostname, port, database);
+	bool workerNodeDatabaseEntryFound = false;
+	SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry =
+		hash_search(SharedWorkerNodeDatabaseConnStatsHash,
+					&workerNodeDatabaseKey,
+					HASH_ENTER_NULL,
+					&workerNodeDatabaseEntryFound);
+
+	if (!workerNodeDatabaseEntry)
+	{
+		UnLockConnectionSharedMemory();
+		return true;
+	}
+
+	if (!workerNodeDatabaseEntryFound)
+	{
+		workerNodeDatabaseEntry->count = 0;
+	}
+
+	/* Increment counter if possible */
+	bool connectionSlotAvailable = true;
+	connectionSlotAvailable = !checkLimits ||
+							  isConnectionSlotAvailable(&workerNodeKey, workerNodeConnectionEntry);
+
+	if (connectionSlotAvailable)
+	{
+		workerNodeConnectionEntry->count += 1;
+		workerNodeDatabaseEntry->count += 1;
+	}
+
+	UnLockConnectionSharedMemory();
+
+	return connectionSlotAvailable;
+}
+
+
+static bool IsConnectionToLocalNode(SharedWorkerNodeConnStatsHashKey *connKey)
+{
+	WorkerNode *workerNode = FindWorkerNode(connKey->hostname, connKey->port);
+	return workerNode && (workerNode->groupId == GetLocalGroupId());
+}
+
+
+static bool isConnectionSlotAvailable(SharedWorkerNodeConnStatsHashKey *connKey,
+									  const SharedWorkerNodeConnStatsHashEntry *connectionEntry)
+{
+	bool connectionSlotAvailable = true;
+	bool connectionToLocalNode = IsConnectionToLocalNode(connKey);
+	if (connectionToLocalNode)
+	{
+		bool remoteConnectionsForLocalQueriesDisabled =
+			GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES;
+
+		/*
+		 * For local nodes, solely relying on citus.max_shared_pool_size or
+		 * max_connections might not be sufficient. The former gives us
+		 * a preview of the future (e.g., we let the new connections to establish,
+		 * but they are not established yet). The latter gives us the close to
+		 * precise view of the past (e.g., the active number of client backends).
+		 *
+		 * Overall, we want to limit both of the metrics. The former limit typically
+		 * kicks in under regular loads, where the load of the database increases in
+		 * a reasonable pace. The latter limit typically kicks in when the database
+		 * is issued lots of concurrent sessions at the same time, such as benchmarks.
+		 */
+		bool localConnectionLimitExceeded =
+			GetExternalClientBackendCount() + 1 > GetLocalSharedPoolSize() ||
+			connectionEntry->count + 1 > GetLocalSharedPoolSize();
+		if (remoteConnectionsForLocalQueriesDisabled || localConnectionLimitExceeded)
+		{
+			connectionSlotAvailable = false;
+		}
+	}
+	else if (connectionEntry->count + 1 > GetMaxSharedPoolSize())
+	{
+		connectionSlotAvailable = false;
+	}
+
+	return connectionSlotAvailable;
+}
 
 
 /*
  * DecrementSharedConnectionCounter decrements the shared counter
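Both public counters now funnel into IncrementSharedConnectionCounterInternal(), which bumps the per-node and the per-database entry under a single exclusive lock; the only behavioural difference between the callers is the checkLimits argument. A condensed sketch of that mapping (placeholder values, not part of the commit):

    uint32 flags = 0;                    /* 0 selects the regular pool */
    const char *hostname = "worker-1";   /* placeholder worker address */
    int port = 5432;

    /* TryToIncrementSharedConnectionCounter(): respects the pool limits (checkLimits = true) */
    bool allowed = IncrementSharedConnectionCounterInternal(flags, true, hostname, port, MyDatabaseId);

    /* IncrementSharedConnectionCounter(): bookkeeping only, no limit check (checkLimits = false) */
    IncrementSharedConnectionCounterInternal(flags, false, hostname, port, MyDatabaseId);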
@@ -477,79 +505,96 @@ IncrementSharedConnectionCounter(const char *hostname, int port)
 void
 DecrementSharedConnectionCounter(const char *hostname, int port)
 {
-	SharedConnStatsHashKey connKey;
-
-	/*
-	 * Do not call GetMaxSharedPoolSize() here, since it may read from
-	 * the catalog and we may be in the process exit handler.
-	 */
-	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
-	{
-		/* connection throttling disabled */
-		return;
-	}
-
-	strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
-	if (strlen(hostname) > MAX_NODE_LENGTH)
-	{
-		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						errmsg("hostname exceeds the maximum length of %d",
-							   MAX_NODE_LENGTH)));
-	}
-
-	connKey.port = port;
-
-	LockConnectionSharedMemory(LW_EXCLUSIVE);
-
-	bool entryFound = false;
-	SharedConnStatsHashEntry *connectionEntry =
-		hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
-
-	/* this worker node is removed or updated, no need to care */
-	if (!entryFound)
-	{
-		UnLockConnectionSharedMemory();
-
-		/* wake up any waiters in case any backend is waiting for this node */
-		WakeupWaiterBackendsForSharedConnection();
-
-		ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing "
-								"connection counter", hostname, port)));
-
-		return;
-	}
-
-	/* we should never go below 0 */
-	Assert(connectionEntry->connectionCount > 0);
-
-	connectionEntry->connectionCount -= 1;
-
-	if (connectionEntry->connectionCount == 0)
-	{
-		/*
-		 * We don't have to remove at this point as the node might be still active
-		 * and will have new connections open to it. Still, this seems like a convenient
-		 * place to remove the entry, as connectionCount == 0 implies that the server is
-		 * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
-		 * we're unlikely to trigger this often.
-		 */
-		hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound);
-	}
-
-	UnLockConnectionSharedMemory();
-
+	DecrementSharedConnectionCounterInternal(hostname, port);
+	UnLockConnectionSharedMemory();
 	WakeupWaiterBackendsForSharedConnection();
 }
+
+
+static void
+DecrementSharedConnectionCounterInternal(const char *hostname, int port)
+{
+	/*
+	 * Do not call GetMaxSharedPoolSize() here, since it may read from
+	 * the catalog and we may be in the process exit handler.
+	 */
+	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
+	{
+		/* connection throttling disabled */
+		return;
+	}
+
+	LockConnectionSharedMemory(LW_EXCLUSIVE);
+
+	bool workerNodeEntryFound = false;
+	SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
+	SharedWorkerNodeConnStatsHashEntry *workerNodeEntry =
+		hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, &workerNodeEntryFound);
+
+	/* this worker node is removed or updated, no need to care */
+	if (!workerNodeEntryFound)
+	{
+		ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing "
+								"connection counter", hostname, port)));
+		return;
+	}
+
+	/* we should never go below 0 */
+	Assert(workerNodeEntry->count > 0);
+
+	workerNodeEntry->count -= 1;
+
+	/*
+	 * We don't have to remove at this point as the node might be still active
+	 * and will have new connections open to it. Still, this seems like a convenient
+	 * place to remove the entry, as count == 0 implies that the server is
+	 * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
+	 * we're unlikely to trigger this often.
+	 */
+	if (workerNodeEntry->count == 0)
+	{
+		hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL);
+	}
+
+	/*
+	 * Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey
+	 */
+	SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey =
+		PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId);
+	bool workerNodeDatabaseEntryFound = false;
+	SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry =
+		hash_search(SharedWorkerNodeDatabaseConnStatsHash,
+					&workerNodeDatabaseKey,
+					HASH_FIND,
+					&workerNodeDatabaseEntryFound);
+
+	if (!workerNodeDatabaseEntryFound)
+	{
+		return;
+	}
+
+	Assert(workerNodeDatabaseEntry->count > 0);
+
+	workerNodeDatabaseEntry->count -= 1;
+
+	if (workerNodeDatabaseEntry->count == 0)
+	{
+		hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey, HASH_REMOVE, NULL);
+	}
+}
 
 
 /*
  * LockConnectionSharedMemory is a utility function that should be used when
- * accessing to the SharedConnStatsHash, which is in the shared memory.
+ * accessing to the SharedWorkerNodeConnStatsHash, which is in the shared memory.
  */
 static void
 LockConnectionSharedMemory(LWLockMode lockMode)
 {
 	LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode);
 }
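Decrementing is also the point where throttled waiters get released. A rough sketch of the handshake between two backends, using only functions visible in this diff (the interleaving and the worker address are illustrative, not from the commit):

    /* Backend A: waits for a slot in the regular pool for worker-1; internally it
     * sleeps on the shared condition variable between retries. */
    WaitLoopForSharedConnection(0, "worker-1", 5432);

    /* Backend B: releases its slot; DecrementSharedConnectionCounter() ends with
     * WakeupWaiterBackendsForSharedConnection(), which wakes sleepers such as backend A. */
    DecrementSharedConnectionCounter("worker-1", 5432);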
@@ -634,12 +679,17 @@ SharedConnectionStatsShmemSize(void)
 	size = add_size(size, sizeof(ConnectionStatsSharedData));
 
-	Size hashSize = hash_estimate_size(MaxWorkerNodesTracked,
-									   sizeof(SharedConnStatsHashEntry));
+	Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked,
+													 sizeof(SharedWorkerNodeConnStatsHashEntry));
 
-	size = add_size(size, hashSize);
+	size = add_size(size, workerNodeConnHashSize);
+
+	Size workerNodeDatabaseConnSize = hash_estimate_size(DatabasesPerWorker,
+														 sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry));
+
+	size = add_size(size, workerNodeDatabaseConnSize);
 
 	return size;
 }
@@ -651,15 +701,6 @@ void
 SharedConnectionStatsShmemInit(void)
 {
 	bool alreadyInitialized = false;
-	HASHCTL info;
-
-	/* create (hostname, port, database) -> [counter] */
-	memset(&info, 0, sizeof(info));
-	info.keysize = sizeof(SharedConnStatsHashKey);
-	info.entrysize = sizeof(SharedConnStatsHashEntry);
-	info.hash = SharedConnectionHashHash;
-	info.match = SharedConnectionHashCompare;
-	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
 
 	/*
 	 * Currently the lock isn't required because allocation only happens at
@@ -688,14 +729,42 @@ SharedConnectionStatsShmemInit(void)
 		ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
 	}
 
-	/* allocate hash table */
-	SharedConnStatsHash =
-		ShmemInitHash("Shared Conn. Stats Hash", MaxWorkerNodesTracked,
-					  MaxWorkerNodesTracked, &info, hashFlags);
+	/* allocate hash tables */
+
+	/* create (hostname, port) -> [counter] */
+	HASHCTL sharedWorkerNodeConnStatsHashInfo;
+	memset(&sharedWorkerNodeConnStatsHashInfo, 0, sizeof(sharedWorkerNodeConnStatsHashInfo));
+	sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey);
+	sharedWorkerNodeConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeConnStatsHashEntry);
+	sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash;
+	sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare;
+	SharedWorkerNodeConnStatsHash =
+		ShmemInitHash("Shared Conn. Stats Hash",
+					  MaxWorkerNodesTracked,
+					  MaxWorkerNodesTracked,
+					  &sharedWorkerNodeConnStatsHashInfo,
+					  (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE));
+
+	/* create (hostname, port, database) -> [counter] */
+	HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo;
+	memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0, sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo));
+	sharedWorkerNodeDatabaseConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashKey);
+	sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry);
+	sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash;
+	sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare;
+
+	int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked * DatabasesPerWorker;
+	SharedWorkerNodeDatabaseConnStatsHash =
+		ShmemInitHash("Shared Conn Per Database. Stats Hash",
+					  sharedWorkerNodeDatabaseConnStatsHashSize,
+					  sharedWorkerNodeDatabaseConnStatsHashSize,
+					  &sharedWorkerNodeDatabaseConnStatsHashInfo,
+					  (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE));
 
 	LWLockRelease(AddinShmemInitLock);
 
-	Assert(SharedConnStatsHash != NULL);
+	Assert(SharedWorkerNodeConnStatsHash != NULL);
+	Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL);
 	Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0);
 
 	if (prev_shmem_startup_hook != NULL)
@@ -797,7 +866,7 @@ ShouldWaitForConnection(int currentConnectionCount)
 static uint32
 SharedConnectionHashHash(const void *key, Size keysize)
 {
-	SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key;
+	SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key;
 
 	uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
 	hash = hash_combine(hash, hash_uint32(entry->port));
@@ -805,20 +874,35 @@ SharedConnectionHashHash(const void *key, Size keysize)
 	return hash;
 }
 
+static uint32
+SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize)
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey *entry = (SharedWorkerNodeDatabaseConnStatsHashKey *) key;
+	uint32 hash = string_hash(entry->workerNodeKey.hostname, NAMEDATALEN);
+	hash = hash_combine(hash, hash_uint32(entry->workerNodeKey.port));
+	hash = hash_combine(hash, hash_uint32(entry->database));
+
+	return hash;
+}
+
 
 static int
 SharedConnectionHashCompare(const void *a, const void *b, Size keysize)
 {
-	SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a;
-	SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b;
+	SharedWorkerNodeConnStatsHashKey *ca = (SharedWorkerNodeConnStatsHashKey *) a;
+	SharedWorkerNodeConnStatsHashKey *cb = (SharedWorkerNodeConnStatsHashKey *) b;
 
-	if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
-		ca->port != cb->port)
-	{
-		return 1;
-	}
-	else
-	{
-		return 0;
-	}
+	return strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
+		   ca->port != cb->port;
+}
+
+
+static int
+SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize)
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey *ca = (SharedWorkerNodeDatabaseConnStatsHashKey *) a;
+	SharedWorkerNodeDatabaseConnStatsHashKey *cb = (SharedWorkerNodeDatabaseConnStatsHashKey *) b;
+
+	return strncmp(ca->workerNodeKey.hostname, cb->workerNodeKey.hostname, MAX_NODE_LENGTH) != 0 ||
+		   ca->workerNodeKey.port != cb->workerNodeKey.port ||
+		   ca->database != cb->database;
 }
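The new hash and compare callbacks extend the per-node functions by folding the database OID into the chain. A compressed sketch of that hashing chain (not from the commit; assume k is a filled-in per-database key as shown earlier):

    SharedWorkerNodeDatabaseConnStatsHashKey k;   /* assumed to be populated */

    /* per-node portion: hostname, then port */
    uint32 nodeHash = string_hash(k.workerNodeKey.hostname, NAMEDATALEN);
    nodeHash = hash_combine(nodeHash, hash_uint32(k.workerNodeKey.port));

    /* per-database variant folds in one more component */
    uint32 nodeDatabaseHash = hash_combine(nodeHash, hash_uint32(k.database));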

View File

@@ -39,6 +39,7 @@
 /* Config variables managed via guc.c */
 char *WorkerListFileName;
 int MaxWorkerNodesTracked = 2048;    /* determines worker node hash table size */
+int DatabasesPerWorker = 10;         /* determine database per worker hash table size */
 
 
 /* Local functions forward declarations */

View File

@@ -123,7 +123,9 @@ enum MultiConnectionMode
 	 *
 	 * This is need to run 'CREATE_REPLICATION_SLOT' command.
 	 */
-	REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8
+	REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8,
+
+	REQUIRE_MAINTENANCE_CONNECTION = 1 << 9
 };

View File

@@ -16,8 +16,13 @@
 #define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1
 #define ALLOW_ALL_EXTERNAL_CONNECTIONS -1
 
+enum SharedPoolCounterMode
+{
+	MAINTENANCE_CONNECTION_POOL = 1 << 0
+};
+
 extern int MaxSharedPoolSize;
+extern float MaintenanceSharedPoolSizePercent;
 extern int LocalSharedPoolSize;
 extern int MaxClientConnections;
 
@@ -30,10 +35,10 @@ extern void SharedConnectionStatsShmemInit(void);
 extern int GetMaxClientConnections(void);
 extern int GetMaxSharedPoolSize(void);
 extern int GetLocalSharedPoolSize(void);
-extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
-extern void WaitLoopForSharedConnection(const char *hostname, int port);
+extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
+extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port);
 extern void DecrementSharedConnectionCounter(const char *hostname, int port);
-extern void IncrementSharedConnectionCounter(const char *hostname, int port);
+extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port);
 extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int
 											activeConnectionCount);

View File

@@ -59,6 +59,7 @@ typedef struct WorkerNode
 
 /* Config variables managed via guc.c */
 extern int MaxWorkerNodesTracked;
+extern int DatabasesPerWorker;
 extern char *WorkerListFileName;
 extern char *CurrentCluster;