mirror of https://github.com/citusdata/citus.git
dnm
parent
efd41e8ea5
commit
27085c9e6c
|
@ -689,7 +689,7 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString,
|
||||||
task->partitionKeyValue->consttype);
|
task->partitionKeyValue->consttype);
|
||||||
}
|
}
|
||||||
|
|
||||||
AttributeTask(partitionKeyValueString, task->colocationId, taskPlan->commandType);
|
AttributeTask(partitionKeyValueString, task->colocationId, taskPlan->commandType, true);
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
|
|
|
@ -306,10 +306,14 @@ distributed_planner(Query *parse,
|
||||||
errhint("Consider using PL/pgSQL functions instead.")));
|
errhint("Consider using PL/pgSQL functions instead.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// elog(WARNING, "slm1 %s", query_string);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We annotate the query for tenant statisisics.
|
* We annotate the query for tenant statisisics.
|
||||||
*/
|
*/
|
||||||
AttributeQueryIfAnnotated(query_string, parse->commandType);
|
AttributeQueryIfAnnotated(query_string, parse->commandType, true);
|
||||||
|
|
||||||
|
// elog(WARNING, "slm2 %s", query_string);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,9 +31,12 @@
|
||||||
#include "utils/json.h"
|
#include "utils/json.h"
|
||||||
|
|
||||||
|
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
|
||||||
static void AttributeMetricsIfApplicable(void);
|
static void AttributeMetricsIfApplicable(const char * query, CmdType t);
|
||||||
|
|
||||||
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
||||||
|
|
||||||
|
@ -177,12 +180,17 @@ citus_stat_tenants_local_reset(PG_FUNCTION_ARGS)
|
||||||
* for the tenant statistics monitoring this function records the tenant attributes.
|
* for the tenant statistics monitoring this function records the tenant attributes.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
AttributeQueryIfAnnotated(const char *query_string, CmdType commandType, bool istrue)
|
||||||
{
|
{
|
||||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE)
|
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
//elog(WARNING, "ExecutorLevel: %d, PlannerLevel: %d, queryString %s", ExecutorLevel, PlannerLevel, query_string);
|
||||||
|
|
||||||
|
|
||||||
|
// print current backend pid
|
||||||
|
// check if coordinator
|
||||||
|
|
||||||
strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
|
strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
|
||||||
|
|
||||||
|
@ -208,7 +216,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
||||||
int colocationId = ExtractFieldInt32(jsonbDatum, "cId",
|
int colocationId = ExtractFieldInt32(jsonbDatum, "cId",
|
||||||
INVALID_COLOCATION_ID);
|
INVALID_COLOCATION_ID);
|
||||||
|
|
||||||
AttributeTask(tenantId, colocationId, commandType);
|
AttributeTask(tenantId, colocationId, commandType, istrue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -218,7 +226,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
||||||
* AttributeTask assigns the given attributes of a tenant and starts a timer
|
* AttributeTask assigns the given attributes of a tenant and starts a timer
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
AttributeTask(char *tenantId, int colocationId, CmdType commandType)
|
AttributeTask(char *tenantId, int colocationId, CmdType commandType, bool istrue)
|
||||||
{
|
{
|
||||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
||||||
tenantId == NULL || colocationId == INVALID_COLOCATION_ID)
|
tenantId == NULL || colocationId == INVALID_COLOCATION_ID)
|
||||||
|
@ -230,7 +238,10 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)
|
||||||
strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
|
strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
|
||||||
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
|
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
|
||||||
AttributeToCommandType = commandType;
|
AttributeToCommandType = commandType;
|
||||||
QueryStartClock = clock();
|
if (istrue)
|
||||||
|
{
|
||||||
|
QueryStartClock = clock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -270,11 +281,12 @@ AnnotateQuery(char *queryString, Const *partitionKeyValue, int colocationId)
|
||||||
void
|
void
|
||||||
CitusAttributeToEnd(QueryDesc *queryDesc)
|
CitusAttributeToEnd(QueryDesc *queryDesc)
|
||||||
{
|
{
|
||||||
|
// elog(WARNING, "slm3 %s", queryDesc->sourceText);
|
||||||
/*
|
/*
|
||||||
* At the end of the Executor is the last moment we have to attribute the previous
|
* At the end of the Executor is the last moment we have to attribute the previous
|
||||||
* attribution to a tenant, if applicable
|
* attribution to a tenant, if applicable
|
||||||
*/
|
*/
|
||||||
AttributeMetricsIfApplicable();
|
AttributeMetricsIfApplicable(queryDesc->sourceText, queryDesc->operation);
|
||||||
|
|
||||||
/* now call in to the previously installed hook, or the standard implementation */
|
/* now call in to the previously installed hook, or the standard implementation */
|
||||||
if (prev_ExecutorEnd)
|
if (prev_ExecutorEnd)
|
||||||
|
@ -314,8 +326,9 @@ CompareTenantScore(const void *leftElement, const void *rightElement)
|
||||||
* AttributeMetricsIfApplicable updates the metrics for current tenant's statistics
|
* AttributeMetricsIfApplicable updates the metrics for current tenant's statistics
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
AttributeMetricsIfApplicable()
|
AttributeMetricsIfApplicable(const char * query, CmdType t)
|
||||||
{
|
{
|
||||||
|
AttributeQueryIfAnnotated(query, t, false);
|
||||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
||||||
AttributeToTenant[0] == '\0')
|
AttributeToTenant[0] == '\0')
|
||||||
{
|
{
|
||||||
|
|
|
@ -104,11 +104,11 @@ typedef enum
|
||||||
} StatTenantsTrackType;
|
} StatTenantsTrackType;
|
||||||
|
|
||||||
extern void CitusAttributeToEnd(QueryDesc *queryDesc);
|
extern void CitusAttributeToEnd(QueryDesc *queryDesc);
|
||||||
extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);
|
extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType, bool istrue);
|
||||||
extern char * AnnotateQuery(char *queryString, Const *partitionKeyValue,
|
extern char * AnnotateQuery(char *queryString, Const *partitionKeyValue,
|
||||||
int colocationId);
|
int colocationId);
|
||||||
extern void InitializeMultiTenantMonitorSMHandleManagement(void);
|
extern void InitializeMultiTenantMonitorSMHandleManagement(void);
|
||||||
extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType);
|
extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType, bool istrue);
|
||||||
|
|
||||||
extern ExecutorEnd_hook_type prev_ExecutorEnd;
|
extern ExecutorEnd_hook_type prev_ExecutorEnd;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue