mirror of https://github.com/citusdata/citus.git
Add the necessary shared memory infrastructure
- The hashmap in the shared memory - The lock to access the hashmap - The GUC to control the sizepull/3692/head
parent
4e3d402473
commit
0dbfbe0c37
|
@ -0,0 +1,217 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* shared_connection_stats.c
|
||||
* Keeps track of the number of connections to remote nodes across
|
||||
* backends. The primary goal is to prevent excessive number of
|
||||
* connections (typically > max_connections) to any worker node.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/hash.h"
|
||||
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/shared_connection_stats.h"
|
||||
#include "utils/hashutils.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "storage/ipc.h"
|
||||
|
||||
|
||||
/*
|
||||
* The data structure used to store data in shared memory. This data structure only
|
||||
* used for storing the lock. The actual statistics about the connections are stored
|
||||
* in the hashmap, which is allocated separately, as Postgres provides different APIs
|
||||
* for allocating hashmaps in the shared memory.
|
||||
*/
|
||||
typedef struct ConnectionStatsSharedData
|
||||
{
|
||||
int sharedConnectionHashTrancheId;
|
||||
char *sharedConnectionHashTrancheName;
|
||||
LWLock sharedConnectionHashLock;
|
||||
} ConnectionStatsSharedData;
|
||||
|
||||
typedef struct SharedConnStatsHashKey
|
||||
{
|
||||
/*
|
||||
* Using nodeId (over hostname/hostport) make the tracking resiliant to
|
||||
* master_update_node(). Plus, requires a little less memory.
|
||||
*/
|
||||
uint32 nodeId;
|
||||
|
||||
/*
|
||||
* Given that citus.shared_max_pool_size can be defined per database, we
|
||||
* should keep track of shared connections per database.
|
||||
*/
|
||||
char database[NAMEDATALEN];
|
||||
} SharedConnStatsHashKey;
|
||||
|
||||
/* hash entry for per worker stats */
|
||||
typedef struct SharedConnStatsHashEntry
|
||||
{
|
||||
SharedConnStatsHashKey key;
|
||||
|
||||
int connectionCount;
|
||||
} SharedConnStatsHashEntry;
|
||||
|
||||
|
||||
/*
|
||||
* Controlled via a GUC.
|
||||
*
|
||||
* By default, Citus tracks 1024 worker nodes, which is already
|
||||
* very unlikely number of worker nodes. Given that the shared
|
||||
* memory required per worker is pretty small (~120 Bytes), we think it
|
||||
* is a good default that wouldn't hurt any users in any dimension.
|
||||
*/
|
||||
int MaxTrackedWorkerNodes = 1024;
|
||||
|
||||
/* the following two structs used for accessing shared memory */
|
||||
static HTAB *SharedConnStatsHash = NULL;
|
||||
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
|
||||
|
||||
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
|
||||
|
||||
/* local function declarations */
|
||||
static void SharedConnectionStatsShmemInit(void);
|
||||
static size_t SharedConnectionStatsShmemSize(void);
|
||||
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||
static uint32 SharedConnectionHashHash(const void *key, Size keysize);
|
||||
|
||||
|
||||
/*
|
||||
* InitializeSharedConnectionStats requests the necessary shared memory
|
||||
* from Postgres and sets up the shared memory startup hook.
|
||||
*/
|
||||
void
|
||||
InitializeSharedConnectionStats(void)
|
||||
{
|
||||
/* allocate shared memory */
|
||||
if (!IsUnderPostmaster)
|
||||
{
|
||||
RequestAddinShmemSpace(SharedConnectionStatsShmemSize());
|
||||
}
|
||||
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = SharedConnectionStatsShmemInit;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SharedConnectionStatsShmemSize returns the size that should be allocated
|
||||
* on the shared memory for shared connection stats.
|
||||
*/
|
||||
static size_t
|
||||
SharedConnectionStatsShmemSize(void)
|
||||
{
|
||||
Size size = 0;
|
||||
|
||||
size = add_size(size, sizeof(ConnectionStatsSharedData));
|
||||
size = add_size(size, mul_size(sizeof(LWLock), MaxTrackedWorkerNodes));
|
||||
|
||||
Size hashSize = hash_estimate_size(MaxTrackedWorkerNodes,
|
||||
sizeof(SharedConnStatsHashEntry));
|
||||
|
||||
size = add_size(size, hashSize);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SharedConnectionStatsShmemInit initializes the shared memory used
|
||||
* for keeping track of connection stats across backends.
|
||||
*/
|
||||
static void
|
||||
SharedConnectionStatsShmemInit(void)
|
||||
{
|
||||
bool alreadyInitialized = false;
|
||||
HASHCTL info;
|
||||
|
||||
/* create (nodeId,database) -> [counter] */
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(SharedConnStatsHashKey);
|
||||
info.entrysize = sizeof(SharedConnStatsHashEntry);
|
||||
info.hash = SharedConnectionHashHash;
|
||||
info.match = SharedConnectionHashCompare;
|
||||
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
|
||||
|
||||
/*
|
||||
* Currently the lock isn't required because allocation only happens at
|
||||
* startup in postmaster, but it doesn't hurt, and makes things more
|
||||
* consistent with other extensions.
|
||||
*/
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
ConnectionStatsSharedState =
|
||||
(ConnectionStatsSharedData *) ShmemInitStruct(
|
||||
"Shared Connection Stats Data",
|
||||
sizeof(ConnectionStatsSharedData),
|
||||
&alreadyInitialized);
|
||||
|
||||
if (!alreadyInitialized)
|
||||
{
|
||||
ConnectionStatsSharedState->sharedConnectionHashTrancheId = LWLockNewTrancheId();
|
||||
ConnectionStatsSharedState->sharedConnectionHashTrancheName =
|
||||
"Shared Connection Tracking Hash Tranche";
|
||||
LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId,
|
||||
ConnectionStatsSharedState->sharedConnectionHashTrancheName);
|
||||
|
||||
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
|
||||
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
|
||||
}
|
||||
|
||||
/* allocate hash table */
|
||||
SharedConnStatsHash =
|
||||
ShmemInitHash("Shared Conn. Stats Hash", MaxTrackedWorkerNodes,
|
||||
MaxTrackedWorkerNodes, &info, hashFlags);
|
||||
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
Assert(SharedConnStatsHash != NULL);
|
||||
Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0);
|
||||
|
||||
if (prev_shmem_startup_hook != NULL)
|
||||
{
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static uint32
|
||||
SharedConnectionHashHash(const void *key, Size keysize)
|
||||
{
|
||||
SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key;
|
||||
|
||||
uint32 hash = hash_uint32(entry->nodeId);
|
||||
hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN));
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
SharedConnectionHashCompare(const void *a, const void *b, Size keysize)
|
||||
{
|
||||
SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a;
|
||||
SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b;
|
||||
|
||||
if (ca->nodeId != cb->nodeId ||
|
||||
strncmp(ca->database, cb->database, NAMEDATALEN) != 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -56,6 +56,7 @@
|
|||
#include "distributed/reference_table_utils.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/run_from_same_connection.h"
|
||||
#include "distributed/shared_connection_stats.h"
|
||||
#include "distributed/query_pushdown_planning.h"
|
||||
#include "distributed/time_constants.h"
|
||||
#include "distributed/query_stats.h"
|
||||
|
@ -271,6 +272,7 @@ _PG_init(void)
|
|||
InitializeConnectionManagement();
|
||||
InitPlacementConnectionManagement();
|
||||
InitializeCitusQueryStats();
|
||||
InitializeSharedConnectionStats();
|
||||
|
||||
atexit(CitusBackendAtExit);
|
||||
|
||||
|
@ -1011,6 +1013,23 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.max_tracked_worker_nodes",
|
||||
gettext_noop("Sets the maximum number of worker tracked."),
|
||||
gettext_noop("Citus doesn't have any limitations in terms of the "
|
||||
"number of worker nodes allowed in the cluster. But, "
|
||||
"Citus keeps some information about the worker nodes "
|
||||
"in the shared memory for certain optimizations. The "
|
||||
"optimizations are enforced up to this number of worker "
|
||||
"nodes. Any additional worker nodes may not benefit from"
|
||||
"the optimizations."),
|
||||
&MaxTrackedWorkerNodes,
|
||||
1024, 256, INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.max_running_tasks_per_node",
|
||||
gettext_noop("Sets the maximum number of tasks to run concurrently per node."),
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* shared_connection_stats.h
|
||||
* Central management of connections and their life-cycle
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef SHARED_CONNECTION_STATS_H
|
||||
#define SHARED_CONNECTION_STATS_H
|
||||
|
||||
extern int MaxTrackedWorkerNodes;
|
||||
|
||||
extern void InitializeSharedConnectionStats(void);
|
||||
|
||||
#endif /* SHARED_CONNECTION_STATS_H */
|
Loading…
Reference in New Issue