Initial hashtable implementation for tenant stats

pull/6868/head
Gokhan Gulbiz 2023-04-24 14:53:03 +03:00
parent b437aa9e52
commit ccd464ba04
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
5 changed files with 209 additions and 79 deletions

View File

@ -2003,7 +2003,7 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
* overlaps with the current move's target node.
* The earlier/first move might make space for the later/second move.
* So we could run out of disk space (or at least overload the node)
* if we move the second shard to it before the first one is moved away. 
* if we move the second shard to it before the first one is moved away.
*/
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,

View File

@ -11,6 +11,7 @@
#include "postgres.h"
#include "unistd.h"
#include "access/hash.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/colocation_utils.h"
#include "distributed/distributed_planner.h"
@ -59,12 +60,16 @@ static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz query
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void EvictTenantsIfNecessary(TimestampTz queryTime);
static void RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime);
static void CreateMultiTenantMonitor(void);
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
static MultiTenantMonitor * GetMultiTenantMonitor(void);
static void MultiTenantMonitorSMInit(void);
static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime);
static int FindTenantStats(MultiTenantMonitor *monitor);
static TenantStats * CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz
queryTime);
static TenantStatsHashKey * CreateTenantStatsHashKey(char *tenantAttribute, uint32
colocationGroupId);
static TenantStats * FindTenantStats(MultiTenantMonitor *monitor);
static uint32 TenantStatsHashFn(const void *key, Size keysize);
static int TenantStatsMatchFn(const void *key1, const void *key2, Size keysize);
static size_t MultiTenantMonitorshmemSize(void);
static char * ExtractTopComment(const char *inputString);
static char * EscapeCommentChars(const char *str);
@ -112,21 +117,37 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
int numberOfRowsToReturn = 0;
int tenantStatsCount = hash_get_num_entries(monitor->tenants);
if (returnAllTenants)
{
numberOfRowsToReturn = monitor->tenantCount;
numberOfRowsToReturn = tenantStatsCount;
}
else
{
numberOfRowsToReturn = Min(monitor->tenantCount, StatTenantsLimit);
numberOfRowsToReturn = Min(tenantStatsCount,
StatTenantsLimit);
}
for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
TenantStats **stats = palloc(tenantStatsCount *
sizeof(TenantStats *));
HASH_SEQ_STATUS hash_seq;
TenantStats *stat;
int j = 0;
hash_seq_init(&hash_seq, monitor->tenants);
while ((stat = hash_seq_search(&hash_seq)) != NULL)
{
UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
SpinLockAcquire(&stat->lock);
stats[j++] = stat;
UpdatePeriodsIfNecessary(stat, monitoringTime);
ReduceScoreIfNecessary(stat, monitoringTime);
SpinLockRelease(&stat->lock);
}
SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
SafeQsort(stats, j, sizeof(TenantStats *),
CompareTenantScore);
for (int i = 0; i < numberOfRowsToReturn; i++)
@ -134,10 +155,12 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
TenantStats *tenantStats = &monitor->tenants[i];
TenantStats *tenantStats = stats[i];
values[0] = Int32GetDatum(tenantStats->colocationGroupId);
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
SpinLockAcquire(&tenantStats->lock);
values[0] = Int32GetDatum(tenantStats->key.colocationGroupId);
values[1] = PointerGetDatum(cstring_to_text(tenantStats->key.tenantAttribute));
values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
@ -148,15 +171,21 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
values[7] = Float8GetDatum(tenantStats->cpuUsageInLastPeriod);
values[8] = Int64GetDatum(tenantStats->score);
SpinLockRelease(&tenantStats->lock);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
pfree(stats);
LWLockRelease(&monitor->lock);
PG_RETURN_VOID();
}
#include "miscadmin.h"
/*
* citus_stat_tenants_local_reset resets monitor for tenant statistics
* on the local node.
@ -164,8 +193,44 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
Datum
citus_stat_tenants_local_reset(PG_FUNCTION_ARGS)
{
/* ereport(NOTICE, (errmsg("MyProcPid: %d", MyProcPid))); */
/*sleep(10); */
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
monitor->tenantCount = 0;
/* if monitor is not created yet, there is nothing to reset */
if (monitor == NULL)
{
PG_RETURN_VOID();
}
HASH_SEQ_STATUS hash_seq;
TenantStats *stats;
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
hash_seq_init(&hash_seq, monitor->tenants);
while ((stats = hash_seq_search(&hash_seq)) != NULL)
{
bool found = false;
hash_search(monitor->tenants, &stats->key, HASH_REMOVE, &found);
if (found)
{
stats->writesInLastPeriod = 0;
stats->writesInThisPeriod = 0;
stats->readsInLastPeriod = 0;
stats->readsInThisPeriod = 0;
stats->cpuUsageInLastPeriod = 0;
stats->cpuUsageInThisPeriod = 0;
stats->score = 0;
stats->lastScoreReduction = 0;
/* pfree(stats); */
}
}
LWLockRelease(&monitor->lock);
PG_RETURN_VOID();
}
@ -352,11 +417,10 @@ AttributeMetricsIfApplicable()
*/
LWLockAcquire(&monitor->lock, LW_SHARED);
int currentTenantIndex = FindTenantStats(monitor);
TenantStats *tenantStats = FindTenantStats(monitor);
if (currentTenantIndex != -1)
if (tenantStats != NULL)
{
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
SpinLockAcquire(&tenantStats->lock);
UpdatePeriodsIfNecessary(tenantStats, queryTime);
@ -370,20 +434,19 @@ AttributeMetricsIfApplicable()
LWLockRelease(&monitor->lock);
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
currentTenantIndex = FindTenantStats(monitor);
tenantStats = FindTenantStats(monitor);
if (currentTenantIndex == -1)
if (tenantStats == NULL)
{
currentTenantIndex = CreateTenantStats(monitor, queryTime);
tenantStats = CreateTenantStats(monitor, queryTime);
}
LWLockRelease(&monitor->lock);
LWLockAcquire(&monitor->lock, LW_SHARED);
currentTenantIndex = FindTenantStats(monitor);
if (currentTenantIndex != -1)
tenantStats = FindTenantStats(monitor);
if (tenantStats != NULL)
{
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
SpinLockAcquire(&tenantStats->lock);
UpdatePeriodsIfNecessary(tenantStats, queryTime);
@ -506,15 +569,29 @@ EvictTenantsIfNecessary(TimestampTz queryTime)
*
* Every time tenant count hits StatTenantsLimit * 3, we reduce it back to StatTenantsLimit * 2.
*/
if (monitor->tenantCount >= StatTenantsLimit * 3)
long tenantStatsCount = hash_get_num_entries(monitor->tenants);
if (tenantStatsCount >= StatTenantsLimit * 3)
{
for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
HASH_SEQ_STATUS hash_seq;
TenantStats *stat;
TenantStats **stats = palloc(tenantStatsCount *
sizeof(TenantStats *));
int i = 0;
hash_seq_init(&hash_seq, monitor->tenants);
while ((stat = hash_seq_search(&hash_seq)) != NULL)
{
ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], queryTime);
stats[i++] = stat;
}
SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
CompareTenantScore);
monitor->tenantCount = StatTenantsLimit * 2;
SafeQsort(stats, i, sizeof(TenantStats *), CompareTenantScore);
for (i = StatTenantsLimit * 2; i < tenantStatsCount; i++)
{
hash_search(monitor->tenants, &stats[i]->key, HASH_REMOVE, NULL);
}
pfree(stats);
}
}
@ -552,17 +629,6 @@ RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime)
}
/*
* CreateMultiTenantMonitor creates the data structure for multi tenant monitor.
*/
static void
CreateMultiTenantMonitor()
{
MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor();
monitor->tenantCount = 0;
}
/*
* CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor.
*/
@ -585,6 +651,19 @@ CreateSharedMemoryForMultiTenantMonitor()
monitor->namedLockTranche.trancheName);
LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(TenantStatsHashKey);
info.entrysize = sizeof(TenantStats);
info.hash = TenantStatsHashFn;
info.match = TenantStatsMatchFn;
monitor->tenants = ShmemInitHash("citus_stats_tenants hash",
StatTenantsLimit * 3, StatTenantsLimit * 3,
&info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE |
HASH_SHARED_MEM);
return monitor;
}
@ -628,7 +707,7 @@ InitializeMultiTenantMonitorSMHandleManagement()
static void
MultiTenantMonitorSMInit()
{
CreateMultiTenantMonitor();
CreateSharedMemoryForMultiTenantMonitor();
if (prev_shmem_startup_hook != NULL)
{
@ -642,7 +721,7 @@ MultiTenantMonitorSMInit()
*
* Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode.
*/
static int
static TenantStats *
CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
{
/*
@ -651,38 +730,85 @@ CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
*/
EvictTenantsIfNecessary(queryTime);
int tenantIndex = monitor->tenantCount;
bool found;
TenantStatsHashKey *key = CreateTenantStatsHashKey(AttributeToTenant,
AttributeToColocationGroupId);
memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex]));
TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, key,
HASH_ENTER, &found);
strcpy_s(monitor->tenants[tenantIndex].tenantAttribute,
sizeof(monitor->tenants[tenantIndex].tenantAttribute), AttributeToTenant);
monitor->tenants[tenantIndex].colocationGroupId = AttributeToColocationGroupId;
SpinLockInit(&monitor->tenants[tenantIndex].lock);
pfree(key);
monitor->tenantCount++;
if (!found)
{
/* initialize the stats lock for the new entry in the hash table */
SpinLockInit(&stats->lock);
}
return tenantIndex;
return stats;
}
/*
* FindTenantStats finds the index for the current tenant's statistics.
* FindTenantStats finds the current tenant's statistics.
*/
static int
static TenantStats *
FindTenantStats(MultiTenantMonitor *monitor)
{
for (int i = 0; i < monitor->tenantCount; i++)
{
TenantStats *tenantStats = &monitor->tenants[i];
if (strcmp(tenantStats->tenantAttribute, AttributeToTenant) == 0 &&
tenantStats->colocationGroupId == AttributeToColocationGroupId)
{
return i;
}
}
TenantStatsHashKey *key = CreateTenantStatsHashKey(AttributeToTenant,
AttributeToColocationGroupId);
return -1;
TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, key,
HASH_FIND, NULL);
pfree(key);
return stats;
}
static TenantStatsHashKey *
CreateTenantStatsHashKey(char *tenantAttribute, uint32 colocationGroupId)
{
TenantStatsHashKey *key = (TenantStatsHashKey *) palloc(sizeof(TenantStatsHashKey));
memset(key->tenantAttribute, 0, MAX_TENANT_ATTRIBUTE_LENGTH);
strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH);
key->colocationGroupId = colocationGroupId;
return key;
}
/*
* CitusQuerysStatsHashFn calculates and returns hash value for a key
*/
static uint32
TenantStatsHashFn(const void *key, Size keysize)
{
const TenantStatsHashKey *k = (const TenantStatsHashKey *) key;
return hash_any((const unsigned char *) (k->tenantAttribute), strlen(
k->tenantAttribute)) ^
hash_uint32((uint32) k->colocationGroupId);
}
/*
* TenantStatsMatchFn compares two keys - zero means match.
* See definition of HashCompareFunc in hsearch.h for more info.
*/
static int
TenantStatsMatchFn(const void *key1, const void *key2, Size keysize)
{
const TenantStatsHashKey *k1 = (const TenantStatsHashKey *) key1;
const TenantStatsHashKey *k2 = (const TenantStatsHashKey *) key2;
if (strcmp(k1->tenantAttribute, k2->tenantAttribute) == 0 &&
k1->colocationGroupId == k2->colocationGroupId)
{
return 0;
}
return 1;
}

View File

@ -15,20 +15,27 @@
#include "executor/executor.h"
#include "storage/lwlock.h"
#include "utils/datetime.h"
#include "utils/hsearch.h"
#define MAX_TENANT_ATTRIBUTE_LENGTH 100
/*
* Hashtable key that defines the identity of a hashtable entry.
* The key is the attribute value, e.g distribution column and the colocation group id of the tenant.
*/
typedef struct TenantStatsHashKey
{
char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH];
int colocationGroupId;
} TenantStatsHashKey;
/*
* TenantStats is the struct that keeps statistics about one tenant.
*/
typedef struct TenantStats
{
/*
* The attribute value, e.g distribution column, and colocation group id
* of the tenant.
*/
char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH];
int colocationGroupId;
TenantStatsHashKey key; /* hash key of entry - MUST BE FIRST */
/*
* Number of SELECT queries this tenant ran in this and last periods.
@ -88,12 +95,9 @@ typedef struct MultiTenantMonitor
LWLock lock;
/*
* tenantCount is the number of items in the tenants array.
* The total length of tenants array is set up at CreateSharedMemoryForMultiTenantMonitor
* and is 3 * citus.stat_tenants_limit
* The max length of tenants hashtable is 3 * citus.stat_tenants_limit
*/
int tenantCount;
TenantStats tenants[FLEXIBLE_ARRAY_MEMBER];
HTAB *tenants;
} MultiTenantMonitor;
typedef enum

View File

@ -791,7 +791,7 @@ SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
/b*c/de | 2
@ -817,7 +817,7 @@ CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
/b*c/de | 8
@ -864,7 +864,7 @@ SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
/b*c/de | 11

View File

@ -272,7 +272,7 @@ SELECT select_from_dist_tbl_text('/b*c/de');
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc(
p_keyword text
@ -295,7 +295,7 @@ CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
CREATE OR REPLACE VIEW
select_from_dist_tbl_text_view
@ -309,7 +309,7 @@ SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stat_tenants CASCADE;