From 6b46403675bed7a6a7fb2cf153f5f46811ea9ff8 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Thu, 9 Mar 2023 11:20:55 +0300 Subject: [PATCH] Introduce JSON based annotation parsing --- src/backend/distributed/utils/attribute.c | 121 ++++++++++++---------- 1 file changed, 67 insertions(+), 54 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 8eadcb3c8..65ca02d78 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -13,13 +13,14 @@ #include "distributed/log_utils.h" #include "distributed/listutils.h" +#include "distributed/jsonbutils.h" #include "distributed/tuplestore.h" #include "executor/execdesc.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include "utils/builtins.h" - +#include "utils/json.h" #include "distributed/utils/attribute.h" #include @@ -28,8 +29,8 @@ static void AttributeMetricsIfApplicable(void); ExecutorEnd_hook_type prev_ExecutorEnd = NULL; -#define ATTRIBUTE_PREFIX "/* attributeTo: " -#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */" +#define ATTRIBUTE_PREFIX "/*{" +#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/" #define CITUS_STATS_TENANTS_COLUMNS 7 #define ONE_QUERY_SCORE 1000000000 @@ -55,6 +56,7 @@ static void MultiTenantMonitorSMInit(void); static int CreateTenantStats(MultiTenantMonitor *monitor); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); +static char * extractTopComment(const char *inputString); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -96,7 +98,8 @@ citus_stats_tenants(PG_FUNCTION_ARGS) //!!!!!!!LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); monitor->periodStart = monitor->periodStart + - ((monitoringTime - monitor->periodStart) / CitusStatsTenantsPeriod) * + ((monitoringTime - monitor->periodStart) / + CitusStatsTenantsPeriod) * CitusStatsTenantsPeriod; int numberOfRowsToReturn = 0; @@ -144,8 +147,6 @@ citus_stats_tenants(PG_FUNCTION_ARGS) void AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) { -// attributeToTenant = NULL; - attributeCommandType = commandType; if (query_string == NULL) @@ -153,53 +154,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) return; } - if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) + char *annotation = extractTopComment(query_string); + if (annotation != NULL) { - /* TODO create a function to safely parse the tenant identifier from the query comment */ - /* query is attributed to a tenant */ - char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX); - char *tenantEnd = tenantId; - while (true && tenantEnd[0] != '\0') - { - if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/') - { - break; - } + Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); - tenantEnd++; - } - tenantEnd--; - - colocationGroupId = 0; - while (*tenantEnd != ',') + text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); + if (tenantIdTextP != NULL) { - colocationGroupId *= 10; - colocationGroupId += *tenantEnd - '0'; - tenantEnd--; + char *tenantId = text_to_cstring(tenantIdTextP); + strcpy(attributeToTenant, tenantId); } - int t = colocationGroupId; - colocationGroupId = 0; - while (t) - { - colocationGroupId *= 10; - colocationGroupId += t%10; - t/=10; - } - /* hack to get a clean copy of the tenant id string */ - char tenantEndTmp = *tenantEnd; - *tenantEnd = '\0'; - tenantId = pstrdup(tenantId); - *tenantEnd = tenantEndTmp; + colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", 0); if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { - ereport(NOTICE, (errmsg("attributing query to tenant: %s", - quote_literal_cstr(tenantId)))); + ereport(NOTICE, (errmsg( + "attributing query to tenant: %s, colocationGroupId: %d", + quote_literal_cstr(attributeToTenant), + colocationGroupId))); } - - // attributeToTenant = (char *) malloc(strlen(tenantId)); - strcpy(attributeToTenant, tenantId); } else { @@ -207,7 +182,6 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) } /*DetachSegment(); */ - attributeToTenantStart = clock(); } @@ -222,9 +196,13 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) { return queryString; } - StringInfo newQuery = makeStringInfo(); - appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId); + StringInfo escapedSourceName = makeStringInfo(); + escape_json(escapedSourceName, partitionColumn); + + StringInfo newQuery = makeStringInfo(); + appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data, + colocationId); appendStringInfoString(newQuery, queryString); return newQuery->data; @@ -261,11 +239,10 @@ AttributeMetricsIfApplicable() if (strcmp(attributeToTenant, "") != 0) { clock_t end = { 0 }; - double cpu_time_used = 0; end = clock(); time_t queryTime = time(0); - cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC; + double cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC; if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { @@ -278,7 +255,8 @@ AttributeMetricsIfApplicable() //!!!!!!!LWLockAcquire(&monitor->lock, LW_SHARED); monitor->periodStart = monitor->periodStart + - ((queryTime - monitor->periodStart) / CitusStatsTenantsPeriod) * + ((queryTime - monitor->periodStart) / + CitusStatsTenantsPeriod) * CitusStatsTenantsPeriod; int tenantIndex = FindTenantStats(monitor); @@ -310,7 +288,8 @@ AttributeMetricsIfApplicable() { //!!!!!!!LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE); - ReduceScoreIfNecessary(monitor, &monitor->tenants[tenantIndex - 1], queryTime); + ReduceScoreIfNecessary(monitor, &monitor->tenants[tenantIndex - 1], + queryTime); TenantStats tempTenant = monitor->tenants[tenantIndex]; monitor->tenants[tenantIndex] = monitor->tenants[tenantIndex - 1]; @@ -352,9 +331,11 @@ AttributeMetricsIfApplicable() if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { - ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", - tenantStats->selectCount, tenantStats->totalSelectTime, - tenantStats->tenantAttribute))); + ereport(NOTICE, (errmsg( + "total select count = %d, total CPU time = %f to tenant: %s", + tenantStats->selectCount, + tenantStats->totalSelectTime, + tenantStats->tenantAttribute))); } } //attributeToTenant = NULL; @@ -588,3 +569,35 @@ MultiTenantMonitorshmemSize(void) return size; } + + +/* + * extractTopComment extracts the top-level multi-line comment from a given input string. + */ +static char * +extractTopComment(const char *inputString) +{ + int i = 0; + + /* If query starts with a comment */ + if (inputString[i] == '/' && inputString[i + 1] == '*') + { + /* Skip the comment start characters */ + i += 2; + while (inputString[i] && (inputString[i] != '*' && inputString[i + 1] != '/')) + { + i++; + } + } + + if (i > 2) + { + char *result = (char *) malloc(sizeof(char) * (i - 1)); + strncpy(result, inputString + 2, i - 2); + return result; + } + else + { + return NULL; + } +}