Basic implementation for TryIncrementSharedConnectionCounter

preventConflictingFlags
Onder Kalaci 2020-03-24 13:20:40 +01:00
parent 22c0f2a8cd
commit 66e7b822aa
5 changed files with 84 additions and 16 deletions

View File

@ -28,6 +28,7 @@
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/run_from_same_connection.h" #include "distributed/run_from_same_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/cancel_utils.h" #include "distributed/cancel_utils.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
@ -321,6 +322,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* Either no caching desired, or no pre-established, non-claimed, * Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment. * connection present. Initiate connection establishment.
*/ */
TryToIncrementSharedConnectionCounter(hostname, port);
connection = StartConnectionEstablishment(&key); connection = StartConnectionEstablishment(&key);

View File

@ -20,16 +20,18 @@
#include "access/hash.h" #include "access/hash.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "commands/dbcommands.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "utils/builtins.h"
#include "utils/hashutils.h" #include "utils/hashutils.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#define REMOTE_CONNECTION_STATS_COLUMNS 6 #define REMOTE_CONNECTION_STATS_COLUMNS 4
/* /*
@ -48,16 +50,18 @@ typedef struct ConnectionStatsSharedData
typedef struct SharedConnStatsHashKey typedef struct SharedConnStatsHashKey
{ {
/* /*
* Using nodeId (over hostname/hostport) make the tracking resiliant to * In some cases, Citus allows opening connections to hosts where
* master_update_node(). Plus, requires a little less memory. * there is no notion of "WorkerNode", such as task-tracker daemon.
* That's why, we prefer to use "hostname/port" over nodeId.
*/ */
uint32 nodeId; char hostname[MAX_NODE_LENGTH];
int32 port;
/* /*
* Given that citus.shared_max_pool_size can be defined per database, we * Given that citus.shared_max_pool_size can be defined per database, we
* should keep track of shared connections per database. * should keep track of shared connections per database.
*/ */
char database[NAMEDATALEN]; Oid databaseOid;
} SharedConnStatsHashKey; } SharedConnStatsHashKey;
/* hash entry for per worker stats */ /* hash entry for per worker stats */
@ -79,6 +83,8 @@ typedef struct SharedConnStatsHashEntry
*/ */
int MaxTrackedWorkerNodes = 1024; int MaxTrackedWorkerNodes = 1024;
static int MaxSharedPoolSize = 100;
/* the following two structs used for accessing shared memory */ /* the following two structs used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL; static HTAB *SharedConnStatsHash = NULL;
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
@ -147,9 +153,12 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
values[0] = Int32GetDatum(connectionEntry->key.nodeId); char *databaseName = get_database_name(connectionEntry->key.databaseOid);
values[1] = PointerGetDatum(connectionEntry->key.database);
values[2] = Int32GetDatum(connectionEntry->connectionCount); values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname));
values[1] = Int32GetDatum(connectionEntry->key.port);
values[2] = PointerGetDatum(cstring_to_text(databaseName));
values[3] = Int32GetDatum(connectionEntry->connectionCount);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
} }
@ -158,6 +167,60 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
} }
/*
* Tries to increment the shared connection counter for the given nodeId and
* the current database in SharedConnStatsHash.
*
* The function first checks whether the number of connections is less than
* citus.max_shared_pool_size. If so, the function increments the counter
* by one and returns true. Else, the function returns false.
*/
bool
TryToIncrementSharedConnectionCounter(const char *hostname, int port)
{
bool counterIncremented = false;
SharedConnStatsHashKey connKey;
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_ENTER, &entryFound);
if (!entryFound)
{
connectionEntry->connectionCount = 1;
counterIncremented = true;
}
else if (connectionEntry->connectionCount + 1 >= MaxSharedPoolSize)
{
/* there is no space left for this connection */
counterIncremented = false;
}
else
{
connectionEntry->connectionCount++;
counterIncremented = true;
}
UnLockConnectionSharedMemory();
return counterIncremented;
}
/* /*
* LockConnectionSharedMemory is a utility function that should be used when * LockConnectionSharedMemory is a utility function that should be used when
* accessing to the SharedConnStatsHash, which is in the shared memory. * accessing to the SharedConnStatsHash, which is in the shared memory.
@ -284,8 +347,9 @@ SharedConnectionHashHash(const void *key, Size keysize)
{ {
SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key; SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key;
uint32 hash = hash_uint32(entry->nodeId); uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); hash = hash_combine(hash, hash_uint32(entry->port));
hash = hash_combine(hash, hash_uint32(entry->databaseOid));
return hash; return hash;
} }
@ -297,8 +361,9 @@ SharedConnectionHashCompare(const void *a, const void *b, Size keysize)
SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a; SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a;
SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b; SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b;
if (ca->nodeId != cb->nodeId || if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
strncmp(ca->database, cb->database, NAMEDATALEN) != 0) ca->port != cb->port ||
ca->databaseOid != cb->databaseOid)
{ {
return 1; return 1;
} }

View File

@ -1,6 +1,6 @@
CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int)
RETURNS SETOF RECORD RETURNS SETOF RECORD
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) COMMENT ON FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int)
IS 'returns statistics about remote connections'; IS 'returns statistics about remote connections';

View File

@ -1,6 +1,6 @@
CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int)
RETURNS SETOF RECORD RETURNS SETOF RECORD
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) COMMENT ON FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int)
IS 'returns statistics about remote connections'; IS 'returns statistics about remote connections';

View File

@ -14,5 +14,6 @@
extern int MaxTrackedWorkerNodes; extern int MaxTrackedWorkerNodes;
extern void InitializeSharedConnectionStats(void); extern void InitializeSharedConnectionStats(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
#endif /* SHARED_CONNECTION_STATS_H */ #endif /* SHARED_CONNECTION_STATS_H */