From 2b7482ea0dd755e15905793a10dcb87dbc3f14a7 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Mon, 3 Apr 2023 17:49:37 +0300 Subject: [PATCH] Address reviews on the PR --- .../distributed/executor/local_executor.c | 10 +- .../distributed/planner/deparse_shard_query.c | 15 +- .../distributed/planner/distributed_planner.c | 7 +- .../planner/multi_router_planner.c | 32 +--- src/backend/distributed/shared_library_init.c | 11 +- .../distributed/sql/citus--11.2-1--11.3-1.sql | 3 + .../sql/downgrades/citus--11.3-1--11.2-1.sql | 3 + .../11.3-1.sql | 7 + .../latest.sql | 7 + .../udfs/citus_stats_tenants_reset/11.3-1.sql | 8 + .../udfs/citus_stats_tenants_reset/latest.sql | 8 + .../distributed/test/citus_stat_tenants.c | 38 +++++ src/backend/distributed/utils/attribute.c | 160 +++++++++--------- .../distributed/multi_physical_planner.h | 2 +- src/include/distributed/utils/attribute.h | 8 +- .../regress/expected/citus_stats_tenants.out | 67 +++----- src/test/regress/expected/multi_extension.out | 4 +- .../expected/upgrade_list_citus_objects.out | 4 +- src/test/regress/sql/citus_stats_tenants.sql | 21 +-- 19 files changed, 222 insertions(+), 193 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/11.3-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_stats_tenants_reset/11.3-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_stats_tenants_reset/latest.sql create mode 100644 src/backend/distributed/test/citus_stat_tenants.c diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index bffecb4ac..ecbc04bd3 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -648,7 +648,15 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString, LocalExecutorShardId = task->anchorShardId; 
} - AttributeTask(task->partitionColumn, task->colocationId, taskPlan->commandType); + + char *partitionKeyValueString = NULL; + if (task->partitionKeyValue != NULL) + { + partitionKeyValueString = DatumToString(task->partitionKeyValue->constvalue, + task->partitionKeyValue->consttype); + } + + AttributeTask(partitionKeyValueString, task->colocationId, taskPlan->commandType); PG_TRY(); { diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 136268ba0..56670b96e 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -142,18 +142,7 @@ RebuildQueryStrings(Job *workerJob) ? "(null)" : TaskQueryString(task)))); - Datum partitionColumnValue; - Oid partitionColumnType = 0; - char *partitionColumnString = NULL; - if (workerJob->partitionKeyValue != NULL) - { - partitionColumnValue = workerJob->partitionKeyValue->constvalue; - partitionColumnType = workerJob->partitionKeyValue->consttype; - partitionColumnString = DatumToString(partitionColumnValue, - partitionColumnType); - } - - task->partitionColumn = partitionColumnString; + task->partitionKeyValue = workerJob->partitionKeyValue; SetJobColocationId(workerJob); task->colocationId = workerJob->colocationId; @@ -404,7 +393,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) } SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), - task->partitionColumn, task->colocationId)); + task->partitionKeyValue, task->colocationId)); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index c5415fb34..4ca72a4f1 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -157,8 +157,6 @@ distributed_planner(Query *parse, bool fastPathRouterQuery = false; Node *distributionKeyValue = NULL; - 
AttributeQueryIfAnnotated(query_string, parse->commandType); - List *rangeTableList = ExtractRangeTableEntryList(parse); if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) @@ -308,6 +306,11 @@ distributed_planner(Query *parse, errhint("Consider using PL/pgSQL functions instead."))); } + /* + * We annotate the query for tenant statistics. + */ + AttributeQueryIfAnnotated(query_string, parse->commandType); + return result; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 480e54b68..94691bab9 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -164,7 +164,7 @@ static int CompareInsertValuesByShardId(const void *leftElement, static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification, char *partitionColumn, + bool isLocalTableModification, Const *partitionKeyValue, int colocationId); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, @@ -1940,17 +1940,6 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, if (originalQuery->commandType == CMD_SELECT) { - Datum partitionColumnValue; - Oid partitionColumnType = 0; - char *partitionColumnString = NULL; - if (job->partitionKeyValue != NULL) - { - partitionColumnValue = job->partitionKeyValue->constvalue; - partitionColumnType = job->partitionKeyValue->consttype; - partitionColumnString = DatumToString(partitionColumnValue, - partitionColumnType); - } - SetJobColocationId(job); job->taskList = SingleShardTaskList(originalQuery, job->jobId, @@ -1958,7 +1947,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, shardId, job->parametersInJobQueryResolved, isLocalTableModification, - partitionColumnString, 
job->colocationId); + job->partitionKeyValue, job->colocationId); /* * Queries to reference tables, or distributed tables with multiple replica's have @@ -1982,17 +1971,6 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, } else { - Datum partitionColumnValue; - Oid partitionColumnType = 0; - char *partitionColumnString = NULL; - if (job->partitionKeyValue != NULL) - { - partitionColumnValue = job->partitionKeyValue->constvalue; - partitionColumnType = job->partitionKeyValue->consttype; - partitionColumnString = DatumToString(partitionColumnValue, - partitionColumnType); - } - SetJobColocationId(job); job->taskList = SingleShardTaskList(originalQuery, job->jobId, @@ -2000,7 +1978,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, shardId, job->parametersInJobQueryResolved, isLocalTableModification, - partitionColumnString, job->colocationId); + job->partitionKeyValue, job->colocationId); } } @@ -2094,7 +2072,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification, char *partitionColumn, + bool isLocalTableModification, Const *partitionKeyValue, int colocationId) { TaskType taskType = READ_TASK; @@ -2165,7 +2143,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, * that the query cannot be executed locally. 
*/ task->taskPlacementList = placementList; - task->partitionColumn = partitionColumn; + task->partitionKeyValue = partitionKeyValue; task->colocationId = colocationId; SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ebe162d23..baf9283b9 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2386,8 +2386,10 @@ RegisterCitusConfigVariables(void) DefineCustomEnumVariable( "citus.stat_tenants_track", - gettext_noop("enable disable"), - NULL, + gettext_noop("Enables/Disables the stats collection for citus_stat_tenants."), + gettext_noop("Enables the stats collection when set to 'all'. " + "Disables when set to 'none'. Disabling can be useful for " + "avoiding extra CPU cycles needed for the calculations."), &StatTenantsTrack, STAT_TENANTS_TRACK_ALL, stat_tenants_track_options, @@ -2397,7 +2399,7 @@ RegisterCitusConfigVariables(void) DefineCustomIntVariable( "citus.stats_tenants_limit", - gettext_noop("monitor limit"), + gettext_noop("Number of tenants to be shown in citus_stat_tenants."), NULL, &CitusStatsTenantsLimit, 10, 1, 100, @@ -2407,7 +2409,8 @@ RegisterCitusConfigVariables(void) DefineCustomIntVariable( "citus.stats_tenants_period", - gettext_noop("monitor period"), + gettext_noop("Period in seconds to be used for calculating the tenant " + "statistics in citus_stat_tenants."), NULL, &CitusStatsTenantsPeriod, 60, 1, 1000000000, diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 7b2ffdb5b..1384617c0 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -12,3 +12,6 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_ #include "udfs/citus_internal_mark_node_not_synced/11.3-1.sql" 
#include "udfs/citus_stats_tenants_local/11.3-1.sql" #include "udfs/citus_stats_tenants/11.3-1.sql" + +#include "udfs/citus_stats_tenants_local_reset/11.3-1.sql" +#include "udfs/citus_stats_tenants_reset/11.3-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 35565bff1..64e75a4db 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -26,3 +26,6 @@ DROP FUNCTION pg_catalog.citus_stats_tenants_local(boolean); DROP VIEW pg_catalog.citus_stats_tenants; DROP FUNCTION pg_catalog.citus_stats_tenants(boolean); + +DROP FUNCTION pg_catalog.citus_stats_tenants_local_reset(); +DROP FUNCTION pg_catalog.citus_stats_tenants_reset(); diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/11.3-1.sql new file mode 100644 index 000000000..99c39d4c2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/11.3-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_local_reset() + RETURNS VOID + LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_stats_tenants_local_reset$$; + +COMMENT ON FUNCTION pg_catalog.citus_stats_tenants_local_reset() + IS 'resets the local tenant statistics'; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/latest.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/latest.sql new file mode 100644 index 000000000..99c39d4c2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants_local_reset/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_local_reset() + RETURNS VOID + LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_stats_tenants_local_reset$$; + +COMMENT ON FUNCTION pg_catalog.citus_stats_tenants_local_reset() 
+ IS 'resets the local tenant statistics'; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_reset/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_reset/11.3-1.sql new file mode 100644 index 000000000..9e0fade87 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants_reset/11.3-1.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_reset() + RETURNS VOID + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM run_command_on_all_nodes($$SELECT citus_stats_tenants_local_reset()$$); +END; +$function$; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_reset/latest.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_reset/latest.sql new file mode 100644 index 000000000..9e0fade87 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants_reset/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_reset() + RETURNS VOID + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM run_command_on_all_nodes($$SELECT citus_stats_tenants_local_reset()$$); +END; +$function$; diff --git a/src/backend/distributed/test/citus_stat_tenants.c b/src/backend/distributed/test/citus_stat_tenants.c new file mode 100644 index 000000000..6ef425908 --- /dev/null +++ b/src/backend/distributed/test/citus_stat_tenants.c @@ -0,0 +1,38 @@ +/*------------------------------------------------------------------------- + * + * citus_stat_tenants.c + * + * This file contains functions to test citus_stats_tenants. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" + +#include "distributed/utils/attribute.h" +#include "sys/time.h" + +PG_FUNCTION_INFO_V1(sleep_until_next_period); + +/* + * sleep_until_next_period sleeps until the next monitoring period starts. 
+ */ +Datum +sleep_until_next_period(PG_FUNCTION_ARGS) +{ + struct timeval currentTime; + gettimeofday(&currentTime, NULL); + + long int nextPeriodStart = currentTime.tv_sec - + (currentTime.tv_sec % CitusStatsTenantsPeriod) + + CitusStatsTenantsPeriod; + + long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 - + currentTime.tv_usec + 100000; + pg_usleep(sleepTime); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 250039f31..cdb274129 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -23,8 +23,9 @@ #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/shmem.h" -#include +#include "sys/time.h" #include "utils/builtins.h" +#include "utils/datetime.h" #include "utils/json.h" #include "distributed/utils/attribute.h" @@ -54,15 +55,15 @@ char *monitorTrancheName = "Multi Tenant Monitor Tranche"; static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static int CompareTenantScore(const void *leftElement, const void *rightElement); -static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime); -static void ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime); -static void EvictTenantsIfNecessary(time_t queryTime); +static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime); +static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime); +static void EvictTenantsIfNecessary(TimestampTz queryTime); static void RecordTenantStats(TenantStats *tenantStats); static void CreateMultiTenantMonitor(void); static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void); static MultiTenantMonitor * GetMultiTenantMonitor(void); static void MultiTenantMonitorSMInit(void); -static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime); +static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz 
queryTime); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); static char * ExtractTopComment(const char *inputString); @@ -76,8 +77,7 @@ int StatTenantsTrack = STAT_TENANTS_TRACK_ALL; PG_FUNCTION_INFO_V1(citus_stats_tenants_local); -PG_FUNCTION_INFO_V1(clean_citus_stats_tenants); -PG_FUNCTION_INFO_V1(sleep_until_next_period); +PG_FUNCTION_INFO_V1(citus_stats_tenants_local_reset); /* @@ -97,7 +97,7 @@ citus_stats_tenants_local(PG_FUNCTION_ARGS) TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); - time_t monitoringTime = time(0); + TimestampTz monitoringTime = GetCurrentTimestamp(); Datum values[CITUS_STATS_TENANTS_COLUMNS]; bool isNulls[CITUS_STATS_TENANTS_COLUMNS]; @@ -156,10 +156,11 @@ citus_stats_tenants_local(PG_FUNCTION_ARGS) /* - * clean_citus_stats_tenants cleans the citus_stats_tenants monitor. + * citus_stats_tenants_local_reset resets monitor for tenant statistics + * on the local node. */ Datum -clean_citus_stats_tenants(PG_FUNCTION_ARGS) +citus_stats_tenants_local_reset(PG_FUNCTION_ARGS) { MultiTenantMonitor *monitor = GetMultiTenantMonitor(); monitor->tenantCount = 0; @@ -168,27 +169,6 @@ clean_citus_stats_tenants(PG_FUNCTION_ARGS) } -/* - * sleep_until_next_period sleeps until the next monitoring period starts. - */ -Datum -sleep_until_next_period(PG_FUNCTION_ARGS) -{ - struct timeval currentTime; - gettimeofday(&currentTime, NULL); - - long int nextPeriodStart = currentTime.tv_sec - - (currentTime.tv_sec % CitusStatsTenantsPeriod) + - CitusStatsTenantsPeriod; - - long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 - - currentTime.tv_usec + 100000; - pg_usleep(sleepTime); - - PG_RETURN_VOID(); -} - - /* * AttributeQueryIfAnnotated checks the query annotation and if the query is annotated * for the tenant statistics monitoring this function records the tenant attributes. 
@@ -254,14 +234,17 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType) * AnnotateQuery annotates the query with tenant attributes. */ char * -AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) +AnnotateQuery(char *queryString, Const *partitionKeyValue, int colocationId) { - if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || partitionColumn == NULL) + if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || partitionKeyValue == NULL) { return queryString; } - char *commentCharsEscaped = EscapeCommentChars(partitionColumn); + char *partitionKeyValueString = DatumToString(partitionKeyValue->constvalue, + partitionKeyValue->consttype); + + char *commentCharsEscaped = EscapeCommentChars(partitionKeyValueString); StringInfo escapedSourceName = makeStringInfo(); escape_json(escapedSourceName, commentCharsEscaped); @@ -329,21 +312,61 @@ CompareTenantScore(const void *leftElement, const void *rightElement) static void AttributeMetricsIfApplicable() { - if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE) + if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || + attributeToTenant[0] == '\0') { return; } - if (strcmp(attributeToTenant, "") != 0) - { - time_t queryTime = time(0); + TimestampTz queryTime = GetCurrentTimestamp(); - MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + /* + * We need to acquire the monitor lock in shared mode to check if the tenant is + * already in the monitor. If it is not, we need to acquire the lock in + * exclusive mode to add the tenant to the monitor. + * + * We need to check again if the tenant is in the monitor after acquiring the + * exclusive lock to avoid adding the tenant twice. Some other backend might + * have added the tenant while we were waiting for the lock. + * + * After releasing the exclusive lock, we need to acquire the lock in shared + * mode to update the tenant's statistics. 
We need to check again if the tenant + * is in the monitor after acquiring the shared lock because some other backend + * might have removed the tenant while we were waiting for the lock. + */ + LWLockAcquire(&monitor->lock, LW_SHARED); + + int currentTenantIndex = FindTenantStats(monitor); + + if (currentTenantIndex != -1) + { + TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; + LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); + + UpdatePeriodsIfNecessary(tenantStats, queryTime); + ReduceScoreIfNecessary(tenantStats, queryTime); + RecordTenantStats(tenantStats); + + LWLockRelease(&tenantStats->lock); + } + else + { + LWLockRelease(&monitor->lock); + + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + currentTenantIndex = FindTenantStats(monitor); + + if (currentTenantIndex == -1) + { + currentTenantIndex = CreateTenantStats(monitor, queryTime); + } + + LWLockRelease(&monitor->lock); LWLockAcquire(&monitor->lock, LW_SHARED); - - int currentTenantIndex = FindTenantStats(monitor); - + currentTenantIndex = FindTenantStats(monitor); if (currentTenantIndex != -1) { TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; @@ -355,36 +378,8 @@ AttributeMetricsIfApplicable() LWLockRelease(&tenantStats->lock); } - else - { - LWLockRelease(&monitor->lock); - - LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); - currentTenantIndex = FindTenantStats(monitor); - - if (currentTenantIndex == -1) - { - currentTenantIndex = CreateTenantStats(monitor, queryTime); - } - - LWLockRelease(&monitor->lock); - - LWLockAcquire(&monitor->lock, LW_SHARED); - currentTenantIndex = FindTenantStats(monitor); - if (currentTenantIndex != -1) - { - TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; - LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); - - UpdatePeriodsIfNecessary(tenantStats, queryTime); - ReduceScoreIfNecessary(tenantStats, queryTime); - RecordTenantStats(tenantStats); - - LWLockRelease(&tenantStats->lock); - } - } - LWLockRelease(&monitor->lock); } + 
LWLockRelease(&monitor->lock); strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); } @@ -400,9 +395,10 @@ AttributeMetricsIfApplicable() * statistics. */ static void -UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime) +UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime) { - time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod); + long long int periodInMicroSeconds = CitusStatsTenantsPeriod * USECS_PER_SEC; + TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds); /* * If the last query in this tenant was before the start of current period @@ -421,7 +417,8 @@ UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime) /* * If the last query is more than two periods ago, we clean the last period counts too. */ - if (tenantStats->lastQueryTime < periodStart - CitusStatsTenantsPeriod) + if (TimestampDifferenceExceeds(tenantStats->lastQueryTime, periodStart, + periodInMicroSeconds)) { tenantStats->writesInLastPeriod = 0; @@ -439,9 +436,10 @@ UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime) * periods that passed after the lsat score reduction and reduces the score accordingly. */ static void -ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime) +ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime) { - time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod); + long long int periodInMicroSeconds = CitusStatsTenantsPeriod * USECS_PER_SEC; + TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds); /* * With each query we increase the score of tenant by ONE_QUERY_SCORE. 
@@ -453,8 +451,8 @@ ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime) */ int periodCountAfterLastScoreReduction = (periodStart - tenantStats->lastScoreReduction + - CitusStatsTenantsPeriod - 1) / - CitusStatsTenantsPeriod; + periodInMicroSeconds - 1) / + periodInMicroSeconds; /* * This should not happen but let's make sure @@ -480,7 +478,7 @@ ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime) * equal to 3 * CitusStatsTenantsLimit. */ static void -EvictTenantsIfNecessary(time_t queryTime) +EvictTenantsIfNecessary(TimestampTz queryTime) { MultiTenantMonitor *monitor = GetMultiTenantMonitor(); @@ -618,9 +616,11 @@ MultiTenantMonitorSMInit() /* * CreateTenantStats creates the data structure for a tenant's statistics. + * + * Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode. */ static int -CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime) +CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime) { /* * If the tenant count reached 3 * CitusStatsTenantsLimit, we evict the tenants diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 49fe28f1d..ea5d15c83 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -331,7 +331,7 @@ typedef struct Task */ bool cannotBeExecutedInTransction; - char *partitionColumn; + Const *partitionKeyValue; int colocationId; } Task; diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h index 77d7b08a3..c9e357439 100644 --- a/src/include/distributed/utils/attribute.h +++ b/src/include/distributed/utils/attribute.h @@ -14,6 +14,7 @@ #include "executor/execdesc.h" #include "executor/executor.h" #include "storage/lwlock.h" +#include "utils/datetime.h" #define MAX_TENANT_ATTRIBUTE_LENGTH 100 @@ -44,7 +45,7 @@ typedef struct TenantStats /* * The latest time this tenant ran a query. 
This value is used to update the score later. */ - time_t lastQueryTime; + TimestampTz lastQueryTime; /* * The tenant monitoring score of this tenant. This value is increased by ONE_QUERY_SCORE at every query @@ -55,7 +56,7 @@ typedef struct TenantStats /* * The latest time the score of this tenant is halved. This value is used to correctly calculate the reduction later. */ - time_t lastScoreReduction; + TimestampTz lastScoreReduction; /* * Locks needed to update this tenant's statistics. @@ -95,7 +96,8 @@ typedef enum extern void CitusAttributeToEnd(QueryDesc *queryDesc); extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType); -extern char * AnnotateQuery(char *queryString, char *partitionColumn, int colocationId); +extern char * AnnotateQuery(char *queryString, Const *partitionKeyValue, + int colocationId); extern void InitializeMultiTenantMonitorSMHandleManagement(void); extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType); diff --git a/src/test/regress/expected/citus_stats_tenants.out b/src/test/regress/expected/citus_stats_tenants.out index e1bcacc55..51b3d38af 100644 --- a/src/test/regress/expected/citus_stats_tenants.out +++ b/src/test/regress/expected/citus_stats_tenants.out @@ -2,21 +2,15 @@ CREATE SCHEMA citus_stats_tenants; SET search_path TO citus_stats_tenants; SET citus.next_shard_id TO 5797500; SET citus.shard_replication_factor TO 1; -CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants() -RETURNS VOID -LANGUAGE C -AS 'citus', $$clean_citus_stats_tenants$$; CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period() RETURNS VOID LANGUAGE C AS 'citus', $$sleep_until_next_period$$; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); - result +SELECT citus_stats_tenants_reset(); + citus_stats_tenants_reset --------------------------------------------------------------------- - - -(3 rows) +(1 row) -- set period to a high number to prevent stats 
from being reset SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000'); @@ -78,13 +72,11 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q 5 | 0 | 0 | 1 | 0 (5 rows) -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); - result +SELECT citus_stats_tenants_reset(); + citus_stats_tenants_reset --------------------------------------------------------------------- - - -(3 rows) +(1 row) -- queries with multiple tenants should not be counted SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5); @@ -226,13 +218,11 @@ SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tena (7 rows) -- test period passing -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); - result +SELECT citus_stats_tenants_reset(); + citus_stats_tenants_reset --------------------------------------------------------------------- - - -(3 rows) +(1 row) SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; ?column? 
@@ -264,14 +254,6 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q 5 | 0 | 0 | 0 | 1 (2 rows) -\c - - - :worker_2_port -SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; - tenant_attribute | query_count_in_this_period | score ---------------------------------------------------------------------- - 1 | 0 | 500000000 - 5 | 0 | 500000000 -(2 rows) - \c - - - :master_port SET search_path TO citus_stats_tenants; -- test logs @@ -315,13 +297,11 @@ CONTEXT: PL/pgSQL function citus_stats_tenants(boolean) line XX at RAISE (1 row) RESET client_min_messages; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); - result +SELECT citus_stats_tenants_reset(); + citus_stats_tenants_reset --------------------------------------------------------------------- - - -(3 rows) +(1 row) -- test turning monitoring on/off SET citus.stat_tenants_track TO "NONE"; @@ -357,13 +337,11 @@ SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants; (1 row) -- test special and multibyte characters in tenant attribute -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); - result +SELECT citus_stats_tenants_reset(); + citus_stats_tenants_reset --------------------------------------------------------------------- - - -(3 rows) +(1 row) TRUNCATE TABLE dist_tbl_text; SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; @@ -459,12 +437,11 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q bcde*/ | 1 | 0 | 1 | 0 (10 rows) -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); - result +SELECT citus_stats_tenants_reset(); + citus_stats_tenants_reset --------------------------------------------------------------------- - -(2 rows) +(1 row) -- test local queries -- all of these distribution column values are from second worker @@ -672,13 +649,11 @@ SELECT 
tenant_attribute, read_count_in_this_period, read_count_in_last_period, q \c - - - :master_port SET search_path TO citus_stats_tenants; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); - result +SELECT citus_stats_tenants_reset(); + citus_stats_tenants_reset --------------------------------------------------------------------- - - -(3 rows) +(1 row) SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; ?column? diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index ccefd0098..89ec3f244 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1368,11 +1368,13 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal_stop_replication_origin_tracking() void | function citus_stats_tenants(boolean) SETOF record | function citus_stats_tenants_local(boolean) SETOF record + | function citus_stats_tenants_local_reset() void + | function citus_stats_tenants_reset() void | function worker_adjust_identity_column_seq_ranges(regclass) void | function worker_drop_all_shell_tables(boolean) | view citus_stats_tenants | view citus_stats_tenants_local -(10 rows) +(12 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 45a6df2e6..89f1fe132 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -127,6 +127,8 @@ ORDER BY 1; function citus_stat_statements_reset() function citus_stats_tenants(boolean) function citus_stats_tenants_local(boolean) + function citus_stats_tenants_local_reset() + function 
citus_stats_tenants_reset() function citus_table_is_visible(oid) function citus_table_size(regclass) function citus_task_wait(bigint,citus_task_status) @@ -328,5 +330,5 @@ ORDER BY 1; view citus_stats_tenants_local view pg_dist_shard_placement view time_partitions -(320 rows) +(322 rows) diff --git a/src/test/regress/sql/citus_stats_tenants.sql b/src/test/regress/sql/citus_stats_tenants.sql index e0fb5bad1..6641653b5 100644 --- a/src/test/regress/sql/citus_stats_tenants.sql +++ b/src/test/regress/sql/citus_stats_tenants.sql @@ -3,17 +3,12 @@ SET search_path TO citus_stats_tenants; SET citus.next_shard_id TO 5797500; SET citus.shard_replication_factor TO 1; -CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants() -RETURNS VOID -LANGUAGE C -AS 'citus', $$clean_citus_stats_tenants$$; - CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period() RETURNS VOID LANGUAGE C AS 'citus', $$sleep_until_next_period$$; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT citus_stats_tenants_reset(); -- set period to a high number to prevent stats from being reset SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000'); @@ -39,7 +34,7 @@ DELETE FROM dist_tbl WHERE a = 5; SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants(true) ORDER BY tenant_attribute; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT citus_stats_tenants_reset(); -- queries with multiple tenants should not be counted SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5); @@ -80,7 +75,7 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg'; SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute; -- test period passing -SELECT result FROM 
run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT citus_stats_tenants_reset(); SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; INSERT INTO dist_tbl VALUES (5, 'abcd'); @@ -93,8 +88,6 @@ SET citus.stats_tenants_period TO 2; SELECT sleep_until_next_period(); SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; -\c - - - :worker_2_port -SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; \c - - - :master_port SET search_path TO citus_stats_tenants; @@ -112,7 +105,7 @@ SET citus.multi_tenant_monitoring_log_level TO DEBUG; SELECT count(*)>=0 FROM citus_stats_tenants; RESET client_min_messages; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT citus_stats_tenants_reset(); -- test turning monitoring on/off SET citus.stat_tenants_track TO "NONE"; @@ -131,7 +124,7 @@ INSERT INTO dist_tbl VALUES (1, 1); SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants; -- test special and multibyte characters in tenant attribute -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT citus_stats_tenants_reset(); TRUNCATE TABLE dist_tbl_text; SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; @@ -152,7 +145,7 @@ SET search_path TO citus_stats_tenants; SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT citus_stats_tenants_reset(); -- test local queries -- all of these distribution column values are from second worker @@ -209,7 +202,7 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q \c - - - :master_port SET 
search_path TO citus_stats_tenants; -SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT citus_stats_tenants_reset(); SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;