mirror of https://github.com/citusdata/citus.git
Initial hashtable implementation for tenant stats
parent
b437aa9e52
commit
ccd464ba04
|
@ -2003,7 +2003,7 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
|
||||||
* overlaps with the current move's target node.
|
* overlaps with the current move's target node.
|
||||||
* The earlier/first move might make space for the later/second move.
|
* The earlier/first move might make space for the later/second move.
|
||||||
* So we could run out of disk space (or at least overload the node)
|
* So we could run out of disk space (or at least overload the node)
|
||||||
* if we move the second shard to it before the first one is moved away.
|
* if we move the second shard to it before the first one is moved away.
|
||||||
*/
|
*/
|
||||||
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
|
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
|
||||||
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,
|
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
#include "unistd.h"
|
#include "unistd.h"
|
||||||
|
|
||||||
|
#include "access/hash.h"
|
||||||
#include "distributed/citus_safe_lib.h"
|
#include "distributed/citus_safe_lib.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/distributed_planner.h"
|
#include "distributed/distributed_planner.h"
|
||||||
|
@ -59,12 +60,16 @@ static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz query
|
||||||
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
|
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
|
||||||
static void EvictTenantsIfNecessary(TimestampTz queryTime);
|
static void EvictTenantsIfNecessary(TimestampTz queryTime);
|
||||||
static void RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime);
|
static void RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime);
|
||||||
static void CreateMultiTenantMonitor(void);
|
|
||||||
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
|
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
|
||||||
static MultiTenantMonitor * GetMultiTenantMonitor(void);
|
static MultiTenantMonitor * GetMultiTenantMonitor(void);
|
||||||
static void MultiTenantMonitorSMInit(void);
|
static void MultiTenantMonitorSMInit(void);
|
||||||
static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime);
|
static TenantStats * CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz
|
||||||
static int FindTenantStats(MultiTenantMonitor *monitor);
|
queryTime);
|
||||||
|
static TenantStatsHashKey * CreateTenantStatsHashKey(char *tenantAttribute, uint32
|
||||||
|
colocationGroupId);
|
||||||
|
static TenantStats * FindTenantStats(MultiTenantMonitor *monitor);
|
||||||
|
static uint32 TenantStatsHashFn(const void *key, Size keysize);
|
||||||
|
static int TenantStatsMatchFn(const void *key1, const void *key2, Size keysize);
|
||||||
static size_t MultiTenantMonitorshmemSize(void);
|
static size_t MultiTenantMonitorshmemSize(void);
|
||||||
static char * ExtractTopComment(const char *inputString);
|
static char * ExtractTopComment(const char *inputString);
|
||||||
static char * EscapeCommentChars(const char *str);
|
static char * EscapeCommentChars(const char *str);
|
||||||
|
@ -112,21 +117,37 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
|
||||||
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
int numberOfRowsToReturn = 0;
|
int numberOfRowsToReturn = 0;
|
||||||
|
int tenantStatsCount = hash_get_num_entries(monitor->tenants);
|
||||||
if (returnAllTenants)
|
if (returnAllTenants)
|
||||||
{
|
{
|
||||||
numberOfRowsToReturn = monitor->tenantCount;
|
numberOfRowsToReturn = tenantStatsCount;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
numberOfRowsToReturn = Min(monitor->tenantCount, StatTenantsLimit);
|
numberOfRowsToReturn = Min(tenantStatsCount,
|
||||||
|
StatTenantsLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
|
TenantStats **stats = palloc(tenantStatsCount *
|
||||||
|
sizeof(TenantStats *));
|
||||||
|
|
||||||
|
HASH_SEQ_STATUS hash_seq;
|
||||||
|
TenantStats *stat;
|
||||||
|
|
||||||
|
int j = 0;
|
||||||
|
hash_seq_init(&hash_seq, monitor->tenants);
|
||||||
|
while ((stat = hash_seq_search(&hash_seq)) != NULL)
|
||||||
{
|
{
|
||||||
UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
|
SpinLockAcquire(&stat->lock);
|
||||||
ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
|
|
||||||
|
stats[j++] = stat;
|
||||||
|
UpdatePeriodsIfNecessary(stat, monitoringTime);
|
||||||
|
ReduceScoreIfNecessary(stat, monitoringTime);
|
||||||
|
|
||||||
|
SpinLockRelease(&stat->lock);
|
||||||
}
|
}
|
||||||
SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
|
|
||||||
|
SafeQsort(stats, j, sizeof(TenantStats *),
|
||||||
CompareTenantScore);
|
CompareTenantScore);
|
||||||
|
|
||||||
for (int i = 0; i < numberOfRowsToReturn; i++)
|
for (int i = 0; i < numberOfRowsToReturn; i++)
|
||||||
|
@ -134,10 +155,12 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
|
||||||
memset(values, 0, sizeof(values));
|
memset(values, 0, sizeof(values));
|
||||||
memset(isNulls, false, sizeof(isNulls));
|
memset(isNulls, false, sizeof(isNulls));
|
||||||
|
|
||||||
TenantStats *tenantStats = &monitor->tenants[i];
|
TenantStats *tenantStats = stats[i];
|
||||||
|
|
||||||
values[0] = Int32GetDatum(tenantStats->colocationGroupId);
|
SpinLockAcquire(&tenantStats->lock);
|
||||||
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
|
|
||||||
|
values[0] = Int32GetDatum(tenantStats->key.colocationGroupId);
|
||||||
|
values[1] = PointerGetDatum(cstring_to_text(tenantStats->key.tenantAttribute));
|
||||||
values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
|
values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
|
||||||
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
|
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
|
||||||
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
|
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
|
||||||
|
@ -148,15 +171,21 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
|
||||||
values[7] = Float8GetDatum(tenantStats->cpuUsageInLastPeriod);
|
values[7] = Float8GetDatum(tenantStats->cpuUsageInLastPeriod);
|
||||||
values[8] = Int64GetDatum(tenantStats->score);
|
values[8] = Int64GetDatum(tenantStats->score);
|
||||||
|
|
||||||
|
SpinLockRelease(&tenantStats->lock);
|
||||||
|
|
||||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pfree(stats);
|
||||||
|
|
||||||
LWLockRelease(&monitor->lock);
|
LWLockRelease(&monitor->lock);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* citus_stat_tenants_local_reset resets monitor for tenant statistics
|
* citus_stat_tenants_local_reset resets monitor for tenant statistics
|
||||||
* on the local node.
|
* on the local node.
|
||||||
|
@ -164,8 +193,44 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
|
||||||
Datum
|
Datum
|
||||||
citus_stat_tenants_local_reset(PG_FUNCTION_ARGS)
|
citus_stat_tenants_local_reset(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
|
/* ereport(NOTICE, (errmsg("MyProcPid: %d", MyProcPid))); */
|
||||||
|
/*sleep(10); */
|
||||||
|
|
||||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||||
monitor->tenantCount = 0;
|
|
||||||
|
/* if monitor is not created yet, there is nothing to reset */
|
||||||
|
if (monitor == NULL)
|
||||||
|
{
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
HASH_SEQ_STATUS hash_seq;
|
||||||
|
TenantStats *stats;
|
||||||
|
|
||||||
|
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
hash_seq_init(&hash_seq, monitor->tenants);
|
||||||
|
while ((stats = hash_seq_search(&hash_seq)) != NULL)
|
||||||
|
{
|
||||||
|
bool found = false;
|
||||||
|
hash_search(monitor->tenants, &stats->key, HASH_REMOVE, &found);
|
||||||
|
|
||||||
|
if (found)
|
||||||
|
{
|
||||||
|
stats->writesInLastPeriod = 0;
|
||||||
|
stats->writesInThisPeriod = 0;
|
||||||
|
stats->readsInLastPeriod = 0;
|
||||||
|
stats->readsInThisPeriod = 0;
|
||||||
|
stats->cpuUsageInLastPeriod = 0;
|
||||||
|
stats->cpuUsageInThisPeriod = 0;
|
||||||
|
stats->score = 0;
|
||||||
|
stats->lastScoreReduction = 0;
|
||||||
|
|
||||||
|
/* pfree(stats); */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LWLockRelease(&monitor->lock);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -352,11 +417,10 @@ AttributeMetricsIfApplicable()
|
||||||
*/
|
*/
|
||||||
LWLockAcquire(&monitor->lock, LW_SHARED);
|
LWLockAcquire(&monitor->lock, LW_SHARED);
|
||||||
|
|
||||||
int currentTenantIndex = FindTenantStats(monitor);
|
TenantStats *tenantStats = FindTenantStats(monitor);
|
||||||
|
|
||||||
if (currentTenantIndex != -1)
|
if (tenantStats != NULL)
|
||||||
{
|
{
|
||||||
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
|
|
||||||
SpinLockAcquire(&tenantStats->lock);
|
SpinLockAcquire(&tenantStats->lock);
|
||||||
|
|
||||||
UpdatePeriodsIfNecessary(tenantStats, queryTime);
|
UpdatePeriodsIfNecessary(tenantStats, queryTime);
|
||||||
|
@ -370,20 +434,19 @@ AttributeMetricsIfApplicable()
|
||||||
LWLockRelease(&monitor->lock);
|
LWLockRelease(&monitor->lock);
|
||||||
|
|
||||||
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
||||||
currentTenantIndex = FindTenantStats(monitor);
|
tenantStats = FindTenantStats(monitor);
|
||||||
|
|
||||||
if (currentTenantIndex == -1)
|
if (tenantStats == NULL)
|
||||||
{
|
{
|
||||||
currentTenantIndex = CreateTenantStats(monitor, queryTime);
|
tenantStats = CreateTenantStats(monitor, queryTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
LWLockRelease(&monitor->lock);
|
LWLockRelease(&monitor->lock);
|
||||||
|
|
||||||
LWLockAcquire(&monitor->lock, LW_SHARED);
|
LWLockAcquire(&monitor->lock, LW_SHARED);
|
||||||
currentTenantIndex = FindTenantStats(monitor);
|
tenantStats = FindTenantStats(monitor);
|
||||||
if (currentTenantIndex != -1)
|
if (tenantStats != NULL)
|
||||||
{
|
{
|
||||||
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
|
|
||||||
SpinLockAcquire(&tenantStats->lock);
|
SpinLockAcquire(&tenantStats->lock);
|
||||||
|
|
||||||
UpdatePeriodsIfNecessary(tenantStats, queryTime);
|
UpdatePeriodsIfNecessary(tenantStats, queryTime);
|
||||||
|
@ -506,15 +569,29 @@ EvictTenantsIfNecessary(TimestampTz queryTime)
|
||||||
*
|
*
|
||||||
* Every time tenant count hits StatTenantsLimit * 3, we reduce it back to StatTenantsLimit * 2.
|
* Every time tenant count hits StatTenantsLimit * 3, we reduce it back to StatTenantsLimit * 2.
|
||||||
*/
|
*/
|
||||||
if (monitor->tenantCount >= StatTenantsLimit * 3)
|
long tenantStatsCount = hash_get_num_entries(monitor->tenants);
|
||||||
|
if (tenantStatsCount >= StatTenantsLimit * 3)
|
||||||
{
|
{
|
||||||
for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
|
HASH_SEQ_STATUS hash_seq;
|
||||||
|
TenantStats *stat;
|
||||||
|
TenantStats **stats = palloc(tenantStatsCount *
|
||||||
|
sizeof(TenantStats *));
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
hash_seq_init(&hash_seq, monitor->tenants);
|
||||||
|
while ((stat = hash_seq_search(&hash_seq)) != NULL)
|
||||||
{
|
{
|
||||||
ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], queryTime);
|
stats[i++] = stat;
|
||||||
}
|
}
|
||||||
SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
|
|
||||||
CompareTenantScore);
|
SafeQsort(stats, i, sizeof(TenantStats *), CompareTenantScore);
|
||||||
monitor->tenantCount = StatTenantsLimit * 2;
|
|
||||||
|
for (i = StatTenantsLimit * 2; i < tenantStatsCount; i++)
|
||||||
|
{
|
||||||
|
hash_search(monitor->tenants, &stats[i]->key, HASH_REMOVE, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
pfree(stats);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -552,17 +629,6 @@ RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CreateMultiTenantMonitor creates the data structure for multi tenant monitor.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CreateMultiTenantMonitor()
|
|
||||||
{
|
|
||||||
MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor();
|
|
||||||
monitor->tenantCount = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor.
|
* CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor.
|
||||||
*/
|
*/
|
||||||
|
@ -585,6 +651,19 @@ CreateSharedMemoryForMultiTenantMonitor()
|
||||||
monitor->namedLockTranche.trancheName);
|
monitor->namedLockTranche.trancheName);
|
||||||
LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);
|
LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);
|
||||||
|
|
||||||
|
HASHCTL info;
|
||||||
|
|
||||||
|
memset(&info, 0, sizeof(info));
|
||||||
|
info.keysize = sizeof(TenantStatsHashKey);
|
||||||
|
info.entrysize = sizeof(TenantStats);
|
||||||
|
info.hash = TenantStatsHashFn;
|
||||||
|
info.match = TenantStatsMatchFn;
|
||||||
|
|
||||||
|
monitor->tenants = ShmemInitHash("citus_stats_tenants hash",
|
||||||
|
StatTenantsLimit * 3, StatTenantsLimit * 3,
|
||||||
|
&info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE |
|
||||||
|
HASH_SHARED_MEM);
|
||||||
|
|
||||||
return monitor;
|
return monitor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -628,7 +707,7 @@ InitializeMultiTenantMonitorSMHandleManagement()
|
||||||
static void
|
static void
|
||||||
MultiTenantMonitorSMInit()
|
MultiTenantMonitorSMInit()
|
||||||
{
|
{
|
||||||
CreateMultiTenantMonitor();
|
CreateSharedMemoryForMultiTenantMonitor();
|
||||||
|
|
||||||
if (prev_shmem_startup_hook != NULL)
|
if (prev_shmem_startup_hook != NULL)
|
||||||
{
|
{
|
||||||
|
@ -642,7 +721,7 @@ MultiTenantMonitorSMInit()
|
||||||
*
|
*
|
||||||
* Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode.
|
* Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode.
|
||||||
*/
|
*/
|
||||||
static int
|
static TenantStats *
|
||||||
CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
|
CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
@ -651,38 +730,85 @@ CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
|
||||||
*/
|
*/
|
||||||
EvictTenantsIfNecessary(queryTime);
|
EvictTenantsIfNecessary(queryTime);
|
||||||
|
|
||||||
int tenantIndex = monitor->tenantCount;
|
bool found;
|
||||||
|
TenantStatsHashKey *key = CreateTenantStatsHashKey(AttributeToTenant,
|
||||||
|
AttributeToColocationGroupId);
|
||||||
|
|
||||||
memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex]));
|
TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, key,
|
||||||
|
HASH_ENTER, &found);
|
||||||
|
|
||||||
strcpy_s(monitor->tenants[tenantIndex].tenantAttribute,
|
pfree(key);
|
||||||
sizeof(monitor->tenants[tenantIndex].tenantAttribute), AttributeToTenant);
|
|
||||||
monitor->tenants[tenantIndex].colocationGroupId = AttributeToColocationGroupId;
|
|
||||||
SpinLockInit(&monitor->tenants[tenantIndex].lock);
|
|
||||||
|
|
||||||
monitor->tenantCount++;
|
if (!found)
|
||||||
|
{
|
||||||
|
/* initialize the stats lock for the new entry in the hash table */
|
||||||
|
SpinLockInit(&stats->lock);
|
||||||
|
}
|
||||||
|
|
||||||
return tenantIndex;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FindTenantStats finds the index for the current tenant's statistics.
|
* FindTenantStats finds the current tenant's statistics.
|
||||||
*/
|
*/
|
||||||
static int
|
static TenantStats *
|
||||||
FindTenantStats(MultiTenantMonitor *monitor)
|
FindTenantStats(MultiTenantMonitor *monitor)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < monitor->tenantCount; i++)
|
TenantStatsHashKey *key = CreateTenantStatsHashKey(AttributeToTenant,
|
||||||
{
|
AttributeToColocationGroupId);
|
||||||
TenantStats *tenantStats = &monitor->tenants[i];
|
|
||||||
if (strcmp(tenantStats->tenantAttribute, AttributeToTenant) == 0 &&
|
|
||||||
tenantStats->colocationGroupId == AttributeToColocationGroupId)
|
|
||||||
{
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, key,
|
||||||
|
HASH_FIND, NULL);
|
||||||
|
|
||||||
|
pfree(key);
|
||||||
|
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static TenantStatsHashKey *
|
||||||
|
CreateTenantStatsHashKey(char *tenantAttribute, uint32 colocationGroupId)
|
||||||
|
{
|
||||||
|
TenantStatsHashKey *key = (TenantStatsHashKey *) palloc(sizeof(TenantStatsHashKey));
|
||||||
|
memset(key->tenantAttribute, 0, MAX_TENANT_ATTRIBUTE_LENGTH);
|
||||||
|
strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH);
|
||||||
|
key->colocationGroupId = colocationGroupId;
|
||||||
|
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CitusQuerysStatsHashFn calculates and returns hash value for a key
|
||||||
|
*/
|
||||||
|
static uint32
|
||||||
|
TenantStatsHashFn(const void *key, Size keysize)
|
||||||
|
{
|
||||||
|
const TenantStatsHashKey *k = (const TenantStatsHashKey *) key;
|
||||||
|
|
||||||
|
return hash_any((const unsigned char *) (k->tenantAttribute), strlen(
|
||||||
|
k->tenantAttribute)) ^
|
||||||
|
hash_uint32((uint32) k->colocationGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TenantStatsMatchFn compares two keys - zero means match.
|
||||||
|
* See definition of HashCompareFunc in hsearch.h for more info.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
TenantStatsMatchFn(const void *key1, const void *key2, Size keysize)
|
||||||
|
{
|
||||||
|
const TenantStatsHashKey *k1 = (const TenantStatsHashKey *) key1;
|
||||||
|
const TenantStatsHashKey *k2 = (const TenantStatsHashKey *) key2;
|
||||||
|
|
||||||
|
if (strcmp(k1->tenantAttribute, k2->tenantAttribute) == 0 &&
|
||||||
|
k1->colocationGroupId == k2->colocationGroupId)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -15,20 +15,27 @@
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "storage/lwlock.h"
|
#include "storage/lwlock.h"
|
||||||
#include "utils/datetime.h"
|
#include "utils/datetime.h"
|
||||||
|
#include "utils/hsearch.h"
|
||||||
|
|
||||||
#define MAX_TENANT_ATTRIBUTE_LENGTH 100
|
#define MAX_TENANT_ATTRIBUTE_LENGTH 100
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Hashtable key that defines the identity of a hashtable entry.
|
||||||
|
* The key is the attribute value, e.g distribution column and the colocation group id of the tenant.
|
||||||
|
*/
|
||||||
|
typedef struct TenantStatsHashKey
|
||||||
|
{
|
||||||
|
char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH];
|
||||||
|
int colocationGroupId;
|
||||||
|
} TenantStatsHashKey;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TenantStats is the struct that keeps statistics about one tenant.
|
* TenantStats is the struct that keeps statistics about one tenant.
|
||||||
*/
|
*/
|
||||||
typedef struct TenantStats
|
typedef struct TenantStats
|
||||||
{
|
{
|
||||||
/*
|
TenantStatsHashKey key; /* hash key of entry - MUST BE FIRST */
|
||||||
* The attribute value, e.g distribution column, and colocation group id
|
|
||||||
* of the tenant.
|
|
||||||
*/
|
|
||||||
char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH];
|
|
||||||
int colocationGroupId;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Number of SELECT queries this tenant ran in this and last periods.
|
* Number of SELECT queries this tenant ran in this and last periods.
|
||||||
|
@ -88,12 +95,9 @@ typedef struct MultiTenantMonitor
|
||||||
LWLock lock;
|
LWLock lock;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* tenantCount is the number of items in the tenants array.
|
* The max length of tenants hashtable is 3 * citus.stat_tenants_limit
|
||||||
* The total length of tenants array is set up at CreateSharedMemoryForMultiTenantMonitor
|
|
||||||
* and is 3 * citus.stat_tenants_limit
|
|
||||||
*/
|
*/
|
||||||
int tenantCount;
|
HTAB *tenants;
|
||||||
TenantStats tenants[FLEXIBLE_ARRAY_MEMBER];
|
|
||||||
} MultiTenantMonitor;
|
} MultiTenantMonitor;
|
||||||
|
|
||||||
typedef enum
|
typedef enum
|
||||||
|
|
|
@ -791,7 +791,7 @@ SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||||
tenant_attribute | query_count_in_this_period
|
tenant_attribute | query_count_in_this_period
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
/b*c/de | 2
|
/b*c/de | 2
|
||||||
|
@ -817,7 +817,7 @@ CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
|
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
|
||||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||||
tenant_attribute | query_count_in_this_period
|
tenant_attribute | query_count_in_this_period
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
/b*c/de | 8
|
/b*c/de | 8
|
||||||
|
@ -864,7 +864,7 @@ SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||||
tenant_attribute | query_count_in_this_period
|
tenant_attribute | query_count_in_this_period
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
/b*c/de | 11
|
/b*c/de | 11
|
||||||
|
|
|
@ -272,7 +272,7 @@ SELECT select_from_dist_tbl_text('/b*c/de');
|
||||||
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
||||||
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
||||||
|
|
||||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||||
|
|
||||||
CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc(
|
CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc(
|
||||||
p_keyword text
|
p_keyword text
|
||||||
|
@ -295,7 +295,7 @@ CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
|
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
|
||||||
|
|
||||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||||
|
|
||||||
CREATE OR REPLACE VIEW
|
CREATE OR REPLACE VIEW
|
||||||
select_from_dist_tbl_text_view
|
select_from_dist_tbl_text_view
|
||||||
|
@ -309,7 +309,7 @@ SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc
|
||||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||||
|
|
||||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||||
|
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP SCHEMA citus_stat_tenants CASCADE;
|
DROP SCHEMA citus_stat_tenants CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue