diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 5c482bcd9..7b697ae06 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -15,24 +15,28 @@ #include "distributed/log_utils.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/jsonbutils.h" +#include "distributed/colocation_utils.h" #include "distributed/tuplestore.h" +#include "distributed/colocation_utils.h" #include "executor/execdesc.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include #include "utils/builtins.h" - +#include "utils/json.h" #include "distributed/utils/attribute.h" + #include static void AttributeMetricsIfApplicable(void); ExecutorEnd_hook_type prev_ExecutorEnd = NULL; -#define ATTRIBUTE_PREFIX "/* attributeTo: " -#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */" +#define ATTRIBUTE_PREFIX "/*{\"tId\":" +#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/" #define CITUS_STATS_TENANTS_COLUMNS 7 #define ONE_QUERY_SCORE 1000000000 @@ -61,6 +65,9 @@ static void MultiTenantMonitorSMInit(void); static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); +static char * ExtractTopComment(const char *inputString); +static char * EscapeCommentChars(const char *str); +static char * UnescapeCommentChars(const char *str); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -199,52 +206,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) { - /* TODO create a function to safely parse the tenant identifier from the query comment */ - /* query is attributed to a tenant */ - char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX); - char *tenantEnd = tenantId; - while (true && tenantEnd[0] != '\0') + char *annotation = ExtractTopComment(query_string); + if (annotation != NULL) { - if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/') + Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); + + text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); + if (tenantIdTextP != NULL) { - break; + char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP)); + strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId, + MAX_TENANT_ATTRIBUTE_LENGTH - 1); } - tenantEnd++; + colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", + INVALID_COLOCATION_ID); } - tenantEnd--; - - colocationGroupId = 0; - while (*tenantEnd != ',') - { - colocationGroupId *= 10; - colocationGroupId += *tenantEnd - '0'; - tenantEnd--; - } - - int t = colocationGroupId; - colocationGroupId = 0; - while (t) - { - colocationGroupId *= 10; - colocationGroupId += t % 10; - t /= 10; - } - - /* hack to get a clean copy of the tenant id string */ - char tenantEndTmp = *tenantEnd; - *tenantEnd = '\0'; - tenantId = pstrdup(tenantId); - *tenantEnd = tenantEndTmp; - - strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); } else { strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); } - - /*DetachSegment(); */ } @@ -258,8 +240,15 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) { return queryString; } + + char *commentCharsEscaped = EscapeCommentChars(partitionColumn); + StringInfo escapedSourceName = makeStringInfo(); + + escape_json(escapedSourceName, commentCharsEscaped); + StringInfo newQuery = makeStringInfo(); - appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId); + appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data, + colocationId); appendStringInfoString(newQuery, queryString); @@ -668,3 +657,92 @@ MultiTenantMonitorshmemSize(void) return size; } + + +/* + * ExtractTopComment extracts the top-level multi-line comment from a given input string. + */ +static char * +ExtractTopComment(const char *inputString) +{ + int commentCharsLength = 2; + int inputStringLen = strlen(inputString); + if (inputStringLen < commentCharsLength) + { + return NULL; + } + + const char *commentStartChars = "/*"; + const char *commentEndChars = "*/"; + + /* If query doesn't start with a comment, return NULL */ + if (strstr(inputString, commentStartChars) != inputString) + { + return NULL; + } + + StringInfo commentData = makeStringInfo(); + + /* Skip the comment start characters */ + const char *commentStart = inputString + commentCharsLength; + + /* Find the first comment end character */ + const char *commentEnd = strstr(commentStart, commentEndChars); + if (commentEnd == NULL) + { + return NULL; + } + + /* Append the comment to the StringInfo buffer */ + int commentLength = commentEnd - commentStart; + appendStringInfo(commentData, "%.*s", commentLength, commentStart); + + /* Return the extracted comment */ + return commentData->data; +} + + +/* EscapeCommentChars adds a backslash before each occurrence of '*' or '/' in the input string */ +static char * +EscapeCommentChars(const char *str) +{ + int originalStringLength = strlen(str); + StringInfo escapedString = makeStringInfo(); + + for (int originalStringIndex = 0; originalStringIndex < originalStringLength; + originalStringIndex++) + { + if (str[originalStringIndex] == '*' || str[originalStringIndex] == '/') + { + appendStringInfoChar(escapedString, '\\'); + } + + appendStringInfoChar(escapedString, str[originalStringIndex]); + } + + return escapedString->data; +} + + +/* UnescapeCommentChars removes the backslash that precedes '*' or '/' in the input string. */ +static char * +UnescapeCommentChars(const char *str) +{ + int originalStringLength = strlen(str); + StringInfo unescapedString = makeStringInfo(); + + for (int originalStringindex = 0; originalStringindex < originalStringLength; + originalStringindex++) + { + if (str[originalStringindex] == '\\' && + originalStringindex < originalStringLength - 1 && + (str[originalStringindex + 1] == '*' || + str[originalStringindex + 1] == '/')) + { + originalStringindex++; + } + appendStringInfoChar(unescapedString, str[originalStringindex]); + } + + return unescapedString->data; +} diff --git a/src/backend/distributed/utils/jsonbutils.c b/src/backend/distributed/utils/jsonbutils.c index 22fa4f568..4855ee004 100644 --- a/src/backend/distributed/utils/jsonbutils.c +++ b/src/backend/distributed/utils/jsonbutils.c @@ -83,6 +83,25 @@ ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue) } +/* + * ExtractFieldInt32 gets value of fieldName from jsonbDoc, or returns + * defaultValue if it doesn't exist. + */ +int32 +ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue) +{ + Datum jsonbDatum = 0; + bool found = ExtractFieldJsonb(jsonbDoc, fieldName, &jsonbDatum, false); + if (!found) + { + return defaultValue; + } + + Datum int32Datum = DirectFunctionCall1(jsonb_int4, jsonbDatum); + return DatumGetInt32(int32Datum); +} + + /* * ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or * returns NULL if it doesn't exist. diff --git a/src/include/distributed/jsonbutils.h b/src/include/distributed/jsonbutils.h index 3e37fa38e..d44044fcb 100644 --- a/src/include/distributed/jsonbutils.h +++ b/src/include/distributed/jsonbutils.h @@ -16,5 +16,6 @@ bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result); text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName); bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue); +int32 ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue); #endif /* CITUS_JSONBUTILS_H */ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 33a35f286..65692e1c9 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -307,5 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+ # shard_rebalancer output, flaky improvement number s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g - -s/\/\* attributeTo.*\*\///g +# normalize tenants statistics annotations +s/\/\*\{"tId":.*\*\///g diff --git a/src/test/regress/expected/citus_stats_tenants.out b/src/test/regress/expected/citus_stats_tenants.out index e4705eb24..c0a8c896e 100644 --- a/src/test/regress/expected/citus_stats_tenants.out +++ b/src/test/regress/expected/citus_stats_tenants.out @@ -220,8 +220,10 @@ SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tena bcde | 3 | 3000000000 2 | 1 | 1000000000 3 | 1 | 1000000000 + 4 | 1 | 1000000000 + cdef | 1 | 1000000000 defg | 1 | 1000000000 -(5 rows) +(7 rows) -- test period passing SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); @@ -262,6 +264,14 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q 5 | 0 | 0 | 0 | 1 (2 rows) +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + 1 | 0 | 500000000 + 5 | 0 | 500000000 +(2 rows) + \c - - - :master_port SET search_path TO citus_stats_tenants; -- test logs @@ -304,5 +314,129 @@ CONTEXT: PL/pgSQL function citus_stats_tenants(boolean) line XX at RAISE t (1 row) +-- test special and multibyte characters in tenant attribute +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +TRUNCATE TABLE dist_tbl_text; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /*bcde | 1 | 0 | 1 | 0 + /b*c/de | 1 | 0 | 1 | 0 + /b*cde | 1 | 0 | 1 | 0 + /b/**/cde | 1 | 0 | 1 | 0 + /b/*/cde | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + b/*//cde | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 + bcde*/ | 1 | 0 | 1 | 0 +(10 rows) + +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /*bcde | 1 | 0 | 1 | 0 + /b*c/de | 1 | 0 | 1 | 0 + /b*cde | 1 | 0 | 1 | 0 + /b/**/cde | 1 | 0 | 1 | 0 + /b/*/cde | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + b/*//cde | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 + bcde*/ | 1 | 0 | 1 | 0 +(10 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + thisisaverylooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo | 1 | 0 | 1 | 0 +(1 row) + SET client_min_messages TO ERROR; DROP SCHEMA citus_stats_tenants CASCADE; diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 92cd0120d..73426cf1f 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -489,7 +489,7 @@ push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on"); push(@pgOptions, "citus.stat_statements_track = 'all'"); push(@pgOptions, "citus.enable_change_data_capture=on"); -push(@pgOptions, "citus.stats_tenants_limit = 2"); +push(@pgOptions, "citus.stats_tenants_limit = 10"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/citus_stats_tenants.sql b/src/test/regress/sql/citus_stats_tenants.sql index b895a8423..daafea712 100644 --- a/src/test/regress/sql/citus_stats_tenants.sql +++ b/src/test/regress/sql/citus_stats_tenants.sql @@ -93,6 +93,9 @@ SET citus.stats_tenants_period TO 2; SELECT sleep_until_next_period(); SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + \c - - - :master_port SET search_path TO citus_stats_tenants; @@ -108,5 +111,31 @@ SELECT count(*)>=0 FROM citus_stats_tenants; SET citus.multi_tenant_monitoring_log_level TO DEBUG; SELECT count(*)>=0 FROM citus_stats_tenants; +-- test special and multibyte characters in tenant attribute +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +TRUNCATE TABLE dist_tbl_text; + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + SET client_min_messages TO ERROR; DROP SCHEMA citus_stats_tenants CASCADE;