From d85c277404fe57b4f210d2dacd567da3bf955e37 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Mon, 13 Mar 2023 11:36:54 +0300 Subject: [PATCH 01/17] Use strncpy_s instead of strncpy --- src/backend/distributed/utils/attribute.c | 27 ++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 14accae8b..448efb76c 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -61,6 +61,7 @@ static int CreateTenantStats(MultiTenantMonitor *monitor); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); static char * extractTopComment(const char *inputString); +static char* get_substring(const char* str, int start, int end); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -599,12 +600,32 @@ extractTopComment(const char *inputString) if (i > 2) { - char *result = (char *) palloc(sizeof(char) * (i - 1)); - strncpy(result, inputString + 2, i - 2); - return result; + return get_substring(inputString, 2, i); } else { return NULL; } } + +static char* +get_substring(const char* str, int start, int end) { + int len = strlen(str); + char* substr = NULL; + + // Ensure start and end are within the bounds of the string + if (start < 0 || end > len || start > end) { + return NULL; + } + + // Allocate memory for the substring + substr = (char*) palloc((end - start + 1) * sizeof(char)); + + // Copy the substring to the new memory location + strncpy_s(substr, end - start + 1, str + start, end - start); + + // Add null terminator to end the substring + substr[end - start] = '\0'; + + return substr; +} From c7af2ff0efd3ec8eac6692d33c17680dd6f9c4bb Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Mon, 13 Mar 2023 12:36:02 +0300 Subject: [PATCH 02/17] Normalize multiline sql comment statements --- src/test/regress/bin/normalize.sed | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index a6072945a..87c4f5b7a 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -304,4 +304,4 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+ # shard_rebalancer output, flaky improvement number s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g -s/\/\* attributeTo.*\*\///g +s/\/\*.*\*\///g From 0de2ebad3fe30a8c0ffd770206efd78400a5b055 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Mon, 13 Mar 2023 12:44:57 +0300 Subject: [PATCH 03/17] Validate input string length --- src/backend/distributed/utils/attribute.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 448efb76c..3e564d045 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -585,8 +585,13 @@ MultiTenantMonitorshmemSize(void) static char * extractTopComment(const char *inputString) { - int i = 0; + int commentStartCharsLength = 2; + if (strlen(inputString) < commentStartCharsLength ) + { + return NULL; + } + int i = 0; /* If query starts with a comment */ if (inputString[i] == '/' && inputString[i + 1] == '*') { @@ -598,9 +603,9 @@ extractTopComment(const char *inputString) } } - if (i > 2) + if (i > commentStartCharsLength) { - return get_substring(inputString, 2, i); + return get_substring(inputString, commentStartCharsLength, i); } else { From 59990addd14421998d253a0973aaf1cee94d19f7 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Mon, 13 Mar 2023 13:24:27 +0300 Subject: [PATCH 04/17] Validate attribute prefix existance on query string --- src/backend/distributed/utils/attribute.c | 42 +++++++++++------------ 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 3e564d045..75f5abf02 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -30,7 +30,7 @@ static void AttributeMetricsIfApplicable(void); ExecutorEnd_hook_type prev_ExecutorEnd = NULL; -#define ATTRIBUTE_PREFIX "/*{" +#define ATTRIBUTE_PREFIX "{\"tId\":" #define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/" #define CITUS_STATS_TENANTS_COLUMNS 7 #define ONE_QUERY_SCORE 1000000000 @@ -159,34 +159,32 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) return; } - char *annotation = extractTopComment(query_string); - if (annotation != NULL) + if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) { - Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); - - text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); - if (tenantIdTextP != NULL) + char *annotation = extractTopComment(query_string); + if (annotation != NULL) { - char *tenantId = text_to_cstring(tenantIdTextP); - strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); - } + Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); - colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", 0); + text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); + if (tenantIdTextP != NULL) + { + char *tenantId = text_to_cstring(tenantIdTextP); + strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); + } - if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) - { - ereport(NOTICE, (errmsg( - "attributing query to tenant: %s, colocationGroupId: %d", - quote_literal_cstr(attributeToTenant), - colocationGroupId))); + colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", 0); + + if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) + { + ereport(NOTICE, (errmsg( + "attributing query to tenant: %s, colocationGroupId: %d", + quote_literal_cstr(attributeToTenant), + colocationGroupId))); + } } } - else - { - /*Assert(attributeToTenant == NULL); */ - } - /*DetachSegment(); */ attributeToTenantStart = clock(); } From 596954622c9456db2965b7f735c7a1d0f4b440b7 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Mon, 13 Mar 2023 14:31:41 +0300 Subject: [PATCH 05/17] Indent --- src/backend/distributed/utils/attribute.c | 43 ++++++++++++----------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 75f5abf02..e9b9f3624 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -61,7 +61,7 @@ static int CreateTenantStats(MultiTenantMonitor *monitor); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); static char * extractTopComment(const char *inputString); -static char* get_substring(const char* str, int start, int end); +static char * get_substring(const char *str, int start, int end); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -178,9 +178,9 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { ereport(NOTICE, (errmsg( - "attributing query to tenant: %s, colocationGroupId: %d", - quote_literal_cstr(attributeToTenant), - colocationGroupId))); + "attributing query to tenant: %s, colocationGroupId: %d", + quote_literal_cstr(attributeToTenant), + colocationGroupId))); } } } @@ -584,12 +584,13 @@ static char * extractTopComment(const char *inputString) { int commentStartCharsLength = 2; - if (strlen(inputString) < commentStartCharsLength ) + if (strlen(inputString) < commentStartCharsLength) { return NULL; } int i = 0; + /* If query starts with a comment */ if (inputString[i] == '/' && inputString[i + 1] == '*') { @@ -611,24 +612,26 @@ extractTopComment(const char *inputString) } } -static char* -get_substring(const char* str, int start, int end) { - int len = strlen(str); - char* substr = NULL; - // Ensure start and end are within the bounds of the string - if (start < 0 || end > len || start > end) { - return NULL; - } +static char * +get_substring(const char *str, int start, int end) +{ + int len = strlen(str); - // Allocate memory for the substring - substr = (char*) palloc((end - start + 1) * sizeof(char)); + /* Ensure start and end are within the bounds of the string */ + if (start < 0 || end > len || start > end) + { + return NULL; + } - // Copy the substring to the new memory location - strncpy_s(substr, end - start + 1, str + start, end - start); + /* Allocate memory for the substring */ + char *substr = (char *) palloc((end - start + 1) * sizeof(char)); - // Add null terminator to end the substring - substr[end - start] = '\0'; + /* Copy the substring to the new memory location */ + strncpy_s(substr, end - start + 1, str + start, end - start); - return substr; + /* Add null terminator to end the substring */ + substr[end - start] = '\0'; + + return substr; } From c241af1eb9ec2cf0a6ac26c681176b7c4b06f0e5 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Mon, 13 Mar 2023 14:32:16 +0300 Subject: [PATCH 06/17] Fix tenant statistics annotations normalization --- src/test/regress/bin/normalize.sed | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 87c4f5b7a..d6e8a6111 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -303,5 +303,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+ # shard_rebalancer output, flaky improvement number s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g - -s/\/\*.*\*\///g +# normalize tenants statistics annotations +s/\/\*\{"tId":.*\*\///g From bb1db823f73b8975ca8449a0d2ff73af97c608c6 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Mon, 13 Mar 2023 19:41:55 +0300 Subject: [PATCH 07/17] Add tests --- src/backend/distributed/utils/attribute.c | 21 +- src/include/distributed/utils/attribute.h | 1 - .../regress/expected/citus_stats_tenants.out | 298 ++++++++++++++++++ src/test/regress/multi_1_schedule | 5 + src/test/regress/pg_regress_multi.pl | 1 + src/test/regress/sql/citus_stats_tenants.sql | 112 +++++++ 6 files changed, 435 insertions(+), 3 deletions(-) create mode 100644 src/test/regress/expected/citus_stats_tenants.out create mode 100644 src/test/regress/sql/citus_stats_tenants.sql diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 074c2385e..3bedf4ec4 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -66,6 +66,7 @@ int CitusStatsTenantsLimit = 10; PG_FUNCTION_INFO_V1(citus_stats_tenants); +PG_FUNCTION_INFO_V1(clean_citus_stats_tenants); /* @@ -143,13 +144,27 @@ citus_stats_tenants(PG_FUNCTION_ARGS) } +/* + * clean_citus_stats_tenants cleans the citus_stats_tenants monitor. + */ +Datum +clean_citus_stats_tenants(PG_FUNCTION_ARGS) +{ + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + monitor->tenantCount = 0; + monitor->periodStart = time(0); + + PG_RETURN_VOID(); +} + + /* * AttributeQueryIfAnnotated assigns the attributes of tenant if the query is annotated. */ void AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) { -/* attributeToTenant = NULL; */ + strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); attributeCommandType = commandType; @@ -369,7 +384,7 @@ AttributeMetricsIfApplicable() } } - /*attributeToTenant = NULL; */ + strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); } @@ -550,6 +565,8 @@ CreateTenantStats(MultiTenantMonitor *monitor) { int tenantIndex = monitor->tenantCount; + memset(&monitor->tenants[tenantIndex], 0 ,sizeof(monitor->tenants[tenantIndex])); + strcpy_s(monitor->tenants[tenantIndex].tenantAttribute, sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant); monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId; diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h index 7f7e14d50..b196158b0 100644 --- a/src/include/distributed/utils/attribute.h +++ b/src/include/distributed/utils/attribute.h @@ -35,7 +35,6 @@ typedef struct TenantStats long long score; time_t lastScoreReduction; - int rank; NamedLWLockTranche namedLockTranche; LWLock lock; diff --git a/src/test/regress/expected/citus_stats_tenants.out b/src/test/regress/expected/citus_stats_tenants.out new file mode 100644 index 000000000..0186bbe16 --- /dev/null +++ b/src/test/regress/expected/citus_stats_tenants.out @@ -0,0 +1,298 @@ +CREATE SCHEMA citus_stats_tenants; +SET search_path TO citus_stats_tenants; +SET citus.next_shard_id TO 5797500; +SET citus.shard_replication_factor TO 1; +CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants() +RETURNS VOID +LANGUAGE C +AS 'citus', $$clean_citus_stats_tenants$$; +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +CREATE TABLE dist_tbl (a INT, b TEXT); +SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_tbl_2 (a INT, b INT); +SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_tbl_text (a TEXT, b INT); +SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_tbl (a INT, b INT); +SELECT create_reference_table('ref_tbl'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_tbl VALUES (1, 'abcd'); +INSERT INTO dist_tbl VALUES (2, 'abcd'); +INSERT INTO dist_tbl VALUES (3, 'abcd'); +INSERT INTO dist_tbl VALUES (4, 'abcd'); +INSERT INTO dist_tbl VALUES (5, 'abcd'); +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 0 | 0 | 1 | 0 + 5 | 0 | 0 | 1 | 0 +(2 rows) + +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 2 | 0 | 0 | 1 | 0 + 3 | 0 | 0 | 1 | 0 +(2 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 3'); + result +--------------------------------------------------------------------- + ALTER SYSTEM + ALTER SYSTEM + ALTER SYSTEM +(3 rows) + +SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()'); + result +--------------------------------------------------------------------- + t + t + t +(3 rows) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 2; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 1 | 0 | 2 | 0 + 5 | 0 | 0 | 1 | 0 +(2 rows) + +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 2 | 1 | 0 | 2 | 0 + 3 | 0 | 0 | 1 | 0 +(2 rows) + +\c - - - :master_port +SELECT pg_sleep (3); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 0 | 1 | 0 | 2 + 5 | 0 | 0 | 0 | 1 +(2 rows) + +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 2 | 0 | 1 | 0 | 2 + 3 | 0 | 0 | 0 | 1 +(2 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 60'); + result +--------------------------------------------------------------------- + ALTER SYSTEM + ALTER SYSTEM + ALTER SYSTEM +(3 rows) + +SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()'); + result +--------------------------------------------------------------------- + t + t + t +(3 rows) + +-- queries with multiple tenants should not be counted +SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- queries with reference tables should not be counted +SELECT count(*)>=0 FROM ref_tbl WHERE a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + 1 | 0 + 5 | 0 +(2 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +-- queries with multiple tables but one tenant should be counted +SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1'; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + 1 | 2 +(1 row) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +-- test scoring +-- all of these distribution column values are from second worker +SELECT count(*)>=0 FROM dist_tbl WHERE a = 2; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 3; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 4; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + 2 | 1 | 2000000000 + 3 | 1 | 1500000000 + 4 | 1 | 1500000000 + abcd | 1 | 1000000000 +(4 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + abcd | 3 | 3000000000 + 2 | 1 | 2000000000 + 3 | 1 | 1500000000 + 4 | 1 | 1500000000 + bcde | 1 | 1000000000 +(5 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + abcd | 3 | 3000000000 + bcde | 3 | 3000000000 + 2 | 1 | 2000000000 + 3 | 1 | 1500000000 +(4 rows) + +\c - - - :master_port +SET client_min_messages TO ERROR; +DROP SCHEMA citus_stats_tenants CASCADE; diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 5e2cd17c1..8bd234828 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -109,6 +109,11 @@ test: pg13_propagate_statistics # ---------- test: citus_update_table_statistics +# ---------- +# Test for tenant statistics +# ---------- +test: citus_stats_tenants + # ---------- # Parallel TPC-H tests to check our distributed execution behavior # ---------- diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 119e6a758..b003e1663 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -485,6 +485,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'"); push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on"); push(@pgOptions, "citus.stat_statements_track = 'all'"); +push(@pgOptions, "citus.stats_tenants_limit = 2"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/citus_stats_tenants.sql b/src/test/regress/sql/citus_stats_tenants.sql new file mode 100644 index 000000000..c6046984f --- /dev/null +++ b/src/test/regress/sql/citus_stats_tenants.sql @@ -0,0 +1,112 @@ +CREATE SCHEMA citus_stats_tenants; +SET search_path TO citus_stats_tenants; +SET citus.next_shard_id TO 5797500; +SET citus.shard_replication_factor TO 1; + +CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants() +RETURNS VOID +LANGUAGE C +AS 'citus', $$clean_citus_stats_tenants$$; + +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + +CREATE TABLE dist_tbl (a INT, b TEXT); +SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none'); + +CREATE TABLE dist_tbl_2 (a INT, b INT); +SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl'); + +CREATE TABLE dist_tbl_text (a TEXT, b INT); +SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none'); + +CREATE TABLE ref_tbl (a INT, b INT); +SELECT create_reference_table('ref_tbl'); + +INSERT INTO dist_tbl VALUES (1, 'abcd'); +INSERT INTO dist_tbl VALUES (2, 'abcd'); +INSERT INTO dist_tbl VALUES (3, 'abcd'); +INSERT INTO dist_tbl VALUES (4, 'abcd'); +INSERT INTO dist_tbl VALUES (5, 'abcd'); + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 3'); +SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()'); + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; +SELECT count(*)>=0 FROM dist_tbl WHERE a = 2; + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :master_port + +SELECT pg_sleep (3); + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 60'); +SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()'); + +-- queries with multiple tenants should not be counted +SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5); + +-- queries with reference tables should not be counted +SELECT count(*)>=0 FROM ref_tbl WHERE a = 1; + +\c - - - :worker_1_port +SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +-- queries with multiple tables but one tenant should be counted +SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1; +SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1; + +\c - - - :worker_1_port +SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1'; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +-- test scoring +-- all of these distribution column values are from second worker +SELECT count(*)>=0 FROM dist_tbl WHERE a = 2; +SELECT count(*)>=0 FROM dist_tbl WHERE a = 3; +SELECT count(*)>=0 FROM dist_tbl WHERE a = 4; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef'; + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; +\c - - - :master_port + +SET client_min_messages TO ERROR; +DROP SCHEMA citus_stats_tenants CASCADE; From 93fb53b72b88add8fa5f183770a8980830f2f87b Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Tue, 14 Mar 2023 10:53:27 +0300 Subject: [PATCH 08/17] Remove storage view --- .../distributed/sql/citus--11.2-1--11.3-1.sql | 1 - .../sql/downgrades/citus--11.3-1--11.2-1.sql | 3 -- .../citus_stats_tenants_storage/11.3-1.sql | 28 ------------------- .../citus_stats_tenants_storage/latest.sql | 28 ------------------- 4 files changed, 60 deletions(-) delete mode 100644 src/backend/distributed/sql/udfs/citus_stats_tenants_storage/11.3-1.sql delete mode 100644 src/backend/distributed/sql/udfs/citus_stats_tenants_storage/latest.sql diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 6fbeede70..73a2bf8a9 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -3,4 +3,3 @@ -- bump version to 11.3-1 #include "udfs/citus_stats_tenants/11.3-1.sql" -#include "udfs/citus_stats_tenants_storage/11.3-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 855b4f9d1..4bf4c75ff 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -2,7 +2,4 @@ -- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now DROP VIEW pg_catalog.citus_stats_tenants; -DROP VIEW pg_catalog.citus_stats_tenants_storage; - DROP FUNCTION pg_catalog.citus_stats_tenants(boolean); -DROP FUNCTION pg_catalog.citus_stats_tenants_storage; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/11.3-1.sql deleted file mode 100644 index c1604a7a1..000000000 --- a/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/11.3-1.sql +++ /dev/null @@ -1,28 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage ( - OUT colocation_id INT, - OUT tenant_attribute TEXT, - OUT storage_estimate INT -) -RETURNS SETOF record -LANGUAGE plpgsql -AS $function$ -DECLARE -tn TEXT; -dc TEXT; -ci INT; -BEGIN - FOR ci, tn, dc IN SELECT cts.colocation_id, cts.table_name, cts.distribution_column FROM citus_tables cts - LOOP - RETURN QUERY - EXECUTE 'SELECT ' || ci || '::int, ' || dc || '::text, sum(pg_column_size(' || tn || '.*))::int FROM ' || tn || ' GROUP BY ' || dc; - END LOOP; -END; -$function$; - -CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS -SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage() -GROUP BY colocation_id, tenant_attribute -ORDER BY total_storage DESC; - -ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog; -GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/latest.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/latest.sql deleted file mode 100644 index c1604a7a1..000000000 --- a/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/latest.sql +++ /dev/null @@ -1,28 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage ( - OUT colocation_id INT, - OUT tenant_attribute TEXT, - OUT storage_estimate INT -) -RETURNS SETOF record -LANGUAGE plpgsql -AS $function$ -DECLARE -tn TEXT; -dc TEXT; -ci INT; -BEGIN - FOR ci, tn, dc IN SELECT cts.colocation_id, cts.table_name, cts.distribution_column FROM citus_tables cts - LOOP - RETURN QUERY - EXECUTE 'SELECT ' || ci || '::int, ' || dc || '::text, sum(pg_column_size(' || tn || '.*))::int FROM ' || tn || ' GROUP BY ' || dc; - END LOOP; -END; -$function$; - -CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS -SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage() -GROUP BY colocation_id, tenant_attribute -ORDER BY total_storage DESC; - -ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog; -GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC; From 2aa30dd303cfb12b42a6db0cd28b9593d94d515b Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Tue, 14 Mar 2023 12:42:45 +0300 Subject: [PATCH 09/17] Remove citus_stats_tenants from tests --- src/test/regress/expected/multi_extension.out | 4 +--- src/test/regress/expected/upgrade_list_citus_objects.out | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index b475d2b6a..2b165bfeb 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1326,10 +1326,8 @@ SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- | function citus_stats_tenants(boolean) SETOF record - | function citus_stats_tenants_storage() SETOF record | view citus_stats_tenants - | view citus_stats_tenants_storage -(4 rows) +(2 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 6733a28fc..8005c0c42 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -122,7 +122,6 @@ ORDER BY 1; function citus_stat_statements() function citus_stat_statements_reset() function citus_stats_tenants(boolean) - function citus_stats_tenants_storage() function citus_table_is_visible(oid) function citus_table_size(regclass) function citus_task_wait(bigint,citus_task_status) @@ -319,8 +318,7 @@ ORDER BY 1; view citus_stat_activity view citus_stat_statements view citus_stats_tenants - view citus_stats_tenants_storage view pg_dist_shard_placement view time_partitions -(314 rows) +(312 rows) From 5e9e67e5bf5e9d7cd637a563bb4b4d3efb50b316 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Tue, 14 Mar 2023 15:52:36 +0300 Subject: [PATCH 10/17] Include update and delete queries --- .../planner/multi_router_planner.c | 16 ++++++- src/backend/distributed/utils/attribute.c | 48 ++++++++++--------- src/include/distributed/utils/attribute.h | 16 +++---- .../regress/expected/citus_stats_tenants.out | 6 +-- src/test/regress/sql/citus_stats_tenants.sql | 6 +-- 5 files changed, 54 insertions(+), 38 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 7e596bfe5..e2776cc9a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1997,11 +1997,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, } else { + Datum partitionColumnValue; + Oid partitionColumnType = 0; + char *partitionColumnString = NULL; + if (job->partitionKeyValue != NULL) + { + partitionColumnValue = job->partitionKeyValue->constvalue; + partitionColumnType = job->partitionKeyValue->consttype; + partitionColumnString = DatumToString(partitionColumnValue, + partitionColumnType); + } + + SetJobColocationId(job); + job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification, "", -1); + isLocalTableModification, + partitionColumnString, job->colocationId); } } diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 3bedf4ec4..cb6f93807 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -127,12 +127,12 @@ citus_stats_tenants(PG_FUNCTION_ARGS) values[0] = Int32GetDatum(tenantStats->colocationGroupId); values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute)); - values[2] = Int32GetDatum(tenantStats->selectsInThisPeriod); - values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod); - values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + - tenantStats->insertsInThisPeriod); - values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + - tenantStats->insertsInLastPeriod); + values[2] = Int32GetDatum(tenantStats->readsInThisPeriod); + values[3] = Int32GetDatum(tenantStats->readsInLastPeriod); + values[4] = Int32GetDatum(tenantStats->readsInThisPeriod + + tenantStats->writesInThisPeriod); + values[5] = Int32GetDatum(tenantStats->readsInLastPeriod + + tenantStats->writesInLastPeriod); values[6] = Int64GetDatum(tenantStats->score); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); @@ -347,15 +347,17 @@ AttributeMetricsIfApplicable() if (attributeCommandType == CMD_SELECT) { - tenantStats->selectCount++; - tenantStats->selectsInThisPeriod++; - tenantStats->totalSelectTime += cpu_time_used; + tenantStats->readCount++; + tenantStats->readsInThisPeriod++; + tenantStats->totalReadTime += cpu_time_used; } - else if (attributeCommandType == CMD_INSERT) + else if (attributeCommandType == CMD_UPDATE || + attributeCommandType == CMD_INSERT || + attributeCommandType == CMD_DELETE) { - tenantStats->insertCount++; - tenantStats->insertsInThisPeriod++; - tenantStats->totalInsertTime += cpu_time_used; + tenantStats->writeCount++; + tenantStats->writesInThisPeriod++; + tenantStats->totalWriteTime += cpu_time_used; } LWLockRelease(&monitor->lock); @@ -376,10 +378,10 @@ AttributeMetricsIfApplicable() if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { - ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f " + ereport(NOTICE, (errmsg("total read count = %d, total read CPU time = %f " "to tenant: %s", - tenantStats->selectCount, - tenantStats->totalSelectTime, + tenantStats->readCount, + tenantStats->totalReadTime, tenantStats->tenantAttribute))); } } @@ -405,13 +407,13 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats) * but there are some query count for this period we move them to the last period. */ if (tenantStats->lastQueryTime < monitor->periodStart && - (tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod)) + (tenantStats->writesInThisPeriod || tenantStats->readsInThisPeriod)) { - tenantStats->insertsInLastPeriod = tenantStats->insertsInThisPeriod; - tenantStats->insertsInThisPeriod = 0; + tenantStats->writesInLastPeriod = tenantStats->writesInThisPeriod; + tenantStats->writesInThisPeriod = 0; - tenantStats->selectsInLastPeriod = tenantStats->selectsInThisPeriod; - tenantStats->selectsInThisPeriod = 0; + tenantStats->readsInLastPeriod = tenantStats->readsInThisPeriod; + tenantStats->readsInThisPeriod = 0; } /* @@ -419,9 +421,9 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats) */ if (tenantStats->lastQueryTime < monitor->periodStart - CitusStatsTenantsPeriod) { - tenantStats->insertsInLastPeriod = 0; + tenantStats->writesInLastPeriod = 0; - tenantStats->selectsInLastPeriod = 0; + tenantStats->readsInLastPeriod = 0; } } diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h index b196158b0..c05ec950e 100644 --- a/src/include/distributed/utils/attribute.h +++ b/src/include/distributed/utils/attribute.h @@ -21,15 +21,15 @@ typedef struct TenantStats int colocationGroupId; - int selectCount; - double totalSelectTime; - int selectsInLastPeriod; - int selectsInThisPeriod; + int readCount; + double totalReadTime; + int readsInLastPeriod; + int readsInThisPeriod; - int insertCount; - double totalInsertTime; - int insertsInLastPeriod; - int insertsInThisPeriod; + int writeCount; + double totalWriteTime; + int writesInLastPeriod; + int writesInThisPeriod; time_t lastQueryTime; diff --git a/src/test/regress/expected/citus_stats_tenants.out b/src/test/regress/expected/citus_stats_tenants.out index 0186bbe16..5856bd819 100644 --- a/src/test/regress/expected/citus_stats_tenants.out +++ b/src/test/regress/expected/citus_stats_tenants.out @@ -44,9 +44,9 @@ SELECT create_reference_table('ref_tbl'); INSERT INTO dist_tbl VALUES (1, 'abcd'); INSERT INTO dist_tbl VALUES (2, 'abcd'); -INSERT INTO dist_tbl VALUES (3, 'abcd'); -INSERT INTO dist_tbl VALUES (4, 'abcd'); -INSERT INTO dist_tbl VALUES (5, 'abcd'); +UPDATE dist_tbl SET b = a + 1 WHERE a = 3; +UPDATE dist_tbl SET b = a + 1 WHERE a = 4; +DELETE FROM dist_tbl WHERE a = 5; \c - - - :worker_1_port SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period diff --git a/src/test/regress/sql/citus_stats_tenants.sql b/src/test/regress/sql/citus_stats_tenants.sql index c6046984f..7c7e1ed4c 100644 --- a/src/test/regress/sql/citus_stats_tenants.sql +++ b/src/test/regress/sql/citus_stats_tenants.sql @@ -24,9 +24,9 @@ SELECT create_reference_table('ref_tbl'); INSERT INTO dist_tbl VALUES (1, 'abcd'); INSERT INTO dist_tbl VALUES (2, 'abcd'); -INSERT INTO dist_tbl VALUES (3, 'abcd'); -INSERT INTO dist_tbl VALUES (4, 'abcd'); -INSERT INTO dist_tbl VALUES (5, 'abcd'); +UPDATE dist_tbl SET b = a + 1 WHERE a = 3; +UPDATE dist_tbl SET b = a + 1 WHERE a = 4; +DELETE FROM dist_tbl WHERE a = 5; \c - - - :worker_1_port SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; From 33265dd4114ad95bf74fc2b65b7529062c75f726 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Tue, 14 Mar 2023 17:44:33 +0300 Subject: [PATCH 11/17] Fix indent --- src/backend/distributed/utils/attribute.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index cb6f93807..ecae50a47 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -567,7 +567,7 @@ CreateTenantStats(MultiTenantMonitor *monitor) { int tenantIndex = monitor->tenantCount; - memset(&monitor->tenants[tenantIndex], 0 ,sizeof(monitor->tenants[tenantIndex])); + memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex])); strcpy_s(monitor->tenants[tenantIndex].tenantAttribute, sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant); From 3a435e7c142c3f442d67f40b85d4763445b1afd5 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 15 Mar 2023 08:08:33 +0300 Subject: [PATCH 12/17] Fix attribute prefix --- src/backend/distributed/utils/attribute.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index e9b9f3624..c68cdd683 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -30,7 +30,7 @@ static void AttributeMetricsIfApplicable(void); ExecutorEnd_hook_type prev_ExecutorEnd = NULL; -#define ATTRIBUTE_PREFIX "{\"tId\":" +#define ATTRIBUTE_PREFIX "/*{\"tId\":" #define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/" #define CITUS_STATS_TENANTS_COLUMNS 7 #define ONE_QUERY_SCORE 1000000000 From 5cedee7a7df89c862c3e36938af6bf7612c1e0a0 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 15 Mar 2023 08:09:24 +0300 Subject: [PATCH 13/17] Add comment chars escaping --- src/backend/distributed/utils/attribute.c | 51 ++++++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index c68cdd683..bb61a3781 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -60,8 +60,8 @@ static void MultiTenantMonitorSMInit(void); static int CreateTenantStats(MultiTenantMonitor *monitor); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); -static char * extractTopComment(const char *inputString); -static char * get_substring(const char *str, int start, int end); +static char * EscapeCommentChars(const char *str); +static char * UnescapeCommentChars(const char *str); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -635,3 +635,50 @@ get_substring(const char *str, int start, int end) return substr; } + + +/* EscapeCommentChars adds a backslash before each occurrence of '*' or '/' in the input string */ +static char * +EscapeCommentChars(const char *str) +{ + int len = strlen(str); + char *new_str = (char *) malloc(len * 2 + 1); + int j = 0; + + for (int i = 0; i < len; i++) + { + if (str[i] == '*' || str[i] == '/') + { + new_str[j++] = '\\'; + } + new_str[j++] = str[i]; + } + new_str[j] = '\0'; + + return new_str; +} + + +/* UnescapeCommentChars removes the backslash that precedes '*' or '/' in the input string. */ +static char * +UnescapeCommentChars(const char *str) +{ + int len = strlen(str); + char *new_str = (char *) malloc(len + 1); + int j = 0; + + for (int i = 0; i < len; i++) + { + if (str[i] == '\\' && i < len - 1) + { + if (str[i + 1] == '*' || str[i + 1] == '/') + { + i++; + } + } + new_str[j++] = str[i]; + } + new_str[j] = '\0'; + + return new_str; +} From 230a189c1feb69bc935f30c6199e8fa6a4bf7552 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 15 Mar 2023 08:10:45 +0300 Subject: [PATCH 14/17] Minor renamings and refactorings --- src/backend/distributed/utils/attribute.c | 31 ++++++++++++++--------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index bb61a3781..a01ba3701 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -60,6 +60,8 @@ static void MultiTenantMonitorSMInit(void); static int CreateTenantStats(MultiTenantMonitor *monitor); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); +static char * ExtractTopComment(const char *inputString); +static char * Substring(const char *str, int start, int end); static char * EscapeCommentChars(const char *str); static char * UnescapeCommentChars(const char *str); @@ -161,7 +163,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) { - char *annotation = extractTopComment(query_string); + char *annotation = ExtractTopComment(query_string); if (annotation != NULL) { Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); @@ -578,33 +580,37 @@ MultiTenantMonitorshmemSize(void) /* - * extractTopComment extracts the top-level multi-line comment from a given input string. + * ExtractTopComment extracts the top-level multi-line comment from a given input string. */ static char * -extractTopComment(const char *inputString) +ExtractTopComment(const char *inputString) { int commentStartCharsLength = 2; - if (strlen(inputString) < commentStartCharsLength) + int inputStringLen = strlen(inputString); + if (inputStringLen < commentStartCharsLength) { return NULL; } - int i = 0; + int commentEndCharsIndex = 0; /* If query starts with a comment */ - if (inputString[i] == '/' && inputString[i + 1] == '*') + if (inputString[commentEndCharsIndex] == '/' && inputString[commentEndCharsIndex + + 1] == '*') { /* Skip the comment start characters */ - i += 2; - while (inputString[i] && (inputString[i] != '*' && inputString[i + 1] != '/')) + commentEndCharsIndex += commentStartCharsLength; + while (inputString[commentEndCharsIndex] && commentEndCharsIndex < + inputStringLen && !(inputString[commentEndCharsIndex] == '*' && + inputString[commentEndCharsIndex + 1] == '/')) { - i++; + commentEndCharsIndex++; } } - if (i > commentStartCharsLength) + if (commentEndCharsIndex > commentStartCharsLength) { - return get_substring(inputString, commentStartCharsLength, i); + return Substring(inputString, commentStartCharsLength, commentEndCharsIndex); } else { @@ -613,8 +619,9 @@ extractTopComment(const char *inputString) } +/* Extracts a substring from the input string between the specified start and end indices.*/ static char * -get_substring(const char *str, int start, int end) +Substring(const char *str, int start, int end) { int len = strlen(str); From 44fec55d581cd62ae90aefd29cd424aeefd79c13 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 15 Mar 2023 08:11:07 +0300 Subject: [PATCH 15/17] Escape/Unescape sql comment chars --- src/backend/distributed/utils/attribute.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index a01ba3701..d95b87082 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -23,6 +23,7 @@ #include "utils/builtins.h" #include "utils/json.h" #include "distributed/utils/attribute.h" +#include "common/base64.h" #include @@ -171,7 +172,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); if (tenantIdTextP != NULL) { - char *tenantId = text_to_cstring(tenantIdTextP); + char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP)); strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); } @@ -202,12 +203,15 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) return queryString; } + char *commentCharsEscaped = EscapeCommentChars(partitionColumn); StringInfo escapedSourceName = makeStringInfo(); - escape_json(escapedSourceName, partitionColumn); + + escape_json(escapedSourceName, commentCharsEscaped); StringInfo newQuery = makeStringInfo(); appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data, colocationId); + appendStringInfoString(newQuery, queryString); return newQuery->data; From 5cd3c8ad11c433d79e55b607810023d87473e006 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 15 Mar 2023 08:36:43 +0300 Subject: [PATCH 16/17] Indent --- src/backend/distributed/utils/attribute.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 32d008848..3231d919d 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -627,7 +627,8 @@ ExtractTopComment(const char *inputString) commentEndCharsIndex += commentStartCharsLength; while (inputString[commentEndCharsIndex] && commentEndCharsIndex < inputStringLen && !(inputString[commentEndCharsIndex] == '*' && - inputString[commentEndCharsIndex + 1] == '/')) + inputString + [commentEndCharsIndex + 1] == '/')) { commentEndCharsIndex++; } From 57eaeb2ecbfa7f7f0fac8fa42fb1d541e80f57d4 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 15 Mar 2023 09:01:41 +0300 Subject: [PATCH 17/17] Indent --- src/backend/distributed/utils/attribute.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 3231d919d..30d070c0a 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -620,15 +620,15 @@ ExtractTopComment(const char *inputString) int commentEndCharsIndex = 0; /* If query starts with a comment */ - if (inputString[commentEndCharsIndex] == '/' && inputString[commentEndCharsIndex + - 1] == '*') + if (inputString[commentEndCharsIndex] == '/' && + inputString[commentEndCharsIndex + 1] == '*') { /* Skip the comment start characters */ commentEndCharsIndex += commentStartCharsLength; - while (inputString[commentEndCharsIndex] && commentEndCharsIndex < - inputStringLen && !(inputString[commentEndCharsIndex] == '*' && - inputString - [commentEndCharsIndex + 1] == '/')) + while (inputString[commentEndCharsIndex] && + commentEndCharsIndex < inputStringLen && + !(inputString[commentEndCharsIndex] == '*' && + inputString [commentEndCharsIndex + 1] == '/')) { commentEndCharsIndex++; }