diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 19d290cfd..aa04aebc6 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -15,6 +15,8 @@ #include "distributed/log_utils.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/jsonbutils.h" +#include "distributed/colocation_utils.h" #include "distributed/tuplestore.h" #include "executor/execdesc.h" #include "storage/ipc.h" @@ -22,7 +24,7 @@ #include "storage/shmem.h" #include #include "utils/builtins.h" - +#include "utils/json.h" #include "distributed/utils/attribute.h" #include @@ -31,8 +33,8 @@ static void AttributeMetricsIfApplicable(void); ExecutorEnd_hook_type prev_ExecutorEnd = NULL; -#define ATTRIBUTE_PREFIX "/* attributeTo: " -#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */" +#define ATTRIBUTE_PREFIX "/*{" +#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/" #define CITUS_STATS_TENANTS_COLUMNS 7 #define ONE_QUERY_SCORE 1000000000 @@ -62,6 +64,7 @@ static void MultiTenantMonitorSMInit(void); static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); +static char * extractTopComment(const char *inputString); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -198,54 +201,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) return; } - if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) + char *annotation = extractTopComment(query_string); + if (annotation != NULL) { - /* TODO create a function to safely parse the tenant identifier from the query comment */ - /* query is attributed to a tenant */ - char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX); - char *tenantEnd = tenantId; - while (true && tenantEnd[0] != '\0') - { - if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/') - { - break; - } + Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); - tenantEnd++; - } - tenantEnd--; - - colocationGroupId = 0; - while (*tenantEnd != ',') + text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); + if (tenantIdTextP != NULL) { - colocationGroupId *= 10; - colocationGroupId += *tenantEnd - '0'; - tenantEnd--; + char *tenantId = text_to_cstring(tenantIdTextP); + strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); } - int t = colocationGroupId; - colocationGroupId = 0; - while (t) - { - colocationGroupId *= 10; - colocationGroupId += t % 10; - t /= 10; - } - - /* hack to get a clean copy of the tenant id string */ - char tenantEndTmp = *tenantEnd; - *tenantEnd = '\0'; - tenantId = pstrdup(tenantId); - *tenantEnd = tenantEndTmp; + colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", INVALID_COLOCATION_ID); if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { - ereport(NOTICE, (errmsg("attributing query to tenant: %s", - quote_literal_cstr(tenantId)))); + ereport(NOTICE, (errmsg( + "attributing query to tenant: %s, colocationGroupId: %d", + quote_literal_cstr(attributeToTenant), + colocationGroupId))); } - - strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); - attributeToTenantStart = clock(); } else { @@ -253,6 +229,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) } /*DetachSegment(); */ + attributeToTenantStart = clock(); } @@ -266,9 +243,13 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) { return queryString; } - StringInfo newQuery = makeStringInfo(); - appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId); + StringInfo escapedSourceName = makeStringInfo(); + escape_json(escapedSourceName, partitionColumn); + + StringInfo newQuery = makeStringInfo(); + appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data, + colocationId); appendStringInfoString(newQuery, queryString); return newQuery->data; @@ -686,3 +667,35 @@ MultiTenantMonitorshmemSize(void) return size; } + + +/* + * extractTopComment extracts the top-level multi-line comment from a given input string. + */ +static char * +extractTopComment(const char *inputString) +{ + int i = 0; + + /* If query starts with a comment */ + if (inputString[i] == '/' && inputString[i + 1] == '*') + { + /* Skip the comment start characters */ + i += 2; + while (inputString[i] && (inputString[i] != '*' && inputString[i + 1] != '/')) + { + i++; + } + } + + if (i > 2) + { + char *result = (char *) malloc(sizeof(char) * (i - 1)); + strncpy(result, inputString + 2, i - 2); + return result; + } + else + { + return NULL; + } +}