diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 5e4afd1a7..28486f23d 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -66,7 +66,6 @@ static DistributedPlan * CopyDistributedPlanWithoutCache( DistributedPlan *originalDistributedPlan); static void CitusEndScan(CustomScanState *node); static void CitusReScan(CustomScanState *node); -static void SetJobColocationId(Job *job); static void EnsureForceDelegationDistributionKey(Job *job); static void EnsureAnchorShardsInJobExist(Job *job); static bool AnchorShardsInTaskListExist(List *taskList); @@ -892,7 +891,7 @@ IsCitusCustomScan(Plan *plan) * colocation group, the Job's colocation ID is set to the group ID, else, * it will be set to INVALID_COLOCATION_ID. */ -static void +void SetJobColocationId(Job *job) { uint32 jobColocationId = INVALID_COLOCATION_ID; diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index e62821ad0..22cf4415c 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -26,6 +26,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/shard_utils.h" +#include "distributed/utils/attribute.h" #include "distributed/version_compat.h" #include "lib/stringinfo.h" #include "nodes/makefuncs.h" @@ -141,6 +142,20 @@ RebuildQueryStrings(Job *workerJob) ? "(null)" : TaskQueryString(task)))); + Datum partitionColumnValue; + Oid partitionColumnType = 0; + char *partitionColumnString = NULL; + if (workerJob->partitionKeyValue != NULL) + { + partitionColumnValue = workerJob->partitionKeyValue->constvalue; + partitionColumnType = workerJob->partitionKeyValue->consttype; + partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType); + } + + task->partitionColumn = partitionColumnString; + SetJobColocationId(workerJob); + task->colocationId = workerJob->colocationId; + UpdateTaskQueryString(query, task); /* @@ -387,7 +402,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) return; } - SetTaskQueryString(task, DeparseTaskQuery(task, query)); + SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), task->partitionColumn, task->colocationId)); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 8c09b78eb..e069ec417 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -146,7 +146,7 @@ distributed_planner(Query *parse, bool fastPathRouterQuery = false; Node *distributionKeyValue = NULL; - AttributeQueryIfAnnotated(query_string); + AttributeQueryIfAnnotated(query_string, parse->commandType); List *rangeTableList = ExtractRangeTableEntryList(parse); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 5fcb4dfea..cf87760d2 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -170,7 +170,7 @@ static int CompareInsertValuesByShardId(const void *leftElement, static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification); + bool isLocalTableModification, char * partitionColumn, int colocationId); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType @@ -1954,11 +1954,23 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, if (originalQuery->commandType == CMD_SELECT) { + Datum partitionColumnValue; + Oid partitionColumnType = 0; + char *partitionColumnString = NULL; + if (job->partitionKeyValue != NULL) + { + partitionColumnValue = job->partitionKeyValue->constvalue; + partitionColumnType = job->partitionKeyValue->consttype; + partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType); + } + + SetJobColocationId(job); + job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification); + isLocalTableModification, partitionColumnString, job->colocationId); /* * Queries to reference tables, or distributed tables with multiple replica's have @@ -1986,7 +1998,7 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification); + isLocalTableModification, "", -1); } } @@ -2080,7 +2092,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification) + bool isLocalTableModification, char * partitionColumn, int colocationId) { TaskType taskType = READ_TASK; char replicationModel = 0; @@ -2150,6 +2162,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, * that the query cannot be executed locally. */ task->taskPlacementList = placementList; + task->partitionColumn = partitionColumn; + task->colocationId = colocationId; SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 790a3b5ef..82fa9a6a3 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -475,6 +475,8 @@ _PG_init(void) /* initialize shard split shared memory handle management */ InitializeShardSplitSMHandleManagement(); + InitializeMultiTenantMonitorSMHandleManagement(); + /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -1901,6 +1903,16 @@ RegisterCitusConfigVariables(void) PGC_USERSET, GUC_STANDARD, NULL, NULL, NULL); + + DefineCustomEnumVariable( + "citus.multi_tenant_monitoring_log_level", + gettext_noop("Sets the level of multi tenant monitoring log messages"), + NULL, + &MultiTenantMonitoringLogLevel, + CITUS_LOG_LEVEL_OFF, log_level_options, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); DefineCustomIntVariable( "citus.next_cleanup_record_id", @@ -2286,6 +2298,26 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.stats_tenants_limit", + gettext_noop("monitor limit"), + NULL, + &CitusStatsTenantsLimit, + 10, 1, 100, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.stats_tenants_period", + gettext_noop("monitor period"), + NULL, + &CitusStatsTenantsPeriod, + 60, 1, 60 * 60, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.subquery_pushdown", gettext_noop("Usage of this GUC is highly discouraged, please read the long " diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 981c5f375..6fbeede70 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -2,3 +2,5 @@ -- bump version to 11.3-1 +#include "udfs/citus_stats_tenants/11.3-1.sql" +#include "udfs/citus_stats_tenants_storage/11.3-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants/11.3-1.sql new file mode 100644 index 000000000..f476a9c28 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants/11.3-1.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT) +RETURNS SETOF RECORD +LANGUAGE C +AS 'citus', $$citus_stats_tenants$$; + + +CREATE OR REPLACE VIEW citus.citus_stats_tenants AS +SELECT + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stats_tenants() +ORDER BY score DESC; + +ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants/latest.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants/latest.sql new file mode 100644 index 000000000..f476a9c28 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants/latest.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT) +RETURNS SETOF RECORD +LANGUAGE C +AS 'citus', $$citus_stats_tenants$$; + + +CREATE OR REPLACE VIEW citus.citus_stats_tenants AS +SELECT + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stats_tenants() +ORDER BY score DESC; + +ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/11.3-1.sql new file mode 100644 index 000000000..c1604a7a1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/11.3-1.sql @@ -0,0 +1,28 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage ( + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT storage_estimate INT +) +RETURNS SETOF record +LANGUAGE plpgsql +AS $function$ +DECLARE +tn TEXT; +dc TEXT; +ci INT; +BEGIN + FOR ci, tn, dc IN SELECT cts.colocation_id, cts.table_name, cts.distribution_column FROM citus_tables cts + LOOP + RETURN QUERY + EXECUTE 'SELECT ' || ci || '::int, ' || dc || '::text, sum(pg_column_size(' || tn || '.*))::int FROM ' || tn || ' GROUP BY ' || dc; + END LOOP; +END; +$function$; + +CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS +SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage() +GROUP BY colocation_id, tenant_attribute +ORDER BY total_storage DESC; + +ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/latest.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/latest.sql new file mode 100644 index 000000000..c1604a7a1 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants_storage/latest.sql @@ -0,0 +1,28 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage ( + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT storage_estimate INT +) +RETURNS SETOF record +LANGUAGE plpgsql +AS $function$ +DECLARE +tn TEXT; +dc TEXT; +ci INT; +BEGIN + FOR ci, tn, dc IN SELECT cts.colocation_id, cts.table_name, cts.distribution_column FROM citus_tables cts + LOOP + RETURN QUERY + EXECUTE 'SELECT ' || ci || '::int, ' || dc || '::text, sum(pg_column_size(' || tn || '.*))::int FROM ' || tn || ' GROUP BY ' || dc; + END LOOP; +END; +$function$; + +CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS +SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage() +GROUP BY colocation_id, tenant_attribute +ORDER BY total_storage DESC; + +ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC; diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index b3485dde2..b5471a3d8 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -1,10 +1,22 @@ -// -// Created by Nils Dijk on 02/12/2022. -// +/*------------------------------------------------------------------------- + * + * attribute.c + * Routines related to the multi tenant monitor. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ #include "postgres.h" +#include "unistd.h" +#include "distributed/log_utils.h" +#include "distributed/listutils.h" +#include "distributed/tuplestore.h" #include "executor/execdesc.h" +#include "storage/ipc.h" +#include "storage/shmem.h" #include "utils/builtins.h" #include "distributed/utils/attribute.h" @@ -16,14 +28,125 @@ static void AttributeMetricsIfApplicable(void); ExecutorEnd_hook_type prev_ExecutorEnd = NULL; #define ATTRIBUTE_PREFIX "/* attributeTo: " +#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */" +#define CITUS_STATS_TENANTS_COLUMNS 7 +#define ONE_QUERY_SCORE 1000000000 /* TODO maybe needs to be a stack */ -const char *attributeToTenant = NULL; +char *attributeToTenant = NULL; +CmdType attributeCommandType = CMD_UNKNOWN; +int colocationGroupId = -1; clock_t attributeToTenantStart = { 0 }; -void -AttributeQueryIfAnnotated(const char *query_string) +const char *SharedMemoryNameForMultiTenantMonitorHandleManagement = + "Shared memory handle for multi tenant monitor"; + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats); +static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime); +static void CreateMultiTenantMonitor(void); +static dsm_handle CreateSharedMemoryForMultiTenantMonitor(void); +static void StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle); +static MultiTenantMonitor * GetMultiTenantMonitor(void); +static dsm_handle GetMultiTenantMonitorDSMHandle(void); +static void DetachSegment(void); +static void MultiTenantMonitorSMInit(void); +static dsm_handle CreateTenantStats(MultiTenantMonitor *monitor); +static dsm_handle CreateSharedMemoryForTenantStats(void); +static TenantStats * GetTenantStatsFromDSMHandle(dsm_handle dsmHandle); +static dsm_handle FindTenantStats(MultiTenantMonitor *monitor); + +int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; +int CitusStatsTenantsPeriod = (time_t) 60; +int CitusStatsTenantsLimit = 10; + + +PG_FUNCTION_INFO_V1(citus_stats_tenants); + + +/* + * citus_stats_tenants finds, updates and returns the statistics for tenants. + */ +Datum +citus_stats_tenants(PG_FUNCTION_ARGS) { + //CheckCitusVersion(ERROR); + + /* + * We keep more than CitusStatsTenantsLimit tenants in our monitor. + * We do this to not lose data if a tenant falls out of top CitusStatsTenantsLimit in case they need to return soon. + * Normally we return CitusStatsTenantsLimit tenants but if returnAllTenants is true we return all of them. + */ + bool returnAllTenants = PG_GETARG_BOOL(0); + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + time_t monitoringTime = time(0); + + Datum values[CITUS_STATS_TENANTS_COLUMNS]; + bool isNulls[CITUS_STATS_TENANTS_COLUMNS]; + + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + if (monitor == NULL) + { + PG_RETURN_VOID(); + } + + monitor->periodStart = monitor->periodStart + ((monitoringTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; + + int numberOfRowsToReturn = 0; + if (returnAllTenants) + { + numberOfRowsToReturn = monitor->tenantCount; + } + else + { + numberOfRowsToReturn = min (monitor->tenantCount, CitusStatsTenantsLimit); + } + + for (int i=0; itenants[i]); + + UpdatePeriodsIfNecessary(monitor, tenantStats); + ReduceScoreIfNecessary(monitor, tenantStats, monitoringTime); + + values[0] = Int32GetDatum(tenantStats->colocationGroupId); + values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute)); + values[2] = Int32GetDatum(tenantStats->selectsInThisPeriod); + values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod); + values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + tenantStats->insertsInThisPeriod); + values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + tenantStats->insertsInLastPeriod); + values[6] = Int64GetDatum(monitor->scores[tenantStats->rank]); + + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + } + + PG_RETURN_VOID(); +} + + +/* + * AttributeQueryIfAnnotated assigns the attributes of tenant if the query is annotated. + */ +void +AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) +{ + attributeToTenant = NULL; + + attributeCommandType = commandType; + + if (query_string == NULL) + { + return; + } + if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) { /* TODO create a function to safely parse the tenant identifier from the query comment */ @@ -39,6 +162,15 @@ AttributeQueryIfAnnotated(const char *query_string) tenantEnd++; } + tenantEnd--; + + colocationGroupId = 0; + while(*tenantEnd != ',') + { + colocationGroupId *= 10; + colocationGroupId += *tenantEnd - '0'; + tenantEnd--; + } /* hack to get a clean copy of the tenant id string */ char tenantEndTmp = *tenantEnd; @@ -46,19 +178,44 @@ AttributeQueryIfAnnotated(const char *query_string) tenantId = pstrdup(tenantId); *tenantEnd = tenantEndTmp; - ereport(NOTICE, (errmsg("attributing query to tenant: %s", quote_literal_cstr(tenantId)))); + if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) + { + ereport(NOTICE, (errmsg("attributing query to tenant: %s", quote_literal_cstr(tenantId)))); + } - attributeToTenant = tenantId; + attributeToTenant=(char *)malloc(strlen(tenantId)); + strcpy(attributeToTenant, tenantId); } else { Assert(attributeToTenant == NULL); } + //DetachSegment(); + attributeToTenantStart = clock(); } +/* + * AnnotateQuery annotates the query with tenant attributes. + */ +char * +AnnotateQuery (char * queryString, char * partitionColumn, int colocationId) +{ + if (partitionColumn == NULL) + { + return queryString; + } + StringInfo newQuery = makeStringInfo(); + appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId); + + appendStringInfoString(newQuery, queryString); + + return newQuery->data; +} + + void CitusAttributeToEnd(QueryDesc *queryDesc) { @@ -80,6 +237,9 @@ CitusAttributeToEnd(QueryDesc *queryDesc) } +/* + * AttributeMetricsIfApplicable updates the metrics for current tenant's statistics + */ static void AttributeMetricsIfApplicable() { @@ -89,10 +249,369 @@ AttributeMetricsIfApplicable() double cpu_time_used = 0; end = clock(); + time_t queryTime = time(0); cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC; - ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", cpu_time_used, - attributeToTenant))); + if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) + { + ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", cpu_time_used, + attributeToTenant))); + } + + if (GetMultiTenantMonitorDSMHandle() == DSM_HANDLE_INVALID) + { + CreateMultiTenantMonitor(); + } + + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + monitor->periodStart = monitor->periodStart + ((queryTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; + + dsm_handle tenantDSMHandle = FindTenantStats(monitor); + + if (tenantDSMHandle == DSM_HANDLE_INVALID) + { + tenantDSMHandle = CreateTenantStats(monitor); + } + TenantStats * tenantStats = GetTenantStatsFromDSMHandle(tenantDSMHandle); + strcpy(tenantStats->tenantAttribute, attributeToTenant); + tenantStats->colocationGroupId = colocationGroupId; + + UpdatePeriodsIfNecessary(monitor, tenantStats); + tenantStats->lastQueryTime = queryTime; + + ReduceScoreIfNecessary(monitor, tenantStats, queryTime); + + /* + * We do this after the reducing the scores so the scores in this period are not affected by the reduction. + */ + monitor->scores[tenantStats->rank] += ONE_QUERY_SCORE; + + + /* + * After updating the score we might need to change the rank of the tenant in the monitor + */ + while(tenantStats->rank != 0 && monitor->scores[tenantStats->rank-1] < monitor->scores[tenantStats->rank]) + { + // we need to reduce previous tenants score too !!!!!!!! + TenantStats *previousTenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[tenantStats->rank-1]); + + dsm_handle tempTenant = monitor->tenants[tenantStats->rank]; + monitor->tenants[tenantStats->rank] = monitor->tenants[previousTenantStats->rank]; + monitor->tenants[previousTenantStats->rank] = tempTenant; + + long long tempScore = monitor->scores[tenantStats->rank]; + monitor->scores[tenantStats->rank] = monitor->scores[previousTenantStats->rank]; + monitor->scores[previousTenantStats->rank] = tempScore; + + previousTenantStats->rank++; + tenantStats->rank--; + } + + /* + * We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit, + * so we don't lose data immediately after a tenant is out of top CitusStatsTenantsLimit + * + * Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2. + */ + if (monitor->tenantCount >= CitusStatsTenantsLimit * 3) + { + monitor->tenantCount = CitusStatsTenantsLimit * 2; + } + + if (attributeCommandType == CMD_SELECT) + { + tenantStats->selectCount++; + tenantStats->selectsInThisPeriod++; + tenantStats->totalSelectTime+=cpu_time_used; + } + else if (attributeCommandType == CMD_INSERT) + { + tenantStats->insertCount++; + tenantStats->insertsInThisPeriod++; + tenantStats->totalInsertTime+=cpu_time_used; + } + + if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) + { + ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", tenantStats->selectCount, tenantStats->totalSelectTime, + tenantStats->tenantAttribute))); + } } attributeToTenant = NULL; } + + +/* + * UpdatePeriodsIfNecessary moves the query counts to previous periods if a enough time has passed. + * + * If 1 period has passed after the latest query, this function moves this period's counts to the last period + * and cleans this period's statistics. + * + * If 2 or more periods has passed after the last query, this function cleans all both this and last period's + * statistics. + */ +static void +UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats) +{ + /* + * If the last query in this tenant was before the start of current period + * but there are some query count for this period we move them to the last period. + */ + if (tenantStats->lastQueryTime < monitor->periodStart && (tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod)) + { + tenantStats->insertsInLastPeriod = tenantStats->insertsInThisPeriod; + tenantStats->insertsInThisPeriod = 0; + + tenantStats->selectsInLastPeriod = tenantStats->selectsInThisPeriod; + tenantStats->selectsInThisPeriod = 0; + } + /* + * If the last query is more than two periods ago, we clean the last period counts too. + */ + if (tenantStats->lastQueryTime < monitor->periodStart - CitusStatsTenantsPeriod) + { + tenantStats->insertsInLastPeriod = 0; + + tenantStats->selectsInLastPeriod = 0; + } +} + + +/* + * ReduceScoreIfNecessary reduces the tenant score only if it is necessary. + * + * We halve the tenants' scores after each period. This function checks the number of + * periods that passed after the lsat score reduction and reduces the score accordingly. + */ +static void +ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime) +{ + /* + * With each query we increase the score of tenant by ONE_QUERY_SCORE. + * After one period we halve the scores. + * + * Here we calculate how many periods passed after the last time we did score reduction + * If the latest score reduction was in this period this number should be 0, + * if it was in the last period this number should be 1 and so on. + */ + int periodCountAfterLastScoreReduction = (monitor->periodStart - tenantStats->lastScoreReduction + CitusStatsTenantsPeriod -1) / CitusStatsTenantsPeriod; + + /* + * This should not happen but let's make sure + */ + if (periodCountAfterLastScoreReduction < 0) + { + periodCountAfterLastScoreReduction = 0; + } + + /* + * If the last score reduction was not in this period we do score reduction now. + */ + if (periodCountAfterLastScoreReduction > 0) + { + monitor->scores[tenantStats->rank] >>= periodCountAfterLastScoreReduction; + tenantStats->lastScoreReduction = updateTime; + } +} + + +/* + * CreateMultiTenantMonitor creates the data structure for multi tenant monitor. + */ +static void +CreateMultiTenantMonitor() +{ + dsm_handle dsmHandle = CreateSharedMemoryForMultiTenantMonitor(); + StoreMultiTenantMonitorSMHandle(dsmHandle); + MultiTenantMonitor * monitor = GetMultiTenantMonitor(); + monitor->tenantCount = 0; + monitor->periodStart = time(0); +} + + +/* + * CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor. + */ +static dsm_handle +CreateSharedMemoryForMultiTenantMonitor() +{ + struct dsm_segment *dsmSegment = dsm_create(sizeof(MultiTenantMonitor), DSM_CREATE_NULL_IF_MAXSEGMENTS); + dsm_pin_segment(dsmSegment); + dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!! + return dsm_segment_handle(dsmSegment); +} + +/* + * StoreMultiTenantMonitorSMHandle stores the dsm (dynamic shared memory) handle for multi tenant monitor + * in a non-dynamic shared memory location, so we don't lose it. + */ +static void +StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle) +{ + bool found = false; + MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, + sizeof(MultiTenantMonitorSMData), + &found); + + smData->dsmHandle = dsmHandle; +} + + +/* + * GetMultiTenantMonitor returns the data structure for multi tenant monitor. + */ +static MultiTenantMonitor * +GetMultiTenantMonitor() +{ + dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle(); + if (dsmHandle == DSM_HANDLE_INVALID) + { + return NULL; + } + dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); + if (dsmSegment == NULL) + { + dsmSegment = dsm_attach(dsmHandle); + } + MultiTenantMonitor *monitor = (MultiTenantMonitor *) dsm_segment_address(dsmSegment); + dsm_pin_mapping(dsmSegment); + return monitor; +} + +/* + * GetMultiTenantMonitorDSMHandle fetches the dsm (dynamic shared memory) handle for multi tenant monitor. + */ +static dsm_handle +GetMultiTenantMonitorDSMHandle() +{ + bool found = false; + MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, + sizeof(MultiTenantMonitorSMData), + &found); + + if (!found) + { + elog(WARNING, "dsm handle not found"); + return DSM_HANDLE_INVALID; + } + + dsm_handle dsmHandle = smData->dsmHandle; + + return dsmHandle; +} + + +static void +DetachSegment() +{ + dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle(); + dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); + if (dsmSegment != NULL) + { + dsm_detach(dsmSegment); + } +} + + +/* + * InitializeMultiTenantMonitorSMHandleManagement sets up the shared memory startup hook + * so that the multi tenant monitor can be initialized and stored in shared memory. + */ +void +InitializeMultiTenantMonitorSMHandleManagement() +{ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = MultiTenantMonitorSMInit; +} + + +/* + * MultiTenantMonitorSMInit initializes the shared memory for MultiTenantMonitorSMData. + * + * MultiTenantMonitorSMData only holds the dsm (dynamic shared memory) handle for the actual + * multi tenant monitor. + */ +static void +MultiTenantMonitorSMInit() +{ + bool alreadyInitialized = false; + MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, + sizeof(MultiTenantMonitorSMData), + &alreadyInitialized); + if (!alreadyInitialized) + { + smData->dsmHandle = DSM_HANDLE_INVALID; + } + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + +/* + * CreateTenantStats creates the data structure for a tenant's statistics. + */ +static dsm_handle +CreateTenantStats(MultiTenantMonitor *monitor) +{ + dsm_handle dsmHandle = CreateSharedMemoryForTenantStats(); + monitor->tenants[monitor->tenantCount] = dsmHandle; + TenantStats *tenantStats = GetTenantStatsFromDSMHandle(dsmHandle); + tenantStats->rank = monitor->tenantCount; + monitor->tenantCount++; + return dsmHandle; +} + + +/* + * CreateSharedMemoryForTenantStats creates a dynamic shared memory segment for a tenant's statistics. + */ +static dsm_handle +CreateSharedMemoryForTenantStats() +{ + struct dsm_segment *dsmSegment = dsm_create(sizeof(TenantStats), DSM_CREATE_NULL_IF_MAXSEGMENTS); + dsm_pin_segment(dsmSegment); + dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!! + return dsm_segment_handle(dsmSegment); +} + + +/* + * GetTenantStatsFromDSMHandle returns the data structure for a tenant's statistics with the dsm (dynamic shared memory) handle. + */ +static TenantStats * +GetTenantStatsFromDSMHandle(dsm_handle dsmHandle) +{ + dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); + if (dsmSegment == NULL) + { + dsmSegment = dsm_attach(dsmHandle); + } + TenantStats *stats = (TenantStats *) dsm_segment_address(dsmSegment); + dsm_pin_mapping(dsmSegment); + + return stats; +} + + +/* + * FindTenantStats finds the dsm (dynamic shared memory) handle for the current tenant's statistics. + */ +static dsm_handle +FindTenantStats(MultiTenantMonitor *monitor) +{ + for(int i=0; itenantCount; i++) + { + TenantStats * tenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[i]); + if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && tenantStats->colocationGroupId == colocationGroupId) + { + return monitor->tenants[i]; + } + } + + return DSM_HANDLE_INVALID; +} + diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 92301fceb..f31138ac2 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -46,4 +46,6 @@ extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan); extern bool IsCitusPlan(Plan *plan); extern bool IsCitusCustomScan(Plan *plan); +extern void SetJobColocationId(Job *job); + #endif /* CITUS_CUSTOM_SCAN_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 920541e97..46a946027 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -330,6 +330,9 @@ typedef struct Task * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction. */ bool cannotBeExecutedInTransction; + + char * partitionColumn; + int colocationId; } Task; diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h index 366e72a4b..6c7db6da8 100644 --- a/src/include/distributed/utils/attribute.h +++ b/src/include/distributed/utils/attribute.h @@ -8,9 +8,51 @@ #include "executor/execdesc.h" #include "executor/executor.h" +typedef struct MultiTenantMonitor +{ + int tenantCount; + dsm_handle tenants[300]; + long long scores[300]; + + time_t periodStart; +} MultiTenantMonitor; + +typedef struct TenantStats +{ + char tenantAttribute[100]; + + int colocationGroupId; + + int selectCount; + double totalSelectTime; + int selectsInLastPeriod; + int selectsInThisPeriod; + + int insertCount; + double totalInsertTime; + int insertsInLastPeriod; + int insertsInThisPeriod; + + time_t lastQueryTime; + + time_t lastScoreReduction; + int rank; +} TenantStats; + +typedef struct MultiTenantMonitorSMData +{ + dsm_handle dsmHandle; +} MultiTenantMonitorSMData; + extern void CitusAttributeToEnd(QueryDesc *queryDesc); -extern void AttributeQueryIfAnnotated(const char *queryString); +extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType); +extern char * AnnotateQuery(char *queryString, char * partitionColumn, int colocationId); +extern void InitializeMultiTenantMonitorSMHandleManagement(void); extern ExecutorEnd_hook_type prev_ExecutorEnd; +extern int MultiTenantMonitoringLogLevel; +extern int CitusStatsTenantsPeriod; +extern int CitusStatsTenantsLimit; + #endif //CITUS_ATTRIBUTE_H