Introduce JSON based annotation parsing

pull/6763/head
Gokhan Gulbiz 2023-03-09 11:20:55 +03:00
parent d7dee1d001
commit 6b46403675
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
1 changed files with 67 additions and 54 deletions

View File

@ -13,13 +13,14 @@
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/jsonbutils.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/json.h"
#include "distributed/utils/attribute.h" #include "distributed/utils/attribute.h"
#include <time.h> #include <time.h>
@ -28,8 +29,8 @@ static void AttributeMetricsIfApplicable(void);
ExecutorEnd_hook_type prev_ExecutorEnd = NULL; ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
#define ATTRIBUTE_PREFIX "/* attributeTo: " #define ATTRIBUTE_PREFIX "/*{"
#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */" #define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
#define CITUS_STATS_TENANTS_COLUMNS 7 #define CITUS_STATS_TENANTS_COLUMNS 7
#define ONE_QUERY_SCORE 1000000000 #define ONE_QUERY_SCORE 1000000000
@ -55,6 +56,7 @@ static void MultiTenantMonitorSMInit(void);
static int CreateTenantStats(MultiTenantMonitor *monitor); static int CreateTenantStats(MultiTenantMonitor *monitor);
static int FindTenantStats(MultiTenantMonitor *monitor); static int FindTenantStats(MultiTenantMonitor *monitor);
static size_t MultiTenantMonitorshmemSize(void); static size_t MultiTenantMonitorshmemSize(void);
static char * extractTopComment(const char *inputString);
int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;
int CitusStatsTenantsPeriod = (time_t) 60; int CitusStatsTenantsPeriod = (time_t) 60;
@ -96,7 +98,8 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
//!!!!!!!LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); //!!!!!!!LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
monitor->periodStart = monitor->periodStart + monitor->periodStart = monitor->periodStart +
((monitoringTime - monitor->periodStart) / CitusStatsTenantsPeriod) * ((monitoringTime - monitor->periodStart) /
CitusStatsTenantsPeriod) *
CitusStatsTenantsPeriod; CitusStatsTenantsPeriod;
int numberOfRowsToReturn = 0; int numberOfRowsToReturn = 0;
@ -144,8 +147,6 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
void void
AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
{ {
// attributeToTenant = NULL;
attributeCommandType = commandType; attributeCommandType = commandType;
if (query_string == NULL) if (query_string == NULL)
@ -153,53 +154,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
return; return;
} }
if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) char *annotation = extractTopComment(query_string);
if (annotation != NULL)
{ {
/* TODO create a function to safely parse the tenant identifier from the query comment */ Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation));
/* query is attributed to a tenant */
char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX);
char *tenantEnd = tenantId;
while (true && tenantEnd[0] != '\0')
{
if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/')
{
break;
}
tenantEnd++; text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId");
} if (tenantIdTextP != NULL)
tenantEnd--;
colocationGroupId = 0;
while (*tenantEnd != ',')
{ {
colocationGroupId *= 10; char *tenantId = text_to_cstring(tenantIdTextP);
colocationGroupId += *tenantEnd - '0'; strcpy(attributeToTenant, tenantId);
tenantEnd--;
} }
int t = colocationGroupId; colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", 0);
colocationGroupId = 0;
while (t)
{
colocationGroupId *= 10;
colocationGroupId += t%10;
t/=10;
}
/* hack to get a clean copy of the tenant id string */
char tenantEndTmp = *tenantEnd;
*tenantEnd = '\0';
tenantId = pstrdup(tenantId);
*tenantEnd = tenantEndTmp;
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
{ {
ereport(NOTICE, (errmsg("attributing query to tenant: %s", ereport(NOTICE, (errmsg(
quote_literal_cstr(tenantId)))); "attributing query to tenant: %s, colocationGroupId: %d",
quote_literal_cstr(attributeToTenant),
colocationGroupId)));
} }
// attributeToTenant = (char *) malloc(strlen(tenantId));
strcpy(attributeToTenant, tenantId);
} }
else else
{ {
@ -207,7 +182,6 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
} }
/*DetachSegment(); */ /*DetachSegment(); */
attributeToTenantStart = clock(); attributeToTenantStart = clock();
} }
@ -222,9 +196,13 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId)
{ {
return queryString; return queryString;
} }
StringInfo newQuery = makeStringInfo();
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId);
StringInfo escapedSourceName = makeStringInfo();
escape_json(escapedSourceName, partitionColumn);
StringInfo newQuery = makeStringInfo();
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data,
colocationId);
appendStringInfoString(newQuery, queryString); appendStringInfoString(newQuery, queryString);
return newQuery->data; return newQuery->data;
@ -261,11 +239,10 @@ AttributeMetricsIfApplicable()
if (strcmp(attributeToTenant, "") != 0) if (strcmp(attributeToTenant, "") != 0)
{ {
clock_t end = { 0 }; clock_t end = { 0 };
double cpu_time_used = 0;
end = clock(); end = clock();
time_t queryTime = time(0); time_t queryTime = time(0);
cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC; double cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC;
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
{ {
@ -278,7 +255,8 @@ AttributeMetricsIfApplicable()
//!!!!!!!LWLockAcquire(&monitor->lock, LW_SHARED); //!!!!!!!LWLockAcquire(&monitor->lock, LW_SHARED);
monitor->periodStart = monitor->periodStart + monitor->periodStart = monitor->periodStart +
((queryTime - monitor->periodStart) / CitusStatsTenantsPeriod) * ((queryTime - monitor->periodStart) /
CitusStatsTenantsPeriod) *
CitusStatsTenantsPeriod; CitusStatsTenantsPeriod;
int tenantIndex = FindTenantStats(monitor); int tenantIndex = FindTenantStats(monitor);
@ -310,7 +288,8 @@ AttributeMetricsIfApplicable()
{ {
//!!!!!!!LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE); //!!!!!!!LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE);
ReduceScoreIfNecessary(monitor, &monitor->tenants[tenantIndex - 1], queryTime); ReduceScoreIfNecessary(monitor, &monitor->tenants[tenantIndex - 1],
queryTime);
TenantStats tempTenant = monitor->tenants[tenantIndex]; TenantStats tempTenant = monitor->tenants[tenantIndex];
monitor->tenants[tenantIndex] = monitor->tenants[tenantIndex - 1]; monitor->tenants[tenantIndex] = monitor->tenants[tenantIndex - 1];
@ -352,9 +331,11 @@ AttributeMetricsIfApplicable()
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
{ {
ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", ereport(NOTICE, (errmsg(
tenantStats->selectCount, tenantStats->totalSelectTime, "total select count = %d, total CPU time = %f to tenant: %s",
tenantStats->tenantAttribute))); tenantStats->selectCount,
tenantStats->totalSelectTime,
tenantStats->tenantAttribute)));
} }
} }
//attributeToTenant = NULL; //attributeToTenant = NULL;
@ -588,3 +569,35 @@ MultiTenantMonitorshmemSize(void)
return size; return size;
} }
/*
* extractTopComment extracts the top-level multi-line comment from a given input string.
*/
static char *
extractTopComment(const char *inputString)
{
int i = 0;
/* If query starts with a comment */
if (inputString[i] == '/' && inputString[i + 1] == '*')
{
/* Skip the comment start characters */
i += 2;
while (inputString[i] && (inputString[i] != '*' && inputString[i + 1] != '/'))
{
i++;
}
}
if (i > 2)
{
char *result = (char *) malloc(sizeof(char) * (i - 1));
strncpy(result, inputString + 2, i - 2);
return result;
}
else
{
return NULL;
}
}