mirror of https://github.com/citusdata/citus.git
Address reviews on the PR
parent
3ada889498
commit
2b7482ea0d
|
@ -648,7 +648,15 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString,
|
|||
LocalExecutorShardId = task->anchorShardId;
|
||||
}
|
||||
|
||||
AttributeTask(task->partitionColumn, task->colocationId, taskPlan->commandType);
|
||||
|
||||
char *partitionKeyValueString = NULL;
|
||||
if (task->partitionKeyValue != NULL)
|
||||
{
|
||||
partitionKeyValueString = DatumToString(task->partitionKeyValue->constvalue,
|
||||
task->partitionKeyValue->consttype);
|
||||
}
|
||||
|
||||
AttributeTask(partitionKeyValueString, task->colocationId, taskPlan->commandType);
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
|
|
|
@ -142,18 +142,7 @@ RebuildQueryStrings(Job *workerJob)
|
|||
? "(null)"
|
||||
: TaskQueryString(task))));
|
||||
|
||||
Datum partitionColumnValue;
|
||||
Oid partitionColumnType = 0;
|
||||
char *partitionColumnString = NULL;
|
||||
if (workerJob->partitionKeyValue != NULL)
|
||||
{
|
||||
partitionColumnValue = workerJob->partitionKeyValue->constvalue;
|
||||
partitionColumnType = workerJob->partitionKeyValue->consttype;
|
||||
partitionColumnString = DatumToString(partitionColumnValue,
|
||||
partitionColumnType);
|
||||
}
|
||||
|
||||
task->partitionColumn = partitionColumnString;
|
||||
task->partitionKeyValue = workerJob->partitionKeyValue;
|
||||
SetJobColocationId(workerJob);
|
||||
task->colocationId = workerJob->colocationId;
|
||||
|
||||
|
@ -404,7 +393,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
|
|||
}
|
||||
|
||||
SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query),
|
||||
task->partitionColumn, task->colocationId));
|
||||
task->partitionKeyValue, task->colocationId));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -157,8 +157,6 @@ distributed_planner(Query *parse,
|
|||
bool fastPathRouterQuery = false;
|
||||
Node *distributionKeyValue = NULL;
|
||||
|
||||
AttributeQueryIfAnnotated(query_string, parse->commandType);
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
|
||||
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
|
||||
|
@ -308,6 +306,11 @@ distributed_planner(Query *parse,
|
|||
errhint("Consider using PL/pgSQL functions instead.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* We annotate the query for tenant statisisics.
|
||||
*/
|
||||
AttributeQueryIfAnnotated(query_string, parse->commandType);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -164,7 +164,7 @@ static int CompareInsertValuesByShardId(const void *leftElement,
|
|||
static List * SingleShardTaskList(Query *query, uint64 jobId,
|
||||
List *relationShardList, List *placementList,
|
||||
uint64 shardId, bool parametersInQueryResolved,
|
||||
bool isLocalTableModification, char *partitionColumn,
|
||||
bool isLocalTableModification, Const *partitionKeyValue,
|
||||
int colocationId);
|
||||
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
||||
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
||||
|
@ -1940,17 +1940,6 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
|
||||
if (originalQuery->commandType == CMD_SELECT)
|
||||
{
|
||||
Datum partitionColumnValue;
|
||||
Oid partitionColumnType = 0;
|
||||
char *partitionColumnString = NULL;
|
||||
if (job->partitionKeyValue != NULL)
|
||||
{
|
||||
partitionColumnValue = job->partitionKeyValue->constvalue;
|
||||
partitionColumnType = job->partitionKeyValue->consttype;
|
||||
partitionColumnString = DatumToString(partitionColumnValue,
|
||||
partitionColumnType);
|
||||
}
|
||||
|
||||
SetJobColocationId(job);
|
||||
|
||||
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
|
||||
|
@ -1958,7 +1947,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
shardId,
|
||||
job->parametersInJobQueryResolved,
|
||||
isLocalTableModification,
|
||||
partitionColumnString, job->colocationId);
|
||||
job->partitionKeyValue, job->colocationId);
|
||||
|
||||
/*
|
||||
* Queries to reference tables, or distributed tables with multiple replica's have
|
||||
|
@ -1982,17 +1971,6 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
}
|
||||
else
|
||||
{
|
||||
Datum partitionColumnValue;
|
||||
Oid partitionColumnType = 0;
|
||||
char *partitionColumnString = NULL;
|
||||
if (job->partitionKeyValue != NULL)
|
||||
{
|
||||
partitionColumnValue = job->partitionKeyValue->constvalue;
|
||||
partitionColumnType = job->partitionKeyValue->consttype;
|
||||
partitionColumnString = DatumToString(partitionColumnValue,
|
||||
partitionColumnType);
|
||||
}
|
||||
|
||||
SetJobColocationId(job);
|
||||
|
||||
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
|
||||
|
@ -2000,7 +1978,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
shardId,
|
||||
job->parametersInJobQueryResolved,
|
||||
isLocalTableModification,
|
||||
partitionColumnString, job->colocationId);
|
||||
job->partitionKeyValue, job->colocationId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2094,7 +2072,7 @@ static List *
|
|||
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||
List *placementList, uint64 shardId,
|
||||
bool parametersInQueryResolved,
|
||||
bool isLocalTableModification, char *partitionColumn,
|
||||
bool isLocalTableModification, Const *partitionKeyValue,
|
||||
int colocationId)
|
||||
{
|
||||
TaskType taskType = READ_TASK;
|
||||
|
@ -2165,7 +2143,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
|||
* that the query cannot be executed locally.
|
||||
*/
|
||||
task->taskPlacementList = placementList;
|
||||
task->partitionColumn = partitionColumn;
|
||||
task->partitionKeyValue = partitionKeyValue;
|
||||
task->colocationId = colocationId;
|
||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||
task->anchorShardId = shardId;
|
||||
|
|
|
@ -2386,8 +2386,10 @@ RegisterCitusConfigVariables(void)
|
|||
|
||||
DefineCustomEnumVariable(
|
||||
"citus.stat_tenants_track",
|
||||
gettext_noop("enable disable"),
|
||||
NULL,
|
||||
gettext_noop("Enables/Disables the stats collection for citus_stat_tenants."),
|
||||
gettext_noop("Enables the stats collection when set to 'all'. "
|
||||
"Disables when set to 'none'. Disabling can be useful for "
|
||||
"avoiding extra CPU cycles needed for the calculations."),
|
||||
&StatTenantsTrack,
|
||||
STAT_TENANTS_TRACK_ALL,
|
||||
stat_tenants_track_options,
|
||||
|
@ -2397,7 +2399,7 @@ RegisterCitusConfigVariables(void)
|
|||
|
||||
DefineCustomIntVariable(
|
||||
"citus.stats_tenants_limit",
|
||||
gettext_noop("monitor limit"),
|
||||
gettext_noop("Number of tenants to be shown in citus_stat_tenants."),
|
||||
NULL,
|
||||
&CitusStatsTenantsLimit,
|
||||
10, 1, 100,
|
||||
|
@ -2407,7 +2409,8 @@ RegisterCitusConfigVariables(void)
|
|||
|
||||
DefineCustomIntVariable(
|
||||
"citus.stats_tenants_period",
|
||||
gettext_noop("monitor period"),
|
||||
gettext_noop("Period in seconds to be used for calculating the tenant "
|
||||
"statistics in citus_stat_tenants."),
|
||||
NULL,
|
||||
&CitusStatsTenantsPeriod,
|
||||
60, 1, 1000000000,
|
||||
|
|
|
@ -12,3 +12,6 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_
|
|||
#include "udfs/citus_internal_mark_node_not_synced/11.3-1.sql"
|
||||
#include "udfs/citus_stats_tenants_local/11.3-1.sql"
|
||||
#include "udfs/citus_stats_tenants/11.3-1.sql"
|
||||
|
||||
#include "udfs/citus_stats_tenants_local_reset/11.3-1.sql"
|
||||
#include "udfs/citus_stats_tenants_reset/11.3-1.sql"
|
||||
|
|
|
@ -26,3 +26,6 @@ DROP FUNCTION pg_catalog.citus_stats_tenants_local(boolean);
|
|||
|
||||
DROP VIEW pg_catalog.citus_stats_tenants;
|
||||
DROP FUNCTION pg_catalog.citus_stats_tenants(boolean);
|
||||
|
||||
DROP FUNCTION pg_catalog.citus_stats_tenants_local_reset();
|
||||
DROP FUNCTION pg_catalog.citus_stats_tenants_reset();
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_local_reset()
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$citus_stats_tenants_local_reset$$;
|
||||
|
||||
COMMENT ON FUNCTION pg_catalog.citus_stats_tenants_local_reset()
|
||||
IS 'resets the local tenant statistics';
|
|
@ -0,0 +1,7 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_local_reset()
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$citus_stats_tenants_local_reset$$;
|
||||
|
||||
COMMENT ON FUNCTION pg_catalog.citus_stats_tenants_local_reset()
|
||||
IS 'resets the local tenant statistics';
|
|
@ -0,0 +1,8 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_reset()
|
||||
RETURNS VOID
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
PERFORM run_command_on_all_nodes($$SELECT citus_stats_tenants_local_reset()$$);
|
||||
END;
|
||||
$function$;
|
|
@ -0,0 +1,8 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_reset()
|
||||
RETURNS VOID
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
PERFORM run_command_on_all_nodes($$SELECT citus_stats_tenants_local_reset()$$);
|
||||
END;
|
||||
$function$;
|
|
@ -0,0 +1,38 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* citus_stat_tenants.c
|
||||
*
|
||||
* This file contains functions to test citus_stats_tenants.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include "distributed/utils/attribute.h"
|
||||
#include "sys/time.h"
|
||||
|
||||
PG_FUNCTION_INFO_V1(sleep_until_next_period);
|
||||
|
||||
/*
|
||||
* sleep_until_next_period sleeps until the next monitoring period starts.
|
||||
*/
|
||||
Datum
|
||||
sleep_until_next_period(PG_FUNCTION_ARGS)
|
||||
{
|
||||
struct timeval currentTime;
|
||||
gettimeofday(¤tTime, NULL);
|
||||
|
||||
long int nextPeriodStart = currentTime.tv_sec -
|
||||
(currentTime.tv_sec % CitusStatsTenantsPeriod) +
|
||||
CitusStatsTenantsPeriod;
|
||||
|
||||
long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 -
|
||||
currentTime.tv_usec + 100000;
|
||||
pg_usleep(sleepTime);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
|
@ -23,8 +23,9 @@
|
|||
#include "storage/ipc.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/shmem.h"
|
||||
#include <sys/time.h>
|
||||
#include "sys/time.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/datetime.h"
|
||||
#include "utils/json.h"
|
||||
#include "distributed/utils/attribute.h"
|
||||
|
||||
|
@ -54,15 +55,15 @@ char *monitorTrancheName = "Multi Tenant Monitor Tranche";
|
|||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
|
||||
static int CompareTenantScore(const void *leftElement, const void *rightElement);
|
||||
static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime);
|
||||
static void ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime);
|
||||
static void EvictTenantsIfNecessary(time_t queryTime);
|
||||
static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
|
||||
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
|
||||
static void EvictTenantsIfNecessary(TimestampTz queryTime);
|
||||
static void RecordTenantStats(TenantStats *tenantStats);
|
||||
static void CreateMultiTenantMonitor(void);
|
||||
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
|
||||
static MultiTenantMonitor * GetMultiTenantMonitor(void);
|
||||
static void MultiTenantMonitorSMInit(void);
|
||||
static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime);
|
||||
static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime);
|
||||
static int FindTenantStats(MultiTenantMonitor *monitor);
|
||||
static size_t MultiTenantMonitorshmemSize(void);
|
||||
static char * ExtractTopComment(const char *inputString);
|
||||
|
@ -76,8 +77,7 @@ int StatTenantsTrack = STAT_TENANTS_TRACK_ALL;
|
|||
|
||||
|
||||
PG_FUNCTION_INFO_V1(citus_stats_tenants_local);
|
||||
PG_FUNCTION_INFO_V1(clean_citus_stats_tenants);
|
||||
PG_FUNCTION_INFO_V1(sleep_until_next_period);
|
||||
PG_FUNCTION_INFO_V1(citus_stats_tenants_local_reset);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -97,7 +97,7 @@ citus_stats_tenants_local(PG_FUNCTION_ARGS)
|
|||
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||
time_t monitoringTime = time(0);
|
||||
TimestampTz monitoringTime = GetCurrentTimestamp();
|
||||
|
||||
Datum values[CITUS_STATS_TENANTS_COLUMNS];
|
||||
bool isNulls[CITUS_STATS_TENANTS_COLUMNS];
|
||||
|
@ -156,10 +156,11 @@ citus_stats_tenants_local(PG_FUNCTION_ARGS)
|
|||
|
||||
|
||||
/*
|
||||
* clean_citus_stats_tenants cleans the citus_stats_tenants monitor.
|
||||
* citus_stats_tenants_local_reset resets monitor for tenant statistics
|
||||
* on the local node.
|
||||
*/
|
||||
Datum
|
||||
clean_citus_stats_tenants(PG_FUNCTION_ARGS)
|
||||
citus_stats_tenants_local_reset(PG_FUNCTION_ARGS)
|
||||
{
|
||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||
monitor->tenantCount = 0;
|
||||
|
@ -168,27 +169,6 @@ clean_citus_stats_tenants(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* sleep_until_next_period sleeps until the next monitoring period starts.
|
||||
*/
|
||||
Datum
|
||||
sleep_until_next_period(PG_FUNCTION_ARGS)
|
||||
{
|
||||
struct timeval currentTime;
|
||||
gettimeofday(¤tTime, NULL);
|
||||
|
||||
long int nextPeriodStart = currentTime.tv_sec -
|
||||
(currentTime.tv_sec % CitusStatsTenantsPeriod) +
|
||||
CitusStatsTenantsPeriod;
|
||||
|
||||
long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 -
|
||||
currentTime.tv_usec + 100000;
|
||||
pg_usleep(sleepTime);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AttributeQueryIfAnnotated checks the query annotation and if the query is annotated
|
||||
* for the tenant statistics monitoring this function records the tenant attributes.
|
||||
|
@ -254,14 +234,17 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)
|
|||
* AnnotateQuery annotates the query with tenant attributes.
|
||||
*/
|
||||
char *
|
||||
AnnotateQuery(char *queryString, char *partitionColumn, int colocationId)
|
||||
AnnotateQuery(char *queryString, Const *partitionKeyValue, int colocationId)
|
||||
{
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || partitionColumn == NULL)
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || partitionKeyValue == NULL)
|
||||
{
|
||||
return queryString;
|
||||
}
|
||||
|
||||
char *commentCharsEscaped = EscapeCommentChars(partitionColumn);
|
||||
char *partitionKeyValueString = DatumToString(partitionKeyValue->constvalue,
|
||||
partitionKeyValue->consttype);
|
||||
|
||||
char *commentCharsEscaped = EscapeCommentChars(partitionKeyValueString);
|
||||
StringInfo escapedSourceName = makeStringInfo();
|
||||
|
||||
escape_json(escapedSourceName, commentCharsEscaped);
|
||||
|
@ -329,21 +312,61 @@ CompareTenantScore(const void *leftElement, const void *rightElement)
|
|||
static void
|
||||
AttributeMetricsIfApplicable()
|
||||
{
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE)
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
||||
attributeToTenant[0] == '\0')
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (strcmp(attributeToTenant, "") != 0)
|
||||
{
|
||||
time_t queryTime = time(0);
|
||||
TimestampTz queryTime = GetCurrentTimestamp();
|
||||
|
||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||
|
||||
/*
|
||||
* We need to acquire the monitor lock in shared mode to check if the tenant is
|
||||
* already in the monitor. If it is not, we need to acquire the lock in
|
||||
* exclusive mode to add the tenant to the monitor.
|
||||
*
|
||||
* We need to check again if the tenant is in the monitor after acquiring the
|
||||
* exclusive lock to avoid adding the tenant twice. Some other backend might
|
||||
* have added the tenant while we were waiting for the lock.
|
||||
*
|
||||
* After releasing the exclusive lock, we need to acquire the lock in shared
|
||||
* mode to update the tenant's statistics. We need to check again if the tenant
|
||||
* is in the monitor after acquiring the shared lock because some other backend
|
||||
* might have removed the tenant while we were waiting for the lock.
|
||||
*/
|
||||
LWLockAcquire(&monitor->lock, LW_SHARED);
|
||||
|
||||
int currentTenantIndex = FindTenantStats(monitor);
|
||||
|
||||
if (currentTenantIndex != -1)
|
||||
{
|
||||
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
|
||||
LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
|
||||
|
||||
UpdatePeriodsIfNecessary(tenantStats, queryTime);
|
||||
ReduceScoreIfNecessary(tenantStats, queryTime);
|
||||
RecordTenantStats(tenantStats);
|
||||
|
||||
LWLockRelease(&tenantStats->lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockRelease(&monitor->lock);
|
||||
|
||||
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
||||
currentTenantIndex = FindTenantStats(monitor);
|
||||
|
||||
if (currentTenantIndex == -1)
|
||||
{
|
||||
currentTenantIndex = CreateTenantStats(monitor, queryTime);
|
||||
}
|
||||
|
||||
LWLockRelease(&monitor->lock);
|
||||
|
||||
LWLockAcquire(&monitor->lock, LW_SHARED);
|
||||
|
||||
int currentTenantIndex = FindTenantStats(monitor);
|
||||
|
||||
currentTenantIndex = FindTenantStats(monitor);
|
||||
if (currentTenantIndex != -1)
|
||||
{
|
||||
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
|
||||
|
@ -355,36 +378,8 @@ AttributeMetricsIfApplicable()
|
|||
|
||||
LWLockRelease(&tenantStats->lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockRelease(&monitor->lock);
|
||||
|
||||
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
||||
currentTenantIndex = FindTenantStats(monitor);
|
||||
|
||||
if (currentTenantIndex == -1)
|
||||
{
|
||||
currentTenantIndex = CreateTenantStats(monitor, queryTime);
|
||||
}
|
||||
|
||||
LWLockRelease(&monitor->lock);
|
||||
|
||||
LWLockAcquire(&monitor->lock, LW_SHARED);
|
||||
currentTenantIndex = FindTenantStats(monitor);
|
||||
if (currentTenantIndex != -1)
|
||||
{
|
||||
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
|
||||
LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
|
||||
|
||||
UpdatePeriodsIfNecessary(tenantStats, queryTime);
|
||||
ReduceScoreIfNecessary(tenantStats, queryTime);
|
||||
RecordTenantStats(tenantStats);
|
||||
|
||||
LWLockRelease(&tenantStats->lock);
|
||||
}
|
||||
}
|
||||
LWLockRelease(&monitor->lock);
|
||||
}
|
||||
LWLockRelease(&monitor->lock);
|
||||
|
||||
strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
|
||||
}
|
||||
|
@ -400,9 +395,10 @@ AttributeMetricsIfApplicable()
|
|||
* statistics.
|
||||
*/
|
||||
static void
|
||||
UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime)
|
||||
UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
|
||||
{
|
||||
time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod);
|
||||
long long int periodInMicroSeconds = CitusStatsTenantsPeriod * USECS_PER_SEC;
|
||||
TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds);
|
||||
|
||||
/*
|
||||
* If the last query in this tenant was before the start of current period
|
||||
|
@ -421,7 +417,8 @@ UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime)
|
|||
/*
|
||||
* If the last query is more than two periods ago, we clean the last period counts too.
|
||||
*/
|
||||
if (tenantStats->lastQueryTime < periodStart - CitusStatsTenantsPeriod)
|
||||
if (TimestampDifferenceExceeds(tenantStats->lastQueryTime, periodStart,
|
||||
periodInMicroSeconds))
|
||||
{
|
||||
tenantStats->writesInLastPeriod = 0;
|
||||
|
||||
|
@ -439,9 +436,10 @@ UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime)
|
|||
* periods that passed after the lsat score reduction and reduces the score accordingly.
|
||||
*/
|
||||
static void
|
||||
ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime)
|
||||
ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
|
||||
{
|
||||
time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod);
|
||||
long long int periodInMicroSeconds = CitusStatsTenantsPeriod * USECS_PER_SEC;
|
||||
TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds);
|
||||
|
||||
/*
|
||||
* With each query we increase the score of tenant by ONE_QUERY_SCORE.
|
||||
|
@ -453,8 +451,8 @@ ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime)
|
|||
*/
|
||||
int periodCountAfterLastScoreReduction = (periodStart -
|
||||
tenantStats->lastScoreReduction +
|
||||
CitusStatsTenantsPeriod - 1) /
|
||||
CitusStatsTenantsPeriod;
|
||||
periodInMicroSeconds - 1) /
|
||||
periodInMicroSeconds;
|
||||
|
||||
/*
|
||||
* This should not happen but let's make sure
|
||||
|
@ -480,7 +478,7 @@ ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime)
|
|||
* equal to 3 * CitusStatsTenantsLimit.
|
||||
*/
|
||||
static void
|
||||
EvictTenantsIfNecessary(time_t queryTime)
|
||||
EvictTenantsIfNecessary(TimestampTz queryTime)
|
||||
{
|
||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||
|
||||
|
@ -618,9 +616,11 @@ MultiTenantMonitorSMInit()
|
|||
|
||||
/*
|
||||
* CreateTenantStats creates the data structure for a tenant's statistics.
|
||||
*
|
||||
* Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode.
|
||||
*/
|
||||
static int
|
||||
CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime)
|
||||
CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
|
||||
{
|
||||
/*
|
||||
* If the tenant count reached 3 * CitusStatsTenantsLimit, we evict the tenants
|
||||
|
|
|
@ -331,7 +331,7 @@ typedef struct Task
|
|||
*/
|
||||
bool cannotBeExecutedInTransction;
|
||||
|
||||
char *partitionColumn;
|
||||
Const *partitionKeyValue;
|
||||
int colocationId;
|
||||
} Task;
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "executor/execdesc.h"
|
||||
#include "executor/executor.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "utils/datetime.h"
|
||||
|
||||
#define MAX_TENANT_ATTRIBUTE_LENGTH 100
|
||||
|
||||
|
@ -44,7 +45,7 @@ typedef struct TenantStats
|
|||
/*
|
||||
* The latest time this tenant ran a query. This value is used to update the score later.
|
||||
*/
|
||||
time_t lastQueryTime;
|
||||
TimestampTz lastQueryTime;
|
||||
|
||||
/*
|
||||
* The tenant monitoring score of this tenant. This value is increased by ONE_QUERY_SCORE at every query
|
||||
|
@ -55,7 +56,7 @@ typedef struct TenantStats
|
|||
/*
|
||||
* The latest time the score of this tenant is halved. This value is used to correctly calculate the reduction later.
|
||||
*/
|
||||
time_t lastScoreReduction;
|
||||
TimestampTz lastScoreReduction;
|
||||
|
||||
/*
|
||||
* Locks needed to update this tenant's statistics.
|
||||
|
@ -95,7 +96,8 @@ typedef enum
|
|||
|
||||
extern void CitusAttributeToEnd(QueryDesc *queryDesc);
|
||||
extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);
|
||||
extern char * AnnotateQuery(char *queryString, char *partitionColumn, int colocationId);
|
||||
extern char * AnnotateQuery(char *queryString, Const *partitionKeyValue,
|
||||
int colocationId);
|
||||
extern void InitializeMultiTenantMonitorSMHandleManagement(void);
|
||||
extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType);
|
||||
|
||||
|
|
|
@ -2,21 +2,15 @@ CREATE SCHEMA citus_stats_tenants;
|
|||
SET search_path TO citus_stats_tenants;
|
||||
SET citus.next_shard_id TO 5797500;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants()
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'citus', $$clean_citus_stats_tenants$$;
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period()
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'citus', $$sleep_until_next_period$$;
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
result
|
||||
SELECT citus_stats_tenants_reset();
|
||||
citus_stats_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
(3 rows)
|
||||
(1 row)
|
||||
|
||||
-- set period to a high number to prevent stats from being reset
|
||||
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000');
|
||||
|
@ -78,13 +72,11 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
|
|||
5 | 0 | 0 | 1 | 0
|
||||
(5 rows)
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
result
|
||||
SELECT citus_stats_tenants_reset();
|
||||
citus_stats_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
(3 rows)
|
||||
(1 row)
|
||||
|
||||
-- queries with multiple tenants should not be counted
|
||||
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
|
||||
|
@ -226,13 +218,11 @@ SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tena
|
|||
(7 rows)
|
||||
|
||||
-- test period passing
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
result
|
||||
SELECT citus_stats_tenants_reset();
|
||||
citus_stats_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
(3 rows)
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
|
||||
?column?
|
||||
|
@ -264,14 +254,6 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
|
|||
5 | 0 | 0 | 0 | 1
|
||||
(2 rows)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
|
||||
tenant_attribute | query_count_in_this_period | score
|
||||
---------------------------------------------------------------------
|
||||
1 | 0 | 500000000
|
||||
5 | 0 | 500000000
|
||||
(2 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO citus_stats_tenants;
|
||||
-- test logs
|
||||
|
@ -315,13 +297,11 @@ CONTEXT: PL/pgSQL function citus_stats_tenants(boolean) line XX at RAISE
|
|||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
result
|
||||
SELECT citus_stats_tenants_reset();
|
||||
citus_stats_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
(3 rows)
|
||||
(1 row)
|
||||
|
||||
-- test turning monitoring on/off
|
||||
SET citus.stat_tenants_track TO "NONE";
|
||||
|
@ -357,13 +337,11 @@ SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants;
|
|||
(1 row)
|
||||
|
||||
-- test special and multibyte characters in tenant attribute
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
result
|
||||
SELECT citus_stats_tenants_reset();
|
||||
citus_stats_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
(3 rows)
|
||||
(1 row)
|
||||
|
||||
TRUNCATE TABLE dist_tbl_text;
|
||||
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
|
||||
|
@ -459,12 +437,11 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
|
|||
bcde*/ | 1 | 0 | 1 | 0
|
||||
(10 rows)
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
result
|
||||
SELECT citus_stats_tenants_reset();
|
||||
citus_stats_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
(2 rows)
|
||||
(1 row)
|
||||
|
||||
-- test local queries
|
||||
-- all of these distribution column values are from second worker
|
||||
|
@ -672,13 +649,11 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
|
|||
|
||||
\c - - - :master_port
|
||||
SET search_path TO citus_stats_tenants;
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
result
|
||||
SELECT citus_stats_tenants_reset();
|
||||
citus_stats_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
(3 rows)
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname';
|
||||
?column?
|
||||
|
|
|
@ -1368,11 +1368,13 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function citus_internal_stop_replication_origin_tracking() void
|
||||
| function citus_stats_tenants(boolean) SETOF record
|
||||
| function citus_stats_tenants_local(boolean) SETOF record
|
||||
| function citus_stats_tenants_local_reset() void
|
||||
| function citus_stats_tenants_reset() void
|
||||
| function worker_adjust_identity_column_seq_ranges(regclass) void
|
||||
| function worker_drop_all_shell_tables(boolean)
|
||||
| view citus_stats_tenants
|
||||
| view citus_stats_tenants_local
|
||||
(10 rows)
|
||||
(12 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -127,6 +127,8 @@ ORDER BY 1;
|
|||
function citus_stat_statements_reset()
|
||||
function citus_stats_tenants(boolean)
|
||||
function citus_stats_tenants_local(boolean)
|
||||
function citus_stats_tenants_local_reset()
|
||||
function citus_stats_tenants_reset()
|
||||
function citus_table_is_visible(oid)
|
||||
function citus_table_size(regclass)
|
||||
function citus_task_wait(bigint,citus_task_status)
|
||||
|
@ -328,5 +330,5 @@ ORDER BY 1;
|
|||
view citus_stats_tenants_local
|
||||
view pg_dist_shard_placement
|
||||
view time_partitions
|
||||
(320 rows)
|
||||
(322 rows)
|
||||
|
||||
|
|
|
@ -3,17 +3,12 @@ SET search_path TO citus_stats_tenants;
|
|||
SET citus.next_shard_id TO 5797500;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants()
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'citus', $$clean_citus_stats_tenants$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period()
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'citus', $$sleep_until_next_period$$;
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
SELECT citus_stats_tenants_reset();
|
||||
|
||||
-- set period to a high number to prevent stats from being reset
|
||||
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000');
|
||||
|
@ -39,7 +34,7 @@ DELETE FROM dist_tbl WHERE a = 5;
|
|||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants(true) ORDER BY tenant_attribute;
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
SELECT citus_stats_tenants_reset();
|
||||
|
||||
-- queries with multiple tenants should not be counted
|
||||
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
|
||||
|
@ -80,7 +75,7 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg';
|
|||
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
|
||||
|
||||
-- test period passing
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
SELECT citus_stats_tenants_reset();
|
||||
|
||||
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
|
||||
INSERT INTO dist_tbl VALUES (5, 'abcd');
|
||||
|
@ -93,8 +88,6 @@ SET citus.stats_tenants_period TO 2;
|
|||
SELECT sleep_until_next_period();
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute;
|
||||
\c - - - :worker_2_port
|
||||
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO citus_stats_tenants;
|
||||
|
@ -112,7 +105,7 @@ SET citus.multi_tenant_monitoring_log_level TO DEBUG;
|
|||
SELECT count(*)>=0 FROM citus_stats_tenants;
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
SELECT citus_stats_tenants_reset();
|
||||
|
||||
-- test turning monitoring on/off
|
||||
SET citus.stat_tenants_track TO "NONE";
|
||||
|
@ -131,7 +124,7 @@ INSERT INTO dist_tbl VALUES (1, 1);
|
|||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants;
|
||||
|
||||
-- test special and multibyte characters in tenant attribute
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
SELECT citus_stats_tenants_reset();
|
||||
TRUNCATE TABLE dist_tbl_text;
|
||||
|
||||
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
|
||||
|
@ -152,7 +145,7 @@ SET search_path TO citus_stats_tenants;
|
|||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
SELECT citus_stats_tenants_reset();
|
||||
|
||||
-- test local queries
|
||||
-- all of these distribution column values are from second worker
|
||||
|
@ -209,7 +202,7 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
|
|||
\c - - - :master_port
|
||||
SET search_path TO citus_stats_tenants;
|
||||
|
||||
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||
SELECT citus_stats_tenants_reset();
|
||||
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname';
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
|
||||
|
||||
|
|
Loading…
Reference in New Issue