mirror of https://github.com/citusdata/citus.git
prototype tenant attributable cpu util
parent
b5b73d78c3
commit
badf5d8020
|
@ -47,6 +47,7 @@
|
|||
#include "distributed/recursive_planning.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/shard_utils.h"
|
||||
#include "distributed/utils/attribute.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_shard_visibility.h"
|
||||
#include "executor/executor.h"
|
||||
|
@ -131,7 +132,6 @@ static RTEListProperties * GetRTEListProperties(List *rangeTableList);
|
|||
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
|
||||
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
|
||||
|
||||
|
||||
/* Distributed planner hook */
|
||||
PlannedStmt *
|
||||
distributed_planner(Query *parse,
|
||||
|
@ -143,6 +143,8 @@ distributed_planner(Query *parse,
|
|||
bool fastPathRouterQuery = false;
|
||||
Node *distributionKeyValue = NULL;
|
||||
|
||||
AttributeQueryIfAnnotated(query_string);
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
|
||||
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
|
||||
|
|
|
@ -90,6 +90,7 @@
|
|||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/transaction_recovery.h"
|
||||
#include "distributed/utils/attribute.h"
|
||||
#include "distributed/utils/directory.h"
|
||||
#include "distributed/worker_log_messages.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
|
@ -433,6 +434,8 @@ _PG_init(void)
|
|||
ExecutorStart_hook = CitusExecutorStart;
|
||||
ExecutorRun_hook = CitusExecutorRun;
|
||||
ExplainOneQuery_hook = CitusExplainOneQuery;
|
||||
prev_ExecutorEnd = ExecutorEnd_hook;
|
||||
ExecutorEnd_hook = CitusAttributeToEnd;
|
||||
|
||||
/* register hook for error messages */
|
||||
emit_log_hook = multi_log_hook;
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
//
|
||||
// Created by Nils Dijk on 02/12/2022.
|
||||
//
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "executor/execdesc.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
#include "distributed/utils/attribute.h"
|
||||
|
||||
#include <time.h>
|
||||
|
||||
static void AttributeMetricsIfApplicable(void);
|
||||
|
||||
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
||||
|
||||
#define ATTRIBUTE_PREFIX "/* attributeTo: "
|
||||
|
||||
/* TODO maybe needs to be a stack */
|
||||
const char *attributeToTenant = NULL;
|
||||
clock_t attributeToTenantStart = { 0 };
|
||||
|
||||
void
|
||||
AttributeQueryIfAnnotated(const char *query_string)
|
||||
{
|
||||
if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0)
|
||||
{
|
||||
/* TODO create a function to safely parse the tenant identifier from the query comment */
|
||||
/* query is attributed to a tenant */
|
||||
char *tenantId = (char*)query_string + strlen(ATTRIBUTE_PREFIX);
|
||||
char *tenantEnd = tenantId;
|
||||
while (true && tenantEnd[0] != '\0')
|
||||
{
|
||||
if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/')
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
tenantEnd++;
|
||||
}
|
||||
|
||||
/* hack to get a clean copy of the tenant id string */
|
||||
char tenantEndTmp = *tenantEnd;
|
||||
*tenantEnd = '\0';
|
||||
tenantId = pstrdup(tenantId);
|
||||
*tenantEnd = tenantEndTmp;
|
||||
|
||||
ereport(NOTICE, (errmsg("attributing query to tenant: %s", quote_literal_cstr(tenantId))));
|
||||
|
||||
attributeToTenant = tenantId;
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(attributeToTenant == NULL);
|
||||
}
|
||||
|
||||
attributeToTenantStart = clock();
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
CitusAttributeToEnd(QueryDesc *queryDesc)
|
||||
{
|
||||
/*
|
||||
* At the end of the Executor is the last moment we have to attribute the previous
|
||||
* attribution to a tenant, if applicable
|
||||
*/
|
||||
AttributeMetricsIfApplicable();
|
||||
|
||||
/* now call in to the previously installed hook, or the standard implementation */
|
||||
if (prev_ExecutorEnd)
|
||||
{
|
||||
prev_ExecutorEnd(queryDesc);
|
||||
}
|
||||
else
|
||||
{
|
||||
standard_ExecutorEnd(queryDesc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
AttributeMetricsIfApplicable()
|
||||
{
|
||||
if (attributeToTenant)
|
||||
{
|
||||
clock_t end = { 0 };
|
||||
double cpu_time_used = 0;
|
||||
|
||||
end = clock();
|
||||
cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC;
|
||||
|
||||
ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", cpu_time_used,
|
||||
attributeToTenant)));
|
||||
}
|
||||
attributeToTenant = NULL;
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
//
|
||||
// Created by Nils Dijk on 02/12/2022.
|
||||
//
|
||||
|
||||
#ifndef CITUS_ATTRIBUTE_H
|
||||
#define CITUS_ATTRIBUTE_H
|
||||
|
||||
#include "executor/execdesc.h"
|
||||
#include "executor/executor.h"
|
||||
|
||||
extern void CitusAttributeToEnd(QueryDesc *queryDesc);
|
||||
extern void AttributeQueryIfAnnotated(const char *queryString);
|
||||
|
||||
extern ExecutorEnd_hook_type prev_ExecutorEnd;
|
||||
|
||||
#endif //CITUS_ATTRIBUTE_H
|
Loading…
Reference in New Issue