diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index b5ec9b7ba..eadca1d91 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -2003,7 +2003,7 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, * overlaps with the current move's target node. * The earlier/first move might make space for the later/second move. * So we could run out of disk space (or at least overload the node) - * if we move the second shard to it before the first one is moved away.  + * if we move the second shard to it before the first one is moved away. */ ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND, diff --git a/src/backend/distributed/utils/citus_stat_tenants.c b/src/backend/distributed/utils/citus_stat_tenants.c index fc35ae808..cee6dbfd1 100644 --- a/src/backend/distributed/utils/citus_stat_tenants.c +++ b/src/backend/distributed/utils/citus_stat_tenants.c @@ -11,6 +11,7 @@ #include "postgres.h" #include "unistd.h" +#include "access/hash.h" #include "distributed/citus_safe_lib.h" #include "distributed/colocation_utils.h" #include "distributed/distributed_planner.h" @@ -59,12 +60,16 @@ static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz query static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime); static void EvictTenantsIfNecessary(TimestampTz queryTime); static void RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime); -static void CreateMultiTenantMonitor(void); static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void); static MultiTenantMonitor * GetMultiTenantMonitor(void); static void MultiTenantMonitorSMInit(void); -static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime); -static int 
FindTenantStats(MultiTenantMonitor *monitor); +static TenantStats * CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz + queryTime); +static TenantStatsHashKey * CreateTenantStatsHashKey(char *tenantAttribute, uint32 + colocationGroupId); +static TenantStats * FindTenantStats(MultiTenantMonitor *monitor); +static uint32 TenantStatsHashFn(const void *key, Size keysize); +static int TenantStatsMatchFn(const void *key1, const void *key2, Size keysize); static size_t MultiTenantMonitorshmemSize(void); static char * ExtractTopComment(const char *inputString); static char * EscapeCommentChars(const char *str); @@ -112,21 +117,37 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS) LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); int numberOfRowsToReturn = 0; + int tenantStatsCount = hash_get_num_entries(monitor->tenants); if (returnAllTenants) { - numberOfRowsToReturn = monitor->tenantCount; + numberOfRowsToReturn = tenantStatsCount; } else { - numberOfRowsToReturn = Min(monitor->tenantCount, StatTenantsLimit); + numberOfRowsToReturn = Min(tenantStatsCount, + StatTenantsLimit); } - for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++) + TenantStats **stats = palloc(tenantStatsCount * + sizeof(TenantStats *)); + + HASH_SEQ_STATUS hash_seq; + TenantStats *stat; + + int j = 0; + hash_seq_init(&hash_seq, monitor->tenants); + while ((stat = hash_seq_search(&hash_seq)) != NULL) { - UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime); - ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime); + SpinLockAcquire(&stat->lock); + + stats[j++] = stat; + UpdatePeriodsIfNecessary(stat, monitoringTime); + ReduceScoreIfNecessary(stat, monitoringTime); + + SpinLockRelease(&stat->lock); } - SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats), + + SafeQsort(stats, j, sizeof(TenantStats *), CompareTenantScore); for (int i = 0; i < numberOfRowsToReturn; i++) @@ -134,10 +155,12 @@ 
citus_stat_tenants_local(PG_FUNCTION_ARGS) memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - TenantStats *tenantStats = &monitor->tenants[i]; + TenantStats *tenantStats = stats[i]; - values[0] = Int32GetDatum(tenantStats->colocationGroupId); - values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute)); + SpinLockAcquire(&tenantStats->lock); + + values[0] = Int32GetDatum(tenantStats->key.colocationGroupId); + values[1] = PointerGetDatum(cstring_to_text(tenantStats->key.tenantAttribute)); values[2] = Int32GetDatum(tenantStats->readsInThisPeriod); values[3] = Int32GetDatum(tenantStats->readsInLastPeriod); values[4] = Int32GetDatum(tenantStats->readsInThisPeriod + @@ -148,15 +171,21 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS) values[7] = Float8GetDatum(tenantStats->cpuUsageInLastPeriod); values[8] = Int64GetDatum(tenantStats->score); + SpinLockRelease(&tenantStats->lock); + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } + pfree(stats); + LWLockRelease(&monitor->lock); PG_RETURN_VOID(); } +#include "miscadmin.h" + /* * citus_stat_tenants_local_reset resets monitor for tenant statistics * on the local node. 
@@ -164,8 +193,44 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS) Datum citus_stat_tenants_local_reset(PG_FUNCTION_ARGS) { + /* TODO(review): leftover debugging aid, drop before merge: ereport(NOTICE, (errmsg("MyProcPid: %d", MyProcPid))); */ + /* TODO(review): leftover debugging aid, drop before merge: sleep(10); */ + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); - monitor->tenantCount = 0; + + /* if monitor is not created yet, there is nothing to reset */ + if (monitor == NULL) + { + PG_RETURN_VOID(); + } + + HASH_SEQ_STATUS hash_seq; + TenantStats *stats; + + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + + hash_seq_init(&hash_seq, monitor->tenants); + while ((stats = hash_seq_search(&hash_seq)) != NULL) + { + bool found = false; + hash_search(monitor->tenants, &stats->key, HASH_REMOVE, &found); + + if (found) + { + stats->writesInLastPeriod = 0; + stats->writesInThisPeriod = 0; + stats->readsInLastPeriod = 0; + stats->readsInThisPeriod = 0; + stats->cpuUsageInLastPeriod = 0; + stats->cpuUsageInThisPeriod = 0; + stats->score = 0; + stats->lastScoreReduction = 0; + + /* NOTE(review): stats came from hash_seq_search on monitor->tenants, so it is presumably not ours to pfree - confirm */ + } + } + + LWLockRelease(&monitor->lock); PG_RETURN_VOID(); } @@ -352,11 +417,10 @@ AttributeMetricsIfApplicable() */ LWLockAcquire(&monitor->lock, LW_SHARED); - int currentTenantIndex = FindTenantStats(monitor); + TenantStats *tenantStats = FindTenantStats(monitor); - if (currentTenantIndex != -1) + if (tenantStats != NULL) { - TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; SpinLockAcquire(&tenantStats->lock); UpdatePeriodsIfNecessary(tenantStats, queryTime); @@ -370,20 +434,19 @@ AttributeMetricsIfApplicable() LWLockRelease(&monitor->lock); LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); - currentTenantIndex = FindTenantStats(monitor); + tenantStats = FindTenantStats(monitor); - if (currentTenantIndex == -1) + if (tenantStats == NULL) { - currentTenantIndex = CreateTenantStats(monitor, queryTime); + tenantStats = CreateTenantStats(monitor, queryTime); } LWLockRelease(&monitor->lock); LWLockAcquire(&monitor->lock, LW_SHARED); - currentTenantIndex = 
FindTenantStats(monitor); - if (currentTenantIndex != -1) + tenantStats = FindTenantStats(monitor); + if (tenantStats != NULL) { - TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; SpinLockAcquire(&tenantStats->lock); UpdatePeriodsIfNecessary(tenantStats, queryTime); @@ -506,15 +569,29 @@ EvictTenantsIfNecessary(TimestampTz queryTime) * * Every time tenant count hits StatTenantsLimit * 3, we reduce it back to StatTenantsLimit * 2. */ - if (monitor->tenantCount >= StatTenantsLimit * 3) + long tenantStatsCount = hash_get_num_entries(monitor->tenants); + if (tenantStatsCount >= StatTenantsLimit * 3) { - for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++) + HASH_SEQ_STATUS hash_seq; + TenantStats *stat; + TenantStats **stats = palloc(tenantStatsCount * + sizeof(TenantStats *)); + + int i = 0; + hash_seq_init(&hash_seq, monitor->tenants); + while ((stat = hash_seq_search(&hash_seq)) != NULL) { - ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], queryTime); + stats[i++] = stat; } - SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats), - CompareTenantScore); - monitor->tenantCount = StatTenantsLimit * 2; + + SafeQsort(stats, i, sizeof(TenantStats *), CompareTenantScore); + + for (i = StatTenantsLimit * 2; i < tenantStatsCount; i++) + { + hash_search(monitor->tenants, &stats[i]->key, HASH_REMOVE, NULL); + } + + pfree(stats); } } @@ -552,17 +629,6 @@ RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime) } -/* - * CreateMultiTenantMonitor creates the data structure for multi tenant monitor. - */ -static void -CreateMultiTenantMonitor() -{ - MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor(); - monitor->tenantCount = 0; -} - - /* * CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor. 
*/ @@ -585,6 +651,19 @@ CreateSharedMemoryForMultiTenantMonitor() monitor->namedLockTranche.trancheName); LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId); + HASHCTL info; + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(TenantStatsHashKey); + info.entrysize = sizeof(TenantStats); + info.hash = TenantStatsHashFn; + info.match = TenantStatsMatchFn; + + monitor->tenants = ShmemInitHash("citus_stats_tenants hash", + StatTenantsLimit * 3, StatTenantsLimit * 3, + &info, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | + HASH_SHARED_MEM); + return monitor; } @@ -628,7 +707,7 @@ InitializeMultiTenantMonitorSMHandleManagement() static void MultiTenantMonitorSMInit() { - CreateMultiTenantMonitor(); + CreateSharedMemoryForMultiTenantMonitor(); if (prev_shmem_startup_hook != NULL) { @@ -642,7 +721,7 @@ MultiTenantMonitorSMInit() * * Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode. */ -static int +static TenantStats * CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime) { /* @@ -651,38 +730,85 @@ CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime) */ EvictTenantsIfNecessary(queryTime); - int tenantIndex = monitor->tenantCount; + bool found; + TenantStatsHashKey *key = CreateTenantStatsHashKey(AttributeToTenant, + AttributeToColocationGroupId); - memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex])); + TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, key, + HASH_ENTER, &found); - strcpy_s(monitor->tenants[tenantIndex].tenantAttribute, - sizeof(monitor->tenants[tenantIndex].tenantAttribute), AttributeToTenant); - monitor->tenants[tenantIndex].colocationGroupId = AttributeToColocationGroupId; - SpinLockInit(&monitor->tenants[tenantIndex].lock); + pfree(key); - monitor->tenantCount++; + if (!found) + { + /* initialize the stats lock for the new entry; NOTE(review): no other field is set here (the removed code zeroed the whole entry) - confirm the counters of a fresh entry start at zero */ + SpinLockInit(&stats->lock); + } - return tenantIndex; + return stats; 
} /* - * FindTenantStats finds the index for the current tenant's statistics. + * FindTenantStats looks up the current tenant's statistics; callers treat a NULL result as "no entry yet". */ -static int +static TenantStats * FindTenantStats(MultiTenantMonitor *monitor) { - for (int i = 0; i < monitor->tenantCount; i++) - { - TenantStats *tenantStats = &monitor->tenants[i]; - if (strcmp(tenantStats->tenantAttribute, AttributeToTenant) == 0 && - tenantStats->colocationGroupId == AttributeToColocationGroupId) - { - return i; - } - } + TenantStatsHashKey *key = CreateTenantStatsHashKey(AttributeToTenant, + AttributeToColocationGroupId); - return -1; + TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, key, + HASH_FIND, NULL); + + pfree(key); + + return stats; +} + + +static TenantStatsHashKey * +CreateTenantStatsHashKey(char *tenantAttribute, uint32 colocationGroupId) +{ + TenantStatsHashKey *key = (TenantStatsHashKey *) palloc(sizeof(TenantStatsHashKey)); + memset(key->tenantAttribute, 0, MAX_TENANT_ATTRIBUTE_LENGTH); + strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH); + key->colocationGroupId = colocationGroupId; + + return key; +} + + +/* + * TenantStatsHashFn returns the hash of the key's tenantAttribute string XORed with the hash of its colocationGroupId + */ +static uint32 +TenantStatsHashFn(const void *key, Size keysize) +{ + const TenantStatsHashKey *k = (const TenantStatsHashKey *) key; + + return hash_any((const unsigned char *) (k->tenantAttribute), strlen( + k->tenantAttribute)) ^ + hash_uint32((uint32) k->colocationGroupId); +} + + +/* + * TenantStatsMatchFn compares two keys - zero means match. + * See definition of HashCompareFunc in hsearch.h for more info. 
+ */ +static int +TenantStatsMatchFn(const void *key1, const void *key2, Size keysize) +{ + const TenantStatsHashKey *k1 = (const TenantStatsHashKey *) key1; + const TenantStatsHashKey *k2 = (const TenantStatsHashKey *) key2; + + if (strcmp(k1->tenantAttribute, k2->tenantAttribute) == 0 && + k1->colocationGroupId == k2->colocationGroupId) + { + return 0; + } + return 1; } diff --git a/src/include/distributed/utils/citus_stat_tenants.h b/src/include/distributed/utils/citus_stat_tenants.h index 06041604b..bbea85d3e 100644 --- a/src/include/distributed/utils/citus_stat_tenants.h +++ b/src/include/distributed/utils/citus_stat_tenants.h @@ -15,20 +15,27 @@ #include "executor/executor.h" #include "storage/lwlock.h" #include "utils/datetime.h" +#include "utils/hsearch.h" #define MAX_TENANT_ATTRIBUTE_LENGTH 100 +/* + * Hashtable key that defines the identity of a hashtable entry. + * The key is the tenant's attribute value (e.g. its distribution column value) together with its colocation group id. + */ +typedef struct TenantStatsHashKey +{ + char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH]; + int colocationGroupId; +} TenantStatsHashKey; + /* * TenantStats is the struct that keeps statistics about one tenant. */ typedef struct TenantStats { - /* - * The attribute value, e.g distribution column, and colocation group id - * of the tenant. - */ - char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH]; - int colocationGroupId; + TenantStatsHashKey key; /* hash key of entry - MUST BE FIRST */ + /* * Number of SELECT queries this tenant ran in this and last periods. @@ -88,12 +95,9 @@ typedef struct MultiTenantMonitor LWLock lock; /* - * tenantCount is the number of items in the tenants array. 
- * The total length of tenants array is set up at CreateSharedMemoryForMultiTenantMonitor - * and is 3 * citus.stat_tenants_limit + * Hash table of TenantStats entries keyed by TenantStatsHashKey; sized for at most 3 * citus.stat_tenants_limit entries */ - int tenantCount; - TenantStats tenants[FLEXIBLE_ARRAY_MEMBER]; + HTAB *tenants; } MultiTenantMonitor; typedef enum diff --git a/src/test/regress/expected/citus_stat_tenants.out b/src/test/regress/expected/citus_stat_tenants.out index c1f07ccaa..6000eb527 100644 --- a/src/test/regress/expected/citus_stat_tenants.out +++ b/src/test/regress/expected/citus_stat_tenants.out @@ -791,7 +791,7 @@ SELECT select_from_dist_tbl_text(U&'\0061\0308bc'); t (1 row) -SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute; tenant_attribute | query_count_in_this_period --------------------------------------------------------------------- /b*c/de | 2 @@ -817,7 +817,7 @@ CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL); -SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute; tenant_attribute | query_count_in_this_period --------------------------------------------------------------------- /b*c/de | 8 @@ -864,7 +864,7 @@ SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc t (1 row) -SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute; tenant_attribute | query_count_in_this_period --------------------------------------------------------------------- 
/b*c/de | 11 diff --git a/src/test/regress/sql/citus_stat_tenants.sql b/src/test/regress/sql/citus_stat_tenants.sql index af44c7f1e..7d7057d56 100644 --- a/src/test/regress/sql/citus_stat_tenants.sql +++ b/src/test/regress/sql/citus_stat_tenants.sql @@ -272,7 +272,7 @@ SELECT select_from_dist_tbl_text('/b*c/de'); SELECT select_from_dist_tbl_text(U&'\0061\0308bc'); SELECT select_from_dist_tbl_text(U&'\0061\0308bc'); -SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute; CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc( p_keyword text @@ -295,7 +295,7 @@ CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL); -SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute; CREATE OR REPLACE VIEW select_from_dist_tbl_text_view @@ -309,7 +309,7 @@ SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; -SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute; SET client_min_messages TO ERROR; DROP SCHEMA citus_stat_tenants CASCADE;