From 66e7b822aa22c3ba7445c276c71fd8dbbc3f28d1 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 24 Mar 2020 13:20:40 +0100 Subject: [PATCH] Basic implementation for TryIncrementSharedConnectionCounter --- .../connection/connection_management.c | 2 + .../connection/shared_connection_stats.c | 89 ++++++++++++++++--- .../citus_remote_connection_stats/9.3-2.sql | 4 +- .../citus_remote_connection_stats/latest.sql | 4 +- .../distributed/shared_connection_stats.h | 1 + 5 files changed, 84 insertions(+), 16 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 9c5bd17a5..27782ee9e 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -28,6 +28,7 @@ #include "distributed/hash_helpers.h" #include "distributed/placement_connection.h" #include "distributed/run_from_same_connection.h" +#include "distributed/shared_connection_stats.h" #include "distributed/cancel_utils.h" #include "distributed/remote_commands.h" #include "distributed/version_compat.h" @@ -321,6 +322,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * Either no caching desired, or no pre-established, non-claimed, * connection present. Initiate connection establishment. 
*/ + TryToIncrementSharedConnectionCounter(hostname, port); connection = StartConnectionEstablishment(&key); diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 7fb9750bc..63e7b285d 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -20,16 +20,18 @@ #include "access/hash.h" #include "access/htup_details.h" #include "catalog/pg_authid.h" +#include "commands/dbcommands.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/shared_connection_stats.h" #include "distributed/tuplestore.h" +#include "utils/builtins.h" #include "utils/hashutils.h" #include "utils/hsearch.h" #include "storage/ipc.h" -#define REMOTE_CONNECTION_STATS_COLUMNS 6 +#define REMOTE_CONNECTION_STATS_COLUMNS 4 /* @@ -48,16 +50,18 @@ typedef struct ConnectionStatsSharedData typedef struct SharedConnStatsHashKey { /* - * Using nodeId (over hostname/hostport) make the tracking resiliant to - * master_update_node(). Plus, requires a little less memory. + * In some cases, Citus allows opening connections to hosts where + * there is no notion of "WorkerNode", such as task-tracker daemon. + * That's why, we prefer to use "hostname/port" over nodeId. */ - uint32 nodeId; + char hostname[MAX_NODE_LENGTH]; + int32 port; /* * Given that citus.shared_max_pool_size can be defined per database, we * should keep track of shared connections per database. 
*/ - char database[NAMEDATALEN]; + Oid databaseOid; } SharedConnStatsHashKey; /* hash entry for per worker stats */ @@ -79,6 +83,8 @@ typedef struct SharedConnStatsHashEntry */ int MaxTrackedWorkerNodes = 1024; +static int MaxSharedPoolSize = 100; + /* the following two structs used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; @@ -147,9 +153,12 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - values[0] = Int32GetDatum(connectionEntry->key.nodeId); - values[1] = PointerGetDatum(connectionEntry->key.database); - values[2] = Int32GetDatum(connectionEntry->connectionCount); + char *databaseName = get_database_name(connectionEntry->key.databaseOid); + + values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.port); + values[2] = PointerGetDatum(cstring_to_text(databaseName)); + values[3] = Int32GetDatum(connectionEntry->connectionCount); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -158,6 +167,60 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) } +/* + * Tries to increment the shared connection counter for the given nodeId and + * the current database in SharedConnStatsHash. + * + * The function first checks whether the number of connections is less than + * citus.max_shared_pool_size. If so, the function increments the counter + * by one and returns true. Else, the function returns false. 
+ */
+bool
+TryToIncrementSharedConnectionCounter(const char *hostname, int port)
+{
+	bool counterIncremented = false;
+	SharedConnStatsHashKey connKey;
+
+	/* validate first: strlcpy() would otherwise silently truncate the key */
+	if (strlen(hostname) >= MAX_NODE_LENGTH)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("hostname exceeds the maximum length of %d",
+							   MAX_NODE_LENGTH - 1)));
+	}
+
+	strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
+	connKey.port = port;
+	connKey.databaseOid = MyDatabaseId;
+
+	LockConnectionSharedMemory(LW_EXCLUSIVE);
+	bool entryFound = false;
+	SharedConnStatsHashEntry *connectionEntry =
+		hash_search(SharedConnStatsHash, &connKey, HASH_ENTER, &entryFound);
+
+	if (!entryFound)
+	{
+		/* a freshly created entry has no counter yet, start from zero */
+		connectionEntry->connectionCount = 0;
+	}
+
+	if (connectionEntry->connectionCount >= MaxSharedPoolSize)
+	{
+		/* there is no space left for this connection */
+		counterIncremented = false;
+	}
+	else
+	{
+		connectionEntry->connectionCount++;
+		counterIncremented = true;
+	}
+
+	UnLockConnectionSharedMemory();
+
+	return counterIncremented;
+}
+
+
 /*
  * LockConnectionSharedMemory is a utility function that should be used when
  * accessing to the SharedConnStatsHash, which is in the shared memory.
@@ -284,8 +347,9 @@ SharedConnectionHashHash(const void *key, Size keysize) { SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key; - uint32 hash = hash_uint32(entry->nodeId); - hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); + uint32 hash = string_hash(entry->hostname, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->port)); + hash = hash_combine(hash, hash_uint32(entry->databaseOid)); return hash; } @@ -297,8 +361,9 @@ SharedConnectionHashCompare(const void *a, const void *b, Size keysize) SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a; SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b; - if (ca->nodeId != cb->nodeId || - strncmp(ca->database, cb->database, NAMEDATALEN) != 0) + if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || + ca->port != cb->port || + ca->databaseOid != cb->databaseOid) { return 1; } diff --git a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql index 3369c1ac7..873f2a8c5 100644 --- a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql +++ b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/9.3-2.sql @@ -1,6 +1,6 @@ -CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) +CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; - COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) + COMMENT ON FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int) IS 'returns statistics about remote connections'; diff --git 
a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql index 3369c1ac7..873f2a8c5 100644 --- a/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_remote_connection_stats/latest.sql @@ -1,6 +1,6 @@ -CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) +CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$; - COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int) + COMMENT ON FUNCTION citus_remote_connection_stats(OUT hostname text, OUT port int, OUT database_name text, OUT connection_count_to_node int) IS 'returns statistics about remote connections'; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 9e9e2102e..f3dd45833 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -14,5 +14,6 @@ extern int MaxTrackedWorkerNodes; extern void InitializeSharedConnectionStats(void); +extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); #endif /* SHARED_CONNECTION_STATS_H */