Parse the annotation string correctly (#6796)

This pull request modifies our query annotation and parsing logic by
using a JSON-structured annotation string. Basically, it prepends a JSON
string in a multiline comment that contains `tenantId` and
`colocationId` to a query string to be able to track query statistics on
the worker nodes. It also parses the received annotation in the query
string and sets the relevant tenantId and colocationId on the worker
nodes.

---------

Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
multi-tenant-monitoring-pgbench
Gokhan Gulbiz 2023-03-29 15:51:05 +03:00 committed by Halil Ozan Akgul
parent e0046d2d79
commit 1b7d075da0
7 changed files with 305 additions and 44 deletions

View File

@ -15,24 +15,28 @@
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/jsonbutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "distributed/colocation_utils.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#include <sys/time.h> #include <sys/time.h>
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/json.h"
#include "distributed/utils/attribute.h" #include "distributed/utils/attribute.h"
#include <time.h> #include <time.h>
static void AttributeMetricsIfApplicable(void); static void AttributeMetricsIfApplicable(void);
ExecutorEnd_hook_type prev_ExecutorEnd = NULL; ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
#define ATTRIBUTE_PREFIX "/* attributeTo: " #define ATTRIBUTE_PREFIX "/*{\"tId\":"
#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */" #define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
#define CITUS_STATS_TENANTS_COLUMNS 7 #define CITUS_STATS_TENANTS_COLUMNS 7
#define ONE_QUERY_SCORE 1000000000 #define ONE_QUERY_SCORE 1000000000
@ -61,6 +65,9 @@ static void MultiTenantMonitorSMInit(void);
static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime); static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime);
static int FindTenantStats(MultiTenantMonitor *monitor); static int FindTenantStats(MultiTenantMonitor *monitor);
static size_t MultiTenantMonitorshmemSize(void); static size_t MultiTenantMonitorshmemSize(void);
static char * ExtractTopComment(const char *inputString);
static char * EscapeCommentChars(const char *str);
static char * UnescapeCommentChars(const char *str);
int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;
int CitusStatsTenantsPeriod = (time_t) 60; int CitusStatsTenantsPeriod = (time_t) 60;
@ -199,52 +206,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0)
{ {
/* TODO create a function to safely parse the tenant identifier from the query comment */ char *annotation = ExtractTopComment(query_string);
/* query is attributed to a tenant */ if (annotation != NULL)
char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX);
char *tenantEnd = tenantId;
while (true && tenantEnd[0] != '\0')
{ {
if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/') Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation));
text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId");
if (tenantIdTextP != NULL)
{ {
break; char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP));
strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
} }
tenantEnd++; colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId",
INVALID_COLOCATION_ID);
} }
tenantEnd--;
colocationGroupId = 0;
while (*tenantEnd != ',')
{
colocationGroupId *= 10;
colocationGroupId += *tenantEnd - '0';
tenantEnd--;
}
int t = colocationGroupId;
colocationGroupId = 0;
while (t)
{
colocationGroupId *= 10;
colocationGroupId += t % 10;
t /= 10;
}
/* hack to get a clean copy of the tenant id string */
char tenantEndTmp = *tenantEnd;
*tenantEnd = '\0';
tenantId = pstrdup(tenantId);
*tenantEnd = tenantEndTmp;
strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId);
} }
else else
{ {
strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
} }
/*DetachSegment(); */
} }
@ -258,8 +240,15 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId)
{ {
return queryString; return queryString;
} }
char *commentCharsEscaped = EscapeCommentChars(partitionColumn);
StringInfo escapedSourceName = makeStringInfo();
escape_json(escapedSourceName, commentCharsEscaped);
StringInfo newQuery = makeStringInfo(); StringInfo newQuery = makeStringInfo();
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId); appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data,
colocationId);
appendStringInfoString(newQuery, queryString); appendStringInfoString(newQuery, queryString);
@ -668,3 +657,92 @@ MultiTenantMonitorshmemSize(void)
return size; return size;
} }
/*
* ExtractTopComment extracts the top-level multi-line comment from a given input string.
*/
static char *
ExtractTopComment(const char *inputString)
{
int commentCharsLength = 2;
int inputStringLen = strlen(inputString);
if (inputStringLen < commentCharsLength)
{
return NULL;
}
const char *commentStartChars = "/*";
const char *commentEndChars = "*/";
/* If query doesn't start with a comment, return NULL */
if (strstr(inputString, commentStartChars) != inputString)
{
return NULL;
}
StringInfo commentData = makeStringInfo();
/* Skip the comment start characters */
const char *commentStart = inputString + commentCharsLength;
/* Find the first comment end character */
const char *commentEnd = strstr(commentStart, commentEndChars);
if (commentEnd == NULL)
{
return NULL;
}
/* Append the comment to the StringInfo buffer */
int commentLength = commentEnd - commentStart;
appendStringInfo(commentData, "%.*s", commentLength, commentStart);
/* Return the extracted comment */
return commentData->data;
}
/* EscapeCommentChars adds a backslash before each occurrence of '*' or '/' in the input string */
static char *
EscapeCommentChars(const char *str)
{
int originalStringLength = strlen(str);
StringInfo escapedString = makeStringInfo();
for (int originalStringIndex = 0; originalStringIndex < originalStringLength;
originalStringIndex++)
{
if (str[originalStringIndex] == '*' || str[originalStringIndex] == '/')
{
appendStringInfoChar(escapedString, '\\');
}
appendStringInfoChar(escapedString, str[originalStringIndex]);
}
return escapedString->data;
}
/* UnescapeCommentChars removes the backslash that precedes '*' or '/' in the input string. */
static char *
UnescapeCommentChars(const char *str)
{
int originalStringLength = strlen(str);
StringInfo unescapedString = makeStringInfo();
for (int originalStringindex = 0; originalStringindex < originalStringLength;
originalStringindex++)
{
if (str[originalStringindex] == '\\' &&
originalStringindex < originalStringLength - 1 &&
(str[originalStringindex + 1] == '*' ||
str[originalStringindex + 1] == '/'))
{
originalStringindex++;
}
appendStringInfoChar(unescapedString, str[originalStringindex]);
}
return unescapedString->data;
}

View File

@ -83,6 +83,25 @@ ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue)
} }
/*
* ExtractFieldInt32 gets value of fieldName from jsonbDoc, or returns
* defaultValue if it doesn't exist.
*/
int32
ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue)
{
Datum jsonbDatum = 0;
bool found = ExtractFieldJsonb(jsonbDoc, fieldName, &jsonbDatum, false);
if (!found)
{
return defaultValue;
}
Datum int32Datum = DirectFunctionCall1(jsonb_int4, jsonbDatum);
return DatumGetInt32(int32Datum);
}
/* /*
* ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or * ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or
* returns NULL if it doesn't exist. * returns NULL if it doesn't exist.

View File

@ -16,5 +16,6 @@
bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result); bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result);
text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName); text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName);
bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue); bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue);
int32 ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue);
#endif /* CITUS_JSONBUTILS_H */ #endif /* CITUS_JSONBUTILS_H */

View File

@ -307,5 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+
# shard_rebalancer output, flaky improvement number # shard_rebalancer output, flaky improvement number
s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g
# normalize tenants statistics annotations
s/\/\* attributeTo.*\*\///g s/\/\*\{"tId":.*\*\///g

View File

@ -220,8 +220,10 @@ SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tena
bcde | 3 | 3000000000 bcde | 3 | 3000000000
2 | 1 | 1000000000 2 | 1 | 1000000000
3 | 1 | 1000000000 3 | 1 | 1000000000
4 | 1 | 1000000000
cdef | 1 | 1000000000
defg | 1 | 1000000000 defg | 1 | 1000000000
(5 rows) (7 rows)
-- test period passing -- test period passing
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
@ -262,6 +264,14 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
5 | 0 | 0 | 0 | 1 5 | 0 | 0 | 0 | 1
(2 rows) (2 rows)
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
1 | 0 | 500000000
5 | 0 | 500000000
(2 rows)
\c - - - :master_port \c - - - :master_port
SET search_path TO citus_stats_tenants; SET search_path TO citus_stats_tenants;
-- test logs -- test logs
@ -304,5 +314,129 @@ CONTEXT: PL/pgSQL function citus_stats_tenants(boolean) line XX at RAISE
t t
(1 row) (1 row)
-- test special and multibyte characters in tenant attribute
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
result
---------------------------------------------------------------------
(3 rows)
TRUNCATE TABLE dist_tbl_text;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/*bcde | 1 | 0 | 1 | 0
/b*c/de | 1 | 0 | 1 | 0
/b*cde | 1 | 0 | 1 | 0
/b/**/cde | 1 | 0 | 1 | 0
/b/*/cde | 1 | 0 | 1 | 0
/bcde | 1 | 0 | 1 | 0
äbc | 1 | 0 | 1 | 0
b/*//cde | 1 | 0 | 1 | 0
bcde* | 1 | 0 | 1 | 0
bcde*/ | 1 | 0 | 1 | 0
(10 rows)
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/*bcde | 1 | 0 | 1 | 0
/b*c/de | 1 | 0 | 1 | 0
/b*cde | 1 | 0 | 1 | 0
/b/**/cde | 1 | 0 | 1 | 0
/b/*/cde | 1 | 0 | 1 | 0
/bcde | 1 | 0 | 1 | 0
äbc | 1 | 0 | 1 | 0
b/*//cde | 1 | 0 | 1 | 0
bcde* | 1 | 0 | 1 | 0
bcde*/ | 1 | 0 | 1 | 0
(10 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
result
---------------------------------------------------------------------
(3 rows)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
thisisaverylooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo | 1 | 0 | 1 | 0
(1 row)
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA citus_stats_tenants CASCADE; DROP SCHEMA citus_stats_tenants CASCADE;

View File

@ -489,7 +489,7 @@ push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on"); push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
push(@pgOptions, "citus.stat_statements_track = 'all'"); push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on"); push(@pgOptions, "citus.enable_change_data_capture=on");
push(@pgOptions, "citus.stats_tenants_limit = 2"); push(@pgOptions, "citus.stats_tenants_limit = 10");
# Some tests look at shards in pg_class, make sure we can usually see them: # Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");

View File

@ -93,6 +93,9 @@ SET citus.stats_tenants_period TO 2;
SELECT sleep_until_next_period(); SELECT sleep_until_next_period();
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute;
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
\c - - - :master_port \c - - - :master_port
SET search_path TO citus_stats_tenants; SET search_path TO citus_stats_tenants;
@ -108,5 +111,31 @@ SELECT count(*)>=0 FROM citus_stats_tenants;
SET citus.multi_tenant_monitoring_log_level TO DEBUG; SET citus.multi_tenant_monitoring_log_level TO DEBUG;
SELECT count(*)>=0 FROM citus_stats_tenants; SELECT count(*)>=0 FROM citus_stats_tenants;
-- test special and multibyte characters in tenant attribute
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
TRUNCATE TABLE dist_tbl_text;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname';
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA citus_stats_tenants CASCADE; DROP SCHEMA citus_stats_tenants CASCADE;