mirror of https://github.com/citusdata/citus.git
Add multi tenant monitoring
parent
552b7f59b8
commit
4dfb9c6694
|
@ -66,7 +66,6 @@ static DistributedPlan * CopyDistributedPlanWithoutCache(
|
|||
DistributedPlan *originalDistributedPlan);
|
||||
static void CitusEndScan(CustomScanState *node);
|
||||
static void CitusReScan(CustomScanState *node);
|
||||
static void SetJobColocationId(Job *job);
|
||||
static void EnsureForceDelegationDistributionKey(Job *job);
|
||||
static void EnsureAnchorShardsInJobExist(Job *job);
|
||||
static bool AnchorShardsInTaskListExist(List *taskList);
|
||||
|
@ -892,7 +891,7 @@ IsCitusCustomScan(Plan *plan)
|
|||
* colocation group, the Job's colocation ID is set to the group ID, else,
|
||||
* it will be set to INVALID_COLOCATION_ID.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
SetJobColocationId(Job *job)
|
||||
{
|
||||
uint32 jobColocationId = INVALID_COLOCATION_ID;
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/shard_utils.h"
|
||||
#include "distributed/utils/attribute.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
|
@ -141,6 +142,20 @@ RebuildQueryStrings(Job *workerJob)
|
|||
? "(null)"
|
||||
: TaskQueryString(task))));
|
||||
|
||||
Datum partitionColumnValue;
|
||||
Oid partitionColumnType = 0;
|
||||
char *partitionColumnString = NULL;
|
||||
if (workerJob->partitionKeyValue != NULL)
|
||||
{
|
||||
partitionColumnValue = workerJob->partitionKeyValue->constvalue;
|
||||
partitionColumnType = workerJob->partitionKeyValue->consttype;
|
||||
partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType);
|
||||
}
|
||||
|
||||
task->partitionColumn = partitionColumnString;
|
||||
SetJobColocationId(workerJob);
|
||||
task->colocationId = workerJob->colocationId;
|
||||
|
||||
UpdateTaskQueryString(query, task);
|
||||
|
||||
/*
|
||||
|
@ -387,7 +402,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
|
|||
return;
|
||||
}
|
||||
|
||||
SetTaskQueryString(task, DeparseTaskQuery(task, query));
|
||||
SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), task->partitionColumn, task->colocationId));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -146,7 +146,7 @@ distributed_planner(Query *parse,
|
|||
bool fastPathRouterQuery = false;
|
||||
Node *distributionKeyValue = NULL;
|
||||
|
||||
AttributeQueryIfAnnotated(query_string);
|
||||
AttributeQueryIfAnnotated(query_string, parse->commandType);
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ static int CompareInsertValuesByShardId(const void *leftElement,
|
|||
static List * SingleShardTaskList(Query *query, uint64 jobId,
|
||||
List *relationShardList, List *placementList,
|
||||
uint64 shardId, bool parametersInQueryResolved,
|
||||
bool isLocalTableModification);
|
||||
bool isLocalTableModification, char * partitionColumn, int colocationId);
|
||||
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
||||
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
||||
TaskAssignmentPolicyType
|
||||
|
@ -1954,11 +1954,23 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
|
||||
if (originalQuery->commandType == CMD_SELECT)
|
||||
{
|
||||
Datum partitionColumnValue;
|
||||
Oid partitionColumnType = 0;
|
||||
char *partitionColumnString = NULL;
|
||||
if (job->partitionKeyValue != NULL)
|
||||
{
|
||||
partitionColumnValue = job->partitionKeyValue->constvalue;
|
||||
partitionColumnType = job->partitionKeyValue->consttype;
|
||||
partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType);
|
||||
}
|
||||
|
||||
SetJobColocationId(job);
|
||||
|
||||
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
|
||||
relationShardList, placementList,
|
||||
shardId,
|
||||
job->parametersInJobQueryResolved,
|
||||
isLocalTableModification);
|
||||
isLocalTableModification, partitionColumnString, job->colocationId);
|
||||
|
||||
/*
|
||||
* Queries to reference tables, or distributed tables with multiple replica's have
|
||||
|
@ -1986,7 +1998,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
relationShardList, placementList,
|
||||
shardId,
|
||||
job->parametersInJobQueryResolved,
|
||||
isLocalTableModification);
|
||||
isLocalTableModification, "", -1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2080,7 +2092,7 @@ static List *
|
|||
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||
List *placementList, uint64 shardId,
|
||||
bool parametersInQueryResolved,
|
||||
bool isLocalTableModification)
|
||||
bool isLocalTableModification, char * partitionColumn, int colocationId)
|
||||
{
|
||||
TaskType taskType = READ_TASK;
|
||||
char replicationModel = 0;
|
||||
|
@ -2150,6 +2162,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
|||
* that the query cannot be executed locally.
|
||||
*/
|
||||
task->taskPlacementList = placementList;
|
||||
task->partitionColumn = partitionColumn;
|
||||
task->colocationId = colocationId;
|
||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||
task->anchorShardId = shardId;
|
||||
task->jobId = jobId;
|
||||
|
|
|
@ -475,6 +475,8 @@ _PG_init(void)
|
|||
/* initialize shard split shared memory handle management */
|
||||
InitializeShardSplitSMHandleManagement();
|
||||
|
||||
InitializeMultiTenantMonitorSMHandleManagement();
|
||||
|
||||
/* enable modification of pg_catalog tables during pg_upgrade */
|
||||
if (IsBinaryUpgrade)
|
||||
{
|
||||
|
@ -1901,6 +1903,16 @@ RegisterCitusConfigVariables(void)
|
|||
PGC_USERSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.multi_tenant_monitoring_log_level",
|
||||
gettext_noop("Sets the level of multi tenant monitoring log messages"),
|
||||
NULL,
|
||||
&MultiTenantMonitoringLogLevel,
|
||||
CITUS_LOG_LEVEL_OFF, log_level_options,
|
||||
PGC_USERSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.next_cleanup_record_id",
|
||||
|
@ -2286,6 +2298,26 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.stats_tenants_limit",
|
||||
gettext_noop("monitor limit"),
|
||||
NULL,
|
||||
&CitusStatsTenantsLimit,
|
||||
10, 1, 100,
|
||||
PGC_USERSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.stats_tenants_period",
|
||||
gettext_noop("monitor period"),
|
||||
NULL,
|
||||
&CitusStatsTenantsPeriod,
|
||||
60, 1, 60 * 60,
|
||||
PGC_USERSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.subquery_pushdown",
|
||||
gettext_noop("Usage of this GUC is highly discouraged, please read the long "
|
||||
|
|
|
@ -2,3 +2,5 @@
|
|||
|
||||
-- bump version to 11.3-1
|
||||
|
||||
#include "udfs/citus_stats_tenants/11.3-1.sql"
|
||||
#include "udfs/citus_stats_tenants_storage/11.3-1.sql"
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
-- citus_stats_tenants() exposes the per-tenant counters kept by the multi
-- tenant monitor. It is implemented in C by the "citus_stats_tenants" symbol
-- of the citus library. When return_all_tenants is false (the default) the
-- number of returned rows is capped by the citus.stats_tenants_limit setting.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants(
    return_all_tenants boolean DEFAULT false,
    OUT colocation_id int,
    OUT tenant_attribute text,
    OUT read_count_in_this_period int,
    OUT read_count_in_last_period int,
    OUT query_count_in_this_period int,
    OUT query_count_in_last_period int,
    OUT score bigint)
RETURNS SETOF record
LANGUAGE C
AS 'citus', $$citus_stats_tenants$$;

-- The view hides the internal score column and only uses it to list the
-- highest scoring tenants first.
CREATE OR REPLACE VIEW citus.citus_stats_tenants AS
SELECT
    stats.colocation_id,
    stats.tenant_attribute,
    stats.read_count_in_this_period,
    stats.read_count_in_last_period,
    stats.query_count_in_this_period,
    stats.query_count_in_last_period
FROM pg_catalog.citus_stats_tenants() AS stats
ORDER BY stats.score DESC;

ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC;
|
|
@ -0,0 +1,27 @@
|
|||
-- citus_stats_tenants() exposes the per-tenant counters kept by the multi
-- tenant monitor. It is implemented in C by the "citus_stats_tenants" symbol
-- of the citus library. When return_all_tenants is false (the default) the
-- number of returned rows is capped by the citus.stats_tenants_limit setting.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants(
    return_all_tenants boolean DEFAULT false,
    OUT colocation_id int,
    OUT tenant_attribute text,
    OUT read_count_in_this_period int,
    OUT read_count_in_last_period int,
    OUT query_count_in_this_period int,
    OUT query_count_in_last_period int,
    OUT score bigint)
RETURNS SETOF record
LANGUAGE C
AS 'citus', $$citus_stats_tenants$$;

-- The view hides the internal score column and only uses it to list the
-- highest scoring tenants first.
CREATE OR REPLACE VIEW citus.citus_stats_tenants AS
SELECT
    stats.colocation_id,
    stats.tenant_attribute,
    stats.read_count_in_this_period,
    stats.read_count_in_last_period,
    stats.query_count_in_this_period,
    stats.query_count_in_last_period
FROM pg_catalog.citus_stats_tenants() AS stats
ORDER BY stats.score DESC;

ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC;
|
|
@ -0,0 +1,28 @@
|
|||
-- citus_stats_tenants_storage() estimates, per table and per distribution
-- column value (tenant), how much storage the tenant's rows take by summing
-- pg_column_size() over the rows of each table listed in citus_tables.
--
-- Only tables that have a distribution column are visited: for other tables
-- citus_tables reports a placeholder instead of a column name, which would
-- turn the generated query into a syntax error.
--
-- NOTE(review): storage_estimate is declared INT, so a tenant whose rows sum
-- to more than 2^31-1 bytes raises "integer out of range"; widening the OUT
-- column would change the function signature and is left for a follow-up.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage (
    OUT colocation_id INT,
    OUT tenant_attribute TEXT,
    OUT storage_estimate INT
)
    RETURNS SETOF record
    LANGUAGE plpgsql
    AS $function$
DECLARE
    tn TEXT;
    dc TEXT;
    ci INT;
BEGIN
    FOR ci, tn, dc IN
        SELECT cts.colocation_id, cts.table_name, cts.distribution_column
        FROM citus_tables cts
        WHERE cts.citus_table_type = 'distributed'
    LOOP
        -- tn comes from a regclass, so its text form is already a properly
        -- quoted (and schema-qualified when needed) relation name; dc is a
        -- bare column name and therefore has to be quoted with %I.
        RETURN QUERY
        EXECUTE format(
            'SELECT %s::int, t.%I::text, sum(pg_column_size(t.*))::int FROM %s AS t GROUP BY t.%I',
            ci, dc, tn, dc);
    END LOOP;
END;
$function$;

-- Sums the per-table estimates into one row per tenant, largest first.
CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS
SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage()
GROUP BY colocation_id, tenant_attribute
ORDER BY total_storage DESC;

ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC;
|
|
@ -0,0 +1,28 @@
|
|||
-- citus_stats_tenants_storage() estimates, per table and per distribution
-- column value (tenant), how much storage the tenant's rows take by summing
-- pg_column_size() over the rows of each table listed in citus_tables.
--
-- Only tables that have a distribution column are visited: for other tables
-- citus_tables reports a placeholder instead of a column name, which would
-- turn the generated query into a syntax error.
--
-- NOTE(review): storage_estimate is declared INT, so a tenant whose rows sum
-- to more than 2^31-1 bytes raises "integer out of range"; widening the OUT
-- column would change the function signature and is left for a follow-up.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage (
    OUT colocation_id INT,
    OUT tenant_attribute TEXT,
    OUT storage_estimate INT
)
    RETURNS SETOF record
    LANGUAGE plpgsql
    AS $function$
DECLARE
    tn TEXT;
    dc TEXT;
    ci INT;
BEGIN
    FOR ci, tn, dc IN
        SELECT cts.colocation_id, cts.table_name, cts.distribution_column
        FROM citus_tables cts
        WHERE cts.citus_table_type = 'distributed'
    LOOP
        -- tn comes from a regclass, so its text form is already a properly
        -- quoted (and schema-qualified when needed) relation name; dc is a
        -- bare column name and therefore has to be quoted with %I.
        RETURN QUERY
        EXECUTE format(
            'SELECT %s::int, t.%I::text, sum(pg_column_size(t.*))::int FROM %s AS t GROUP BY t.%I',
            ci, dc, tn, dc);
    END LOOP;
END;
$function$;

-- Sums the per-table estimates into one row per tenant, largest first.
CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS
SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage()
GROUP BY colocation_id, tenant_attribute
ORDER BY total_storage DESC;

ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC;
|
|
@ -1,10 +1,22 @@
|
|||
//
|
||||
// Created by Nils Dijk on 02/12/2022.
|
||||
//
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* attribute.c
|
||||
* Routines related to the multi tenant monitor.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "unistd.h"
|
||||
|
||||
#include "distributed/log_utils.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/tuplestore.h"
|
||||
#include "executor/execdesc.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
#include "distributed/utils/attribute.h"
|
||||
|
@ -16,14 +28,125 @@ static void AttributeMetricsIfApplicable(void);
|
|||
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
||||
|
||||
#define ATTRIBUTE_PREFIX "/* attributeTo: "
|
||||
#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */"
|
||||
#define CITUS_STATS_TENANTS_COLUMNS 7
|
||||
#define ONE_QUERY_SCORE 1000000000
|
||||
|
||||
/* TODO maybe needs to be a stack */
|
||||
const char *attributeToTenant = NULL;
|
||||
char *attributeToTenant = NULL;
|
||||
CmdType attributeCommandType = CMD_UNKNOWN;
|
||||
int colocationGroupId = -1;
|
||||
clock_t attributeToTenantStart = { 0 };
|
||||
|
||||
void
|
||||
AttributeQueryIfAnnotated(const char *query_string)
|
||||
const char *SharedMemoryNameForMultiTenantMonitorHandleManagement =
|
||||
"Shared memory handle for multi tenant monitor";
|
||||
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
|
||||
static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats);
|
||||
static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime);
|
||||
static void CreateMultiTenantMonitor(void);
|
||||
static dsm_handle CreateSharedMemoryForMultiTenantMonitor(void);
|
||||
static void StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle);
|
||||
static MultiTenantMonitor * GetMultiTenantMonitor(void);
|
||||
static dsm_handle GetMultiTenantMonitorDSMHandle(void);
|
||||
static void DetachSegment(void);
|
||||
static void MultiTenantMonitorSMInit(void);
|
||||
static dsm_handle CreateTenantStats(MultiTenantMonitor *monitor);
|
||||
static dsm_handle CreateSharedMemoryForTenantStats(void);
|
||||
static TenantStats * GetTenantStatsFromDSMHandle(dsm_handle dsmHandle);
|
||||
static dsm_handle FindTenantStats(MultiTenantMonitor *monitor);
|
||||
|
||||
/* backing variable of the citus.multi_tenant_monitoring_log_level setting */
int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;

/*
 * Backing variable of the citus.stats_tenants_period setting: length of one
 * monitoring period. It is compared against time() values, i.e. seconds.
 * This is a plain int, so the initializer carries no time_t cast.
 */
int CitusStatsTenantsPeriod = 60;

/*
 * Backing variable of the citus.stats_tenants_limit setting: number of
 * tenants citus_stats_tenants() returns unless all tenants are requested.
 */
int CitusStatsTenantsLimit = 10;
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(citus_stats_tenants);
|
||||
|
||||
|
||||
/*
|
||||
* citus_stats_tenants finds, updates and returns the statistics for tenants.
|
||||
*/
|
||||
Datum
|
||||
citus_stats_tenants(PG_FUNCTION_ARGS)
|
||||
{
|
||||
//CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
* We keep more than CitusStatsTenantsLimit tenants in our monitor.
|
||||
* We do this to not lose data if a tenant falls out of top CitusStatsTenantsLimit in case they need to return soon.
|
||||
* Normally we return CitusStatsTenantsLimit tenants but if returnAllTenants is true we return all of them.
|
||||
*/
|
||||
bool returnAllTenants = PG_GETARG_BOOL(0);
|
||||
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||
time_t monitoringTime = time(0);
|
||||
|
||||
Datum values[CITUS_STATS_TENANTS_COLUMNS];
|
||||
bool isNulls[CITUS_STATS_TENANTS_COLUMNS];
|
||||
|
||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||
|
||||
if (monitor == NULL)
|
||||
{
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
monitor->periodStart = monitor->periodStart + ((monitoringTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod;
|
||||
|
||||
int numberOfRowsToReturn = 0;
|
||||
if (returnAllTenants)
|
||||
{
|
||||
numberOfRowsToReturn = monitor->tenantCount;
|
||||
}
|
||||
else
|
||||
{
|
||||
numberOfRowsToReturn = min (monitor->tenantCount, CitusStatsTenantsLimit);
|
||||
}
|
||||
|
||||
for (int i=0; i<numberOfRowsToReturn; i++)
|
||||
{
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(isNulls, false, sizeof(isNulls));
|
||||
|
||||
TenantStats *tenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[i]);
|
||||
|
||||
UpdatePeriodsIfNecessary(monitor, tenantStats);
|
||||
ReduceScoreIfNecessary(monitor, tenantStats, monitoringTime);
|
||||
|
||||
values[0] = Int32GetDatum(tenantStats->colocationGroupId);
|
||||
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
|
||||
values[2] = Int32GetDatum(tenantStats->selectsInThisPeriod);
|
||||
values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod);
|
||||
values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + tenantStats->insertsInThisPeriod);
|
||||
values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + tenantStats->insertsInLastPeriod);
|
||||
values[6] = Int64GetDatum(monitor->scores[tenantStats->rank]);
|
||||
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AttributeQueryIfAnnotated assigns the attributes of tenant if the query is annotated.
|
||||
*/
|
||||
void
|
||||
AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
||||
{
|
||||
attributeToTenant = NULL;
|
||||
|
||||
attributeCommandType = commandType;
|
||||
|
||||
if (query_string == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0)
|
||||
{
|
||||
/* TODO create a function to safely parse the tenant identifier from the query comment */
|
||||
|
@ -39,6 +162,15 @@ AttributeQueryIfAnnotated(const char *query_string)
|
|||
|
||||
tenantEnd++;
|
||||
}
|
||||
tenantEnd--;
|
||||
|
||||
colocationGroupId = 0;
|
||||
while(*tenantEnd != ',')
|
||||
{
|
||||
colocationGroupId *= 10;
|
||||
colocationGroupId += *tenantEnd - '0';
|
||||
tenantEnd--;
|
||||
}
|
||||
|
||||
/* hack to get a clean copy of the tenant id string */
|
||||
char tenantEndTmp = *tenantEnd;
|
||||
|
@ -46,19 +178,44 @@ AttributeQueryIfAnnotated(const char *query_string)
|
|||
tenantId = pstrdup(tenantId);
|
||||
*tenantEnd = tenantEndTmp;
|
||||
|
||||
ereport(NOTICE, (errmsg("attributing query to tenant: %s", quote_literal_cstr(tenantId))));
|
||||
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||
{
|
||||
ereport(NOTICE, (errmsg("attributing query to tenant: %s", quote_literal_cstr(tenantId))));
|
||||
}
|
||||
|
||||
attributeToTenant = tenantId;
|
||||
attributeToTenant=(char *)malloc(strlen(tenantId));
|
||||
strcpy(attributeToTenant, tenantId);
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(attributeToTenant == NULL);
|
||||
}
|
||||
|
||||
//DetachSegment();
|
||||
|
||||
attributeToTenantStart = clock();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AnnotateQuery annotates the query with tenant attributes.
|
||||
*/
|
||||
char *
|
||||
AnnotateQuery (char * queryString, char * partitionColumn, int colocationId)
|
||||
{
|
||||
if (partitionColumn == NULL)
|
||||
{
|
||||
return queryString;
|
||||
}
|
||||
StringInfo newQuery = makeStringInfo();
|
||||
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId);
|
||||
|
||||
appendStringInfoString(newQuery, queryString);
|
||||
|
||||
return newQuery->data;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
CitusAttributeToEnd(QueryDesc *queryDesc)
|
||||
{
|
||||
|
@ -80,6 +237,9 @@ CitusAttributeToEnd(QueryDesc *queryDesc)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* AttributeMetricsIfApplicable updates the metrics for current tenant's statistics
|
||||
*/
|
||||
static void
|
||||
AttributeMetricsIfApplicable()
|
||||
{
|
||||
|
@ -89,10 +249,369 @@ AttributeMetricsIfApplicable()
|
|||
double cpu_time_used = 0;
|
||||
|
||||
end = clock();
|
||||
time_t queryTime = time(0);
|
||||
cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC;
|
||||
|
||||
ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", cpu_time_used,
|
||||
attributeToTenant)));
|
||||
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||
{
|
||||
ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", cpu_time_used,
|
||||
attributeToTenant)));
|
||||
}
|
||||
|
||||
if (GetMultiTenantMonitorDSMHandle() == DSM_HANDLE_INVALID)
|
||||
{
|
||||
CreateMultiTenantMonitor();
|
||||
}
|
||||
|
||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||
|
||||
monitor->periodStart = monitor->periodStart + ((queryTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod;
|
||||
|
||||
dsm_handle tenantDSMHandle = FindTenantStats(monitor);
|
||||
|
||||
if (tenantDSMHandle == DSM_HANDLE_INVALID)
|
||||
{
|
||||
tenantDSMHandle = CreateTenantStats(monitor);
|
||||
}
|
||||
TenantStats * tenantStats = GetTenantStatsFromDSMHandle(tenantDSMHandle);
|
||||
strcpy(tenantStats->tenantAttribute, attributeToTenant);
|
||||
tenantStats->colocationGroupId = colocationGroupId;
|
||||
|
||||
UpdatePeriodsIfNecessary(monitor, tenantStats);
|
||||
tenantStats->lastQueryTime = queryTime;
|
||||
|
||||
ReduceScoreIfNecessary(monitor, tenantStats, queryTime);
|
||||
|
||||
/*
|
||||
* We do this after reducing the scores so that the scores in this period are not affected by the reduction.
|
||||
*/
|
||||
monitor->scores[tenantStats->rank] += ONE_QUERY_SCORE;
|
||||
|
||||
|
||||
/*
|
||||
* After updating the score we might need to change the rank of the tenant in the monitor
|
||||
*/
|
||||
while(tenantStats->rank != 0 && monitor->scores[tenantStats->rank-1] < monitor->scores[tenantStats->rank])
|
||||
{
|
||||
// we need to reduce previous tenants score too !!!!!!!!
|
||||
TenantStats *previousTenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[tenantStats->rank-1]);
|
||||
|
||||
dsm_handle tempTenant = monitor->tenants[tenantStats->rank];
|
||||
monitor->tenants[tenantStats->rank] = monitor->tenants[previousTenantStats->rank];
|
||||
monitor->tenants[previousTenantStats->rank] = tempTenant;
|
||||
|
||||
long long tempScore = monitor->scores[tenantStats->rank];
|
||||
monitor->scores[tenantStats->rank] = monitor->scores[previousTenantStats->rank];
|
||||
monitor->scores[previousTenantStats->rank] = tempScore;
|
||||
|
||||
previousTenantStats->rank++;
|
||||
tenantStats->rank--;
|
||||
}
|
||||
|
||||
/*
|
||||
* We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit,
|
||||
* so we don't lose data immediately after a tenant is out of top CitusStatsTenantsLimit
|
||||
*
|
||||
* Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2.
|
||||
*/
|
||||
if (monitor->tenantCount >= CitusStatsTenantsLimit * 3)
|
||||
{
|
||||
monitor->tenantCount = CitusStatsTenantsLimit * 2;
|
||||
}
|
||||
|
||||
if (attributeCommandType == CMD_SELECT)
|
||||
{
|
||||
tenantStats->selectCount++;
|
||||
tenantStats->selectsInThisPeriod++;
|
||||
tenantStats->totalSelectTime+=cpu_time_used;
|
||||
}
|
||||
else if (attributeCommandType == CMD_INSERT)
|
||||
{
|
||||
tenantStats->insertCount++;
|
||||
tenantStats->insertsInThisPeriod++;
|
||||
tenantStats->totalInsertTime+=cpu_time_used;
|
||||
}
|
||||
|
||||
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||
{
|
||||
ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", tenantStats->selectCount, tenantStats->totalSelectTime,
|
||||
tenantStats->tenantAttribute)));
|
||||
}
|
||||
}
|
||||
attributeToTenant = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdatePeriodsIfNecessary moves the query counts to previous periods if a enough time has passed.
|
||||
*
|
||||
* If 1 period has passed after the latest query, this function moves this period's counts to the last period
|
||||
* and cleans this period's statistics.
|
||||
*
|
||||
* If 2 or more periods has passed after the last query, this function cleans all both this and last period's
|
||||
* statistics.
|
||||
*/
|
||||
static void
|
||||
UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats)
|
||||
{
|
||||
/*
|
||||
* If the last query in this tenant was before the start of current period
|
||||
* but there are some query count for this period we move them to the last period.
|
||||
*/
|
||||
if (tenantStats->lastQueryTime < monitor->periodStart && (tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod))
|
||||
{
|
||||
tenantStats->insertsInLastPeriod = tenantStats->insertsInThisPeriod;
|
||||
tenantStats->insertsInThisPeriod = 0;
|
||||
|
||||
tenantStats->selectsInLastPeriod = tenantStats->selectsInThisPeriod;
|
||||
tenantStats->selectsInThisPeriod = 0;
|
||||
}
|
||||
/*
|
||||
* If the last query is more than two periods ago, we clean the last period counts too.
|
||||
*/
|
||||
if (tenantStats->lastQueryTime < monitor->periodStart - CitusStatsTenantsPeriod)
|
||||
{
|
||||
tenantStats->insertsInLastPeriod = 0;
|
||||
|
||||
tenantStats->selectsInLastPeriod = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReduceScoreIfNecessary reduces the tenant score only if it is necessary.
|
||||
*
|
||||
* We halve the tenants' scores after each period. This function checks the number of
|
||||
* periods that passed after the lsat score reduction and reduces the score accordingly.
|
||||
*/
|
||||
static void
|
||||
ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime)
|
||||
{
|
||||
/*
|
||||
* With each query we increase the score of tenant by ONE_QUERY_SCORE.
|
||||
* After one period we halve the scores.
|
||||
*
|
||||
* Here we calculate how many periods passed after the last time we did score reduction
|
||||
* If the latest score reduction was in this period this number should be 0,
|
||||
* if it was in the last period this number should be 1 and so on.
|
||||
*/
|
||||
int periodCountAfterLastScoreReduction = (monitor->periodStart - tenantStats->lastScoreReduction + CitusStatsTenantsPeriod -1) / CitusStatsTenantsPeriod;
|
||||
|
||||
/*
|
||||
* This should not happen but let's make sure
|
||||
*/
|
||||
if (periodCountAfterLastScoreReduction < 0)
|
||||
{
|
||||
periodCountAfterLastScoreReduction = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the last score reduction was not in this period we do score reduction now.
|
||||
*/
|
||||
if (periodCountAfterLastScoreReduction > 0)
|
||||
{
|
||||
monitor->scores[tenantStats->rank] >>= periodCountAfterLastScoreReduction;
|
||||
tenantStats->lastScoreReduction = updateTime;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateMultiTenantMonitor creates the data structure for multi tenant monitor.
|
||||
*/
|
||||
static void
|
||||
CreateMultiTenantMonitor()
|
||||
{
|
||||
dsm_handle dsmHandle = CreateSharedMemoryForMultiTenantMonitor();
|
||||
StoreMultiTenantMonitorSMHandle(dsmHandle);
|
||||
MultiTenantMonitor * monitor = GetMultiTenantMonitor();
|
||||
monitor->tenantCount = 0;
|
||||
monitor->periodStart = time(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor.
|
||||
*/
|
||||
static dsm_handle
|
||||
CreateSharedMemoryForMultiTenantMonitor()
|
||||
{
|
||||
struct dsm_segment *dsmSegment = dsm_create(sizeof(MultiTenantMonitor), DSM_CREATE_NULL_IF_MAXSEGMENTS);
|
||||
dsm_pin_segment(dsmSegment);
|
||||
dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!!
|
||||
return dsm_segment_handle(dsmSegment);
|
||||
}
|
||||
|
||||
/*
|
||||
* StoreMultiTenantMonitorSMHandle stores the dsm (dynamic shared memory) handle for multi tenant monitor
|
||||
* in a non-dynamic shared memory location, so we don't lose it.
|
||||
*/
|
||||
static void
|
||||
StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle)
|
||||
{
|
||||
bool found = false;
|
||||
MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement,
|
||||
sizeof(MultiTenantMonitorSMData),
|
||||
&found);
|
||||
|
||||
smData->dsmHandle = dsmHandle;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetMultiTenantMonitor returns the data structure for multi tenant monitor.
|
||||
*/
|
||||
static MultiTenantMonitor *
|
||||
GetMultiTenantMonitor()
|
||||
{
|
||||
dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle();
|
||||
if (dsmHandle == DSM_HANDLE_INVALID)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
|
||||
if (dsmSegment == NULL)
|
||||
{
|
||||
dsmSegment = dsm_attach(dsmHandle);
|
||||
}
|
||||
MultiTenantMonitor *monitor = (MultiTenantMonitor *) dsm_segment_address(dsmSegment);
|
||||
dsm_pin_mapping(dsmSegment);
|
||||
return monitor;
|
||||
}
|
||||
|
||||
/*
|
||||
* GetMultiTenantMonitorDSMHandle fetches the dsm (dynamic shared memory) handle for multi tenant monitor.
|
||||
*/
|
||||
static dsm_handle
|
||||
GetMultiTenantMonitorDSMHandle()
|
||||
{
|
||||
bool found = false;
|
||||
MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement,
|
||||
sizeof(MultiTenantMonitorSMData),
|
||||
&found);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
elog(WARNING, "dsm handle not found");
|
||||
return DSM_HANDLE_INVALID;
|
||||
}
|
||||
|
||||
dsm_handle dsmHandle = smData->dsmHandle;
|
||||
|
||||
return dsmHandle;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
DetachSegment()
|
||||
{
|
||||
dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle();
|
||||
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
|
||||
if (dsmSegment != NULL)
|
||||
{
|
||||
dsm_detach(dsmSegment);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitializeMultiTenantMonitorSMHandleManagement sets up the shared memory startup hook
|
||||
* so that the multi tenant monitor can be initialized and stored in shared memory.
|
||||
*/
|
||||
void
|
||||
InitializeMultiTenantMonitorSMHandleManagement()
|
||||
{
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = MultiTenantMonitorSMInit;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MultiTenantMonitorSMInit initializes the shared memory for MultiTenantMonitorSMData.
|
||||
*
|
||||
* MultiTenantMonitorSMData only holds the dsm (dynamic shared memory) handle for the actual
|
||||
* multi tenant monitor.
|
||||
*/
|
||||
static void
|
||||
MultiTenantMonitorSMInit()
|
||||
{
|
||||
bool alreadyInitialized = false;
|
||||
MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement,
|
||||
sizeof(MultiTenantMonitorSMData),
|
||||
&alreadyInitialized);
|
||||
if (!alreadyInitialized)
|
||||
{
|
||||
smData->dsmHandle = DSM_HANDLE_INVALID;
|
||||
}
|
||||
|
||||
if (prev_shmem_startup_hook != NULL)
|
||||
{
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTenantStats creates the data structure for a tenant's statistics.
|
||||
*/
|
||||
static dsm_handle
|
||||
CreateTenantStats(MultiTenantMonitor *monitor)
|
||||
{
|
||||
dsm_handle dsmHandle = CreateSharedMemoryForTenantStats();
|
||||
monitor->tenants[monitor->tenantCount] = dsmHandle;
|
||||
TenantStats *tenantStats = GetTenantStatsFromDSMHandle(dsmHandle);
|
||||
tenantStats->rank = monitor->tenantCount;
|
||||
monitor->tenantCount++;
|
||||
return dsmHandle;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateSharedMemoryForTenantStats creates a dynamic shared memory segment for a tenant's statistics.
|
||||
*/
|
||||
static dsm_handle
|
||||
CreateSharedMemoryForTenantStats()
|
||||
{
|
||||
struct dsm_segment *dsmSegment = dsm_create(sizeof(TenantStats), DSM_CREATE_NULL_IF_MAXSEGMENTS);
|
||||
dsm_pin_segment(dsmSegment);
|
||||
dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!!
|
||||
return dsm_segment_handle(dsmSegment);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetTenantStatsFromDSMHandle returns the data structure for a tenant's statistics with the dsm (dynamic shared memory) handle.
|
||||
*/
|
||||
static TenantStats *
|
||||
GetTenantStatsFromDSMHandle(dsm_handle dsmHandle)
|
||||
{
|
||||
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
|
||||
if (dsmSegment == NULL)
|
||||
{
|
||||
dsmSegment = dsm_attach(dsmHandle);
|
||||
}
|
||||
TenantStats *stats = (TenantStats *) dsm_segment_address(dsmSegment);
|
||||
dsm_pin_mapping(dsmSegment);
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindTenantStats finds the dsm (dynamic shared memory) handle for the current tenant's statistics.
|
||||
*/
|
||||
static dsm_handle
|
||||
FindTenantStats(MultiTenantMonitor *monitor)
|
||||
{
|
||||
for(int i=0; i<monitor->tenantCount; i++)
|
||||
{
|
||||
TenantStats * tenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[i]);
|
||||
if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && tenantStats->colocationGroupId == colocationGroupId)
|
||||
{
|
||||
return monitor->tenants[i];
|
||||
}
|
||||
}
|
||||
|
||||
return DSM_HANDLE_INVALID;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,4 +46,6 @@ extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan);
|
|||
extern bool IsCitusPlan(Plan *plan);
|
||||
extern bool IsCitusCustomScan(Plan *plan);
|
||||
|
||||
extern void SetJobColocationId(Job *job);
|
||||
|
||||
#endif /* CITUS_CUSTOM_SCAN_H */
|
||||
|
|
|
@ -330,6 +330,9 @@ typedef struct Task
|
|||
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
|
||||
*/
|
||||
bool cannotBeExecutedInTransction;
|
||||
|
||||
char * partitionColumn;
|
||||
int colocationId;
|
||||
} Task;
|
||||
|
||||
|
||||
|
|
|
@ -8,9 +8,51 @@
|
|||
#include "executor/execdesc.h"
|
||||
#include "executor/executor.h"
|
||||
|
||||
typedef struct MultiTenantMonitor
|
||||
{
|
||||
int tenantCount;
|
||||
dsm_handle tenants[300];
|
||||
long long scores[300];
|
||||
|
||||
time_t periodStart;
|
||||
} MultiTenantMonitor;
|
||||
|
||||
/*
 * TenantStats holds the statistics of a single tenant. Each tenant's
 * statistics live in their own dynamic shared memory segment.
 */
typedef struct TenantStats
{
	/* textual form of the tenant's distribution column value */
	char tenantAttribute[100];

	/* colocation group the tenant belongs to */
	int colocationGroupId;

	/* SELECT statistics: total count, total time, per-period counts */
	int selectCount;
	double totalSelectTime;
	int selectsInLastPeriod;
	int selectsInThisPeriod;

	/* INSERT statistics: total count, total time, per-period counts */
	int insertCount;
	double totalInsertTime;
	int insertsInLastPeriod;
	int insertsInThisPeriod;

	/* time of the most recent query attributed to this tenant */
	time_t lastQueryTime;

	/* time at which the tenant's score was last reduced */
	time_t lastScoreReduction;

	/* index of this tenant in the monitor's tenants[] and scores[] arrays */
	int rank;
} TenantStats;
|
||||
|
||||
typedef struct MultiTenantMonitorSMData
|
||||
{
|
||||
dsm_handle dsmHandle;
|
||||
} MultiTenantMonitorSMData;
|
||||
|
||||
extern void CitusAttributeToEnd(QueryDesc *queryDesc);
|
||||
extern void AttributeQueryIfAnnotated(const char *queryString);
|
||||
extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);
|
||||
extern char * AnnotateQuery(char *queryString, char * partitionColumn, int colocationId);
|
||||
extern void InitializeMultiTenantMonitorSMHandleManagement(void);
|
||||
|
||||
extern ExecutorEnd_hook_type prev_ExecutorEnd;
|
||||
|
||||
extern int MultiTenantMonitoringLogLevel;
|
||||
extern int CitusStatsTenantsPeriod;
|
||||
extern int CitusStatsTenantsLimit;
|
||||
|
||||
#endif //CITUS_ATTRIBUTE_H
|
||||
|
|
Loading…
Reference in New Issue