mirror of https://github.com/citusdata/citus.git
Introduce JSON based annotation parsing
parent
9d2d97fe67
commit
024526ab2f
|
@ -15,6 +15,8 @@
|
|||
#include "distributed/log_utils.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/jsonbutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/tuplestore.h"
|
||||
#include "executor/execdesc.h"
|
||||
#include "storage/ipc.h"
|
||||
|
@ -22,7 +24,7 @@
|
|||
#include "storage/shmem.h"
|
||||
#include <sys/time.h>
|
||||
#include "utils/builtins.h"
|
||||
|
||||
#include "utils/json.h"
|
||||
#include "distributed/utils/attribute.h"
|
||||
|
||||
#include <time.h>
|
||||
|
@ -31,8 +33,8 @@ static void AttributeMetricsIfApplicable(void);
|
|||
|
||||
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
||||
|
||||
#define ATTRIBUTE_PREFIX "/* attributeTo: "
|
||||
#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */"
|
||||
#define ATTRIBUTE_PREFIX "/*{"
|
||||
#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
|
||||
#define CITUS_STATS_TENANTS_COLUMNS 7
|
||||
#define ONE_QUERY_SCORE 1000000000
|
||||
|
||||
|
@ -62,6 +64,7 @@ static void MultiTenantMonitorSMInit(void);
|
|||
static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime);
|
||||
static int FindTenantStats(MultiTenantMonitor *monitor);
|
||||
static size_t MultiTenantMonitorshmemSize(void);
|
||||
static char * extractTopComment(const char *inputString);
|
||||
|
||||
int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;
|
||||
int CitusStatsTenantsPeriod = (time_t) 60;
|
||||
|
@ -198,54 +201,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
|||
return;
|
||||
}
|
||||
|
||||
if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0)
|
||||
char *annotation = extractTopComment(query_string);
|
||||
if (annotation != NULL)
|
||||
{
|
||||
/* TODO create a function to safely parse the tenant identifier from the query comment */
|
||||
/* query is attributed to a tenant */
|
||||
char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX);
|
||||
char *tenantEnd = tenantId;
|
||||
while (true && tenantEnd[0] != '\0')
|
||||
Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation));
|
||||
|
||||
text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId");
|
||||
if (tenantIdTextP != NULL)
|
||||
{
|
||||
if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/')
|
||||
{
|
||||
break;
|
||||
char *tenantId = text_to_cstring(tenantIdTextP);
|
||||
strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId);
|
||||
}
|
||||
|
||||
tenantEnd++;
|
||||
}
|
||||
tenantEnd--;
|
||||
|
||||
colocationGroupId = 0;
|
||||
while (*tenantEnd != ',')
|
||||
{
|
||||
colocationGroupId *= 10;
|
||||
colocationGroupId += *tenantEnd - '0';
|
||||
tenantEnd--;
|
||||
}
|
||||
|
||||
int t = colocationGroupId;
|
||||
colocationGroupId = 0;
|
||||
while (t)
|
||||
{
|
||||
colocationGroupId *= 10;
|
||||
colocationGroupId += t % 10;
|
||||
t /= 10;
|
||||
}
|
||||
|
||||
/* hack to get a clean copy of the tenant id string */
|
||||
char tenantEndTmp = *tenantEnd;
|
||||
*tenantEnd = '\0';
|
||||
tenantId = pstrdup(tenantId);
|
||||
*tenantEnd = tenantEndTmp;
|
||||
colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", INVALID_COLOCATION_ID);
|
||||
|
||||
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||
{
|
||||
ereport(NOTICE, (errmsg("attributing query to tenant: %s",
|
||||
quote_literal_cstr(tenantId))));
|
||||
ereport(NOTICE, (errmsg(
|
||||
"attributing query to tenant: %s, colocationGroupId: %d",
|
||||
quote_literal_cstr(attributeToTenant),
|
||||
colocationGroupId)));
|
||||
}
|
||||
|
||||
strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId);
|
||||
attributeToTenantStart = clock();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -253,6 +229,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
|||
}
|
||||
|
||||
/*DetachSegment(); */
|
||||
attributeToTenantStart = clock();
|
||||
}
|
||||
|
||||
|
||||
|
@ -266,9 +243,13 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId)
|
|||
{
|
||||
return queryString;
|
||||
}
|
||||
StringInfo newQuery = makeStringInfo();
|
||||
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId);
|
||||
|
||||
StringInfo escapedSourceName = makeStringInfo();
|
||||
escape_json(escapedSourceName, partitionColumn);
|
||||
|
||||
StringInfo newQuery = makeStringInfo();
|
||||
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data,
|
||||
colocationId);
|
||||
appendStringInfoString(newQuery, queryString);
|
||||
|
||||
return newQuery->data;
|
||||
|
@ -686,3 +667,35 @@ MultiTenantMonitorshmemSize(void)
|
|||
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* extractTopComment extracts the top-level multi-line comment from a given input string.
|
||||
*/
|
||||
static char *
|
||||
extractTopComment(const char *inputString)
|
||||
{
|
||||
int i = 0;
|
||||
|
||||
/* If query starts with a comment */
|
||||
if (inputString[i] == '/' && inputString[i + 1] == '*')
|
||||
{
|
||||
/* Skip the comment start characters */
|
||||
i += 2;
|
||||
while (inputString[i] && (inputString[i] != '*' && inputString[i + 1] != '/'))
|
||||
{
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
if (i > 2)
|
||||
{
|
||||
char *result = (char *) malloc(sizeof(char) * (i - 1));
|
||||
strncpy(result, inputString + 2, i - 2);
|
||||
return result;
|
||||
}
|
||||
else
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue