Add locking mechanism for tenant monitoring probabilistic approach (#7026)

This PR:
* Addresses a concurrency issue in the probabilistic approach of tenant
monitoring by acquiring a shared lock for tenant existence checks.
* Changes the type of `citus.stat_tenants_sample_rate_for_new_tenants` from
integer to double, so the rate is now a fraction in the range 0 to 1
instead of a percentage in the range 1 to 100.
* Renames `citus.stat_tenants_sample_rate_for_new_tenants` to
`citus.stat_tenants_untracked_sample_rate`.
pull/6999/merge
Gokhan Gulbiz 2023-07-03 13:08:03 +03:00 committed by GitHub
parent ac24e11986
commit e0d3476526
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 177 additions and 19 deletions

View File

@ -2489,17 +2489,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.stat_tenants_sample_rate_for_new_tenants",
gettext_noop("Sampling rate for new tenants in citus_stat_tenants."),
NULL,
&StatTenantsSampleRateForNewTenants,
100, 1, 100,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.stat_tenants_track",
gettext_noop("Enables/Disables the stats collection for citus_stat_tenants."),
@ -2513,6 +2502,16 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomRealVariable(
"citus.stat_tenants_untracked_sample_rate",
gettext_noop("Sampling rate for new tenants in citus_stat_tenants."),
NULL,
&StatTenantsSampleRateForNewTenants,
1, 0, 1,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.subquery_pushdown",
gettext_noop("Usage of this GUC is highly discouraged, please read the long "

View File

@ -36,6 +36,10 @@
#include <time.h>
#if (PG_VERSION_NUM >= PG_VERSION_15)
#include "common/pg_prng.h"
#endif
static void AttributeMetricsIfApplicable(void);
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
@ -80,7 +84,7 @@ int StatTenantsLogLevel = CITUS_LOG_LEVEL_OFF;
int StatTenantsPeriod = (time_t) 60;
int StatTenantsLimit = 100;
int StatTenantsTrack = STAT_TENANTS_TRACK_NONE;
int StatTenantsSampleRateForNewTenants = 100;
double StatTenantsSampleRateForNewTenants = 1;
PG_FUNCTION_INFO_V1(citus_stat_tenants_local);
PG_FUNCTION_INFO_V1(citus_stat_tenants_local_reset);
@ -281,13 +285,25 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
bool found = false;
/* Acquire the lock in shared mode to check if the tenant is already in the hash table. */
LWLockAcquire(&monitor->lock, LW_SHARED);
hash_search(monitor->tenants, &key, HASH_FIND, &found);
LWLockRelease(&monitor->lock);
/* If the tenant is not found in the hash table, we will track the query with a probability of StatTenantsSampleRateForNewTenants. */
if (!found)
{
int randomValue = rand() % 100;
bool shouldTrackQuery = randomValue < StatTenantsSampleRateForNewTenants;
#if (PG_VERSION_NUM >= PG_VERSION_15)
double randomValue = pg_prng_double(&pg_global_prng_state);
#else
/* Generate a random double between 0 and 1 */
double randomValue = (double) random() / MAX_RANDOM_VALUE;
#endif
bool shouldTrackQuery = randomValue <= StatTenantsSampleRateForNewTenants;
if (!shouldTrackQuery)
{
return;

View File

@ -121,6 +121,6 @@ extern int StatTenantsLogLevel;
extern int StatTenantsPeriod;
extern int StatTenantsLimit;
extern int StatTenantsTrack;
extern int StatTenantsSampleRateForNewTenants;
extern double StatTenantsSampleRateForNewTenants;
#endif /*CITUS_ATTRIBUTE_H */

View File

@ -240,12 +240,21 @@ SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenan
(5 rows)
-- test period passing
\c - - - :worker_1_port
SET search_path TO citus_stat_tenants;
SET citus.stat_tenants_period TO 2;
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
SELECT sleep_until_next_period();
sleep_until_next_period
---------------------------------------------------------------------
(1 row)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
?column?
---------------------------------------------------------------------
@ -253,7 +262,6 @@ SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
(1 row)
INSERT INTO dist_tbl VALUES (5, 'abcd');
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
@ -265,13 +273,18 @@ ORDER BY tenant_attribute;
(2 rows)
-- simulate passing the period
SET citus.stat_tenants_period TO 5;
SELECT sleep_until_next_period();
sleep_until_next_period
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(1);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
@ -288,6 +301,12 @@ SELECT sleep_until_next_period();
(1 row)
SELECT pg_sleep(1);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
@ -1009,6 +1028,89 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
\c - - - :master_port
SET search_path TO citus_stat_tenants;
SET citus.enable_schema_based_sharding TO OFF;
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
-- test sampling
-- set rate to 0 to disable sampling
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM set citus.stat_tenants_untracked_sample_rate to 0;');
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
result
---------------------------------------------------------------------
t
t
t
(3 rows)
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
(0 rows)
-- test sampling
-- set rate to 1 to track all tenants
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM set citus.stat_tenants_untracked_sample_rate to 1;');
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT sleep_until_next_period();
sleep_until_next_period
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants(true)
ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period | cpu_is_used_in_this_period | cpu_is_used_in_last_period
---------------------------------------------------------------------
1 | 0 | 0 | 1 | 0 | t | f
2 | 0 | 0 | 1 | 0 | t | f
3 | 0 | 0 | 1 | 0 | t | f
4 | 0 | 0 | 1 | 0 | t | f
5 | 0 | 0 | 1 | 0 | t | f
(5 rows)
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stat_tenants CASCADE;
DROP SCHEMA citus_stat_tenants_t1 CASCADE;

View File

@ -83,20 +83,24 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg';
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
-- test period passing
\c - - - :worker_1_port
SET search_path TO citus_stat_tenants;
SET citus.stat_tenants_period TO 2;
SELECT citus_stat_tenants_reset();
SELECT sleep_until_next_period();
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
INSERT INTO dist_tbl VALUES (5, 'abcd');
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
-- simulate passing the period
SET citus.stat_tenants_period TO 5;
SELECT sleep_until_next_period();
SELECT pg_sleep(1);
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
@ -104,6 +108,7 @@ FROM citus_stat_tenants_local
ORDER BY tenant_attribute;
SELECT sleep_until_next_period();
SELECT pg_sleep(1);
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
@ -377,6 +382,42 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
\c - - - :master_port
SET search_path TO citus_stat_tenants;
SET citus.enable_schema_based_sharding TO OFF;
SELECT citus_stat_tenants_reset();
-- test sampling
-- set rate to 0 to disable sampling
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM set citus.stat_tenants_untracked_sample_rate to 0;');
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
-- test sampling
-- set rate to 1 to track all tenants
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM set citus.stat_tenants_untracked_sample_rate to 1;');
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
SELECT sleep_until_next_period();
SELECT pg_sleep(0.1);
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
(cpu_usage_in_this_period>0) AS cpu_is_used_in_this_period, (cpu_usage_in_last_period>0) AS cpu_is_used_in_last_period
FROM citus_stat_tenants(true)
ORDER BY tenant_attribute;
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stat_tenants CASCADE;
DROP SCHEMA citus_stat_tenants_t1 CASCADE;