From 01eb68a7c93a0b54ada0fd94665b34cc2568a195 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Mon, 6 Mar 2023 17:24:06 +0300 Subject: [PATCH] indent --- .../distributed/planner/deparse_shard_query.c | 6 +- .../planner/multi_router_planner.c | 12 +- src/backend/distributed/shared_library_init.c | 2 +- src/backend/distributed/utils/attribute.c | 116 ++++++++++-------- .../distributed/multi_physical_planner.h | 2 +- src/include/distributed/utils/attribute.h | 4 +- 6 files changed, 84 insertions(+), 58 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 22cf4415c..136268ba0 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -149,7 +149,8 @@ RebuildQueryStrings(Job *workerJob) { partitionColumnValue = workerJob->partitionKeyValue->constvalue; partitionColumnType = workerJob->partitionKeyValue->consttype; - partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType); + partitionColumnString = DatumToString(partitionColumnValue, + partitionColumnType); } task->partitionColumn = partitionColumnString; @@ -402,7 +403,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) return; } - SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), task->partitionColumn, task->colocationId)); + SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), + task->partitionColumn, task->colocationId)); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index cf87760d2..7e596bfe5 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -170,7 +170,8 @@ static int CompareInsertValuesByShardId(const void *leftElement, static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification, char * partitionColumn, int colocationId); + bool isLocalTableModification, char *partitionColumn, + int colocationId); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType @@ -1961,7 +1962,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, { partitionColumnValue = job->partitionKeyValue->constvalue; partitionColumnType = job->partitionKeyValue->consttype; - partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType); + partitionColumnString = DatumToString(partitionColumnValue, + partitionColumnType); } SetJobColocationId(job); @@ -1970,7 +1972,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification, partitionColumnString, job->colocationId); + isLocalTableModification, + partitionColumnString, job->colocationId); /* * Queries to reference tables, or distributed tables with multiple replica's have @@ -2092,7 +2095,8 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification, char * partitionColumn, int colocationId) + bool isLocalTableModification, char *partitionColumn, + int colocationId) { TaskType taskType = READ_TASK; char replicationModel = 0; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 82fa9a6a3..6ceae8b52 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1903,7 +1903,7 @@ RegisterCitusConfigVariables(void) PGC_USERSET, GUC_STANDARD, NULL, NULL, NULL); - + DefineCustomEnumVariable( "citus.multi_tenant_monitoring_log_level", gettext_noop("Sets the level of multi tenant monitoring log messages"), diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 1c8ef03e0..205696628 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -44,8 +44,10 @@ const char *SharedMemoryNameForMultiTenantMonitor = static shmem_startup_hook_type prev_shmem_startup_hook = NULL; -static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats); -static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime); +static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor, + TenantStats *tenantStats); +static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, + time_t updateTime); static void CreateMultiTenantMonitor(void); static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void); static MultiTenantMonitor * GetMultiTenantMonitor(void); @@ -68,7 +70,7 @@ PG_FUNCTION_INFO_V1(citus_stats_tenants); Datum citus_stats_tenants(PG_FUNCTION_ARGS) { - //CheckCitusVersion(ERROR); + /*CheckCitusVersion(ERROR); */ /* * We keep more than CitusStatsTenantsLimit tenants in our monitor. @@ -93,7 +95,9 @@ citus_stats_tenants(PG_FUNCTION_ARGS) LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); - monitor->periodStart = monitor->periodStart + ((monitoringTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; + monitor->periodStart = monitor->periodStart + + ((monitoringTime - monitor->periodStart) / CitusStatsTenantsPeriod) * + CitusStatsTenantsPeriod; int numberOfRowsToReturn = 0; if (returnAllTenants) @@ -102,10 +106,10 @@ citus_stats_tenants(PG_FUNCTION_ARGS) } else { - numberOfRowsToReturn = Min (monitor->tenantCount, CitusStatsTenantsLimit); + numberOfRowsToReturn = Min(monitor->tenantCount, CitusStatsTenantsLimit); } - for (int i=0; itenantAttribute)); values[2] = Int32GetDatum(tenantStats->selectsInThisPeriod); values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod); - values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + tenantStats->insertsInThisPeriod); - values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + tenantStats->insertsInLastPeriod); + values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + + tenantStats->insertsInThisPeriod); + values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + + tenantStats->insertsInLastPeriod); values[6] = Int64GetDatum(tenantStats->score); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); @@ -151,7 +157,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) { /* TODO create a function to safely parse the tenant identifier from the query comment */ /* query is attributed to a tenant */ - char *tenantId = (char*)query_string + strlen(ATTRIBUTE_PREFIX); + char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX); char *tenantEnd = tenantId; while (true && tenantEnd[0] != '\0') { @@ -165,7 +171,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) tenantEnd--; colocationGroupId = 0; - while(*tenantEnd != ',') + while (*tenantEnd != ',') { colocationGroupId *= 10; colocationGroupId += *tenantEnd - '0'; @@ -180,10 +186,11 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { - ereport(NOTICE, (errmsg("attributing query to tenant: %s", quote_literal_cstr(tenantId)))); + ereport(NOTICE, (errmsg("attributing query to tenant: %s", + quote_literal_cstr(tenantId)))); } - attributeToTenant=(char *)malloc(strlen(tenantId)); + attributeToTenant = (char *) malloc(strlen(tenantId)); strcpy(attributeToTenant, tenantId); } else @@ -191,7 +198,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) Assert(attributeToTenant == NULL); } - //DetachSegment(); + /*DetachSegment(); */ attributeToTenantStart = clock(); } @@ -201,7 +208,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) * AnnotateQuery annotates the query with tenant attributes. */ char * -AnnotateQuery (char * queryString, char * partitionColumn, int colocationId) +AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) { if (partitionColumn == NULL) { @@ -254,15 +261,17 @@ AttributeMetricsIfApplicable() if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { - ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", cpu_time_used, - attributeToTenant))); + ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", + cpu_time_used, attributeToTenant))); } MultiTenantMonitor *monitor = GetMultiTenantMonitor(); LWLockAcquire(&monitor->lock, LW_SHARED); - - monitor->periodStart = monitor->periodStart + ((queryTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; + + monitor->periodStart = monitor->periodStart + + ((queryTime - monitor->periodStart) / CitusStatsTenantsPeriod) * + CitusStatsTenantsPeriod; int tenantIndex = FindTenantStats(monitor); @@ -270,7 +279,7 @@ AttributeMetricsIfApplicable() { tenantIndex = CreateTenantStats(monitor); } - TenantStats * tenantStats = &monitor->tenants[tenantIndex]; + TenantStats *tenantStats = &monitor->tenants[tenantIndex]; LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); @@ -288,17 +297,18 @@ AttributeMetricsIfApplicable() /* * After updating the score we might need to change the rank of the tenant in the monitor */ - while(tenantIndex != 0 && monitor->tenants[tenantIndex-1].score < tenantStats->score) + while (tenantIndex != 0 && + monitor->tenants[tenantIndex - 1].score < tenantStats->score) { - LWLockAcquire(&monitor->tenants[tenantIndex-1].lock, LW_EXCLUSIVE); + LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE); + + ReduceScoreIfNecessary(monitor, monitor->tenants[tenantIndex - 1], queryTime); - ReduceScoreIfNecessary(monitor, monitor->tenants[tenantIndex-1], queryTime); - TenantStats tempTenant = monitor->tenants[tenantIndex]; monitor->tenants[tenantIndex] = monitor->tenants[tenantIndex - 1]; monitor->tenants[tenantIndex - 1] = tempTenant; - LWLockRelease(&monitor->tenants[tenantIndex-1].lock); + LWLockRelease(&monitor->tenants[tenantIndex - 1].lock); tenantIndex--; } @@ -307,13 +317,13 @@ AttributeMetricsIfApplicable() { tenantStats->selectCount++; tenantStats->selectsInThisPeriod++; - tenantStats->totalSelectTime+=cpu_time_used; + tenantStats->totalSelectTime += cpu_time_used; } else if (attributeCommandType == CMD_INSERT) { tenantStats->insertCount++; tenantStats->insertsInThisPeriod++; - tenantStats->totalInsertTime+=cpu_time_used; + tenantStats->totalInsertTime += cpu_time_used; } LWLockRelease(&tenantStats->lock); @@ -322,7 +332,7 @@ AttributeMetricsIfApplicable() /* * We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit, * so we don't lose data immediately after a tenant is out of top CitusStatsTenantsLimit - * + * * Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2. */ if (monitor->tenantCount >= CitusStatsTenantsLimit * 3) @@ -334,8 +344,9 @@ AttributeMetricsIfApplicable() if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { - ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", tenantStats->selectCount, tenantStats->totalSelectTime, - tenantStats->tenantAttribute))); + ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", + tenantStats->selectCount, tenantStats->totalSelectTime, + tenantStats->tenantAttribute))); } } attributeToTenant = NULL; @@ -344,35 +355,37 @@ AttributeMetricsIfApplicable() /* * UpdatePeriodsIfNecessary moves the query counts to previous periods if a enough time has passed. - * + * * If 1 period has passed after the latest query, this function moves this period's counts to the last period * and cleans this period's statistics. - * + * * If 2 or more periods has passed after the last query, this function cleans all both this and last period's * statistics. */ static void -UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats) +UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats) { /* * If the last query in this tenant was before the start of current period * but there are some query count for this period we move them to the last period. */ - if (tenantStats->lastQueryTime < monitor->periodStart && (tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod)) + if (tenantStats->lastQueryTime < monitor->periodStart && + (tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod)) { tenantStats->insertsInLastPeriod = tenantStats->insertsInThisPeriod; tenantStats->insertsInThisPeriod = 0; - + tenantStats->selectsInLastPeriod = tenantStats->selectsInThisPeriod; tenantStats->selectsInThisPeriod = 0; } + /* * If the last query is more than two periods ago, we clean the last period counts too. */ if (tenantStats->lastQueryTime < monitor->periodStart - CitusStatsTenantsPeriod) { tenantStats->insertsInLastPeriod = 0; - + tenantStats->selectsInLastPeriod = 0; } } @@ -381,21 +394,25 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats) /* * ReduceScoreIfNecessary reduces the tenant score only if it is necessary. * - * We halve the tenants' scores after each period. This function checks the number of + * We halve the tenants' scores after each period. This function checks the number of * periods that passed after the lsat score reduction and reduces the score accordingly. */ static void -ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime) +ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, + time_t updateTime) { /* * With each query we increase the score of tenant by ONE_QUERY_SCORE. * After one period we halve the scores. - * + * * Here we calculate how many periods passed after the last time we did score reduction * If the latest score reduction was in this period this number should be 0, * if it was in the last period this number should be 1 and so on. */ - int periodCountAfterLastScoreReduction = (monitor->periodStart - tenantStats->lastScoreReduction + CitusStatsTenantsPeriod -1) / CitusStatsTenantsPeriod; + int periodCountAfterLastScoreReduction = (monitor->periodStart - + tenantStats->lastScoreReduction + + CitusStatsTenantsPeriod - 1) / + CitusStatsTenantsPeriod; /* * This should not happen but let's make sure @@ -422,7 +439,7 @@ ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, ti static void CreateMultiTenantMonitor() { - MultiTenantMonitor * monitor = CreateSharedMemoryForMultiTenantMonitor(); + MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor(); monitor->tenantCount = 0; monitor->periodStart = time(0); } @@ -431,7 +448,7 @@ CreateMultiTenantMonitor() /* * CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor. */ -static MultiTenantMonitor* +static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor() { bool found = false; @@ -443,7 +460,7 @@ CreateSharedMemoryForMultiTenantMonitor() return monitor; } - char * trancheName = "Multi Tenant Monitor Tranche"; + char *trancheName = "Multi Tenant Monitor Tranche"; monitor->namedLockTranche.trancheId = LWLockNewTrancheId(); @@ -516,12 +533,14 @@ CreateTenantStats(MultiTenantMonitor *monitor) strcpy(monitor->tenants[tenantIndex].tenantAttribute, attributeToTenant); monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId; - char * trancheName = "Tenant Tranche"; + char *trancheName = "Tenant Tranche"; monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId(); - LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId, trancheName); - LWLockInitialize(&monitor->tenants[tenantIndex].lock, monitor->tenants[tenantIndex].namedLockTranche.trancheId); + LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId, + trancheName); + LWLockInitialize(&monitor->tenants[tenantIndex].lock, + monitor->tenants[tenantIndex].namedLockTranche.trancheId); monitor->tenantCount++; @@ -535,10 +554,11 @@ CreateTenantStats(MultiTenantMonitor *monitor) static int FindTenantStats(MultiTenantMonitor *monitor) { - for(int i=0; itenantCount; i++) + for (int i = 0; i < monitor->tenantCount; i++) { - TenantStats * tenantStats = &monitor->tenants[i]; - if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && tenantStats->colocationGroupId == colocationGroupId) + TenantStats *tenantStats = &monitor->tenants[i]; + if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && + tenantStats->colocationGroupId == colocationGroupId) { return i; } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 46a946027..52a42ee26 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -331,7 +331,7 @@ typedef struct Task */ bool cannotBeExecutedInTransction; - char * partitionColumn; + char *partitionColumn; int colocationId; } Task; diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h index 77f83bee7..7f7e14d50 100644 --- a/src/include/distributed/utils/attribute.h +++ b/src/include/distributed/utils/attribute.h @@ -55,7 +55,7 @@ typedef struct MultiTenantMonitor extern void CitusAttributeToEnd(QueryDesc *queryDesc); extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType); -extern char * AnnotateQuery(char *queryString, char * partitionColumn, int colocationId); +extern char * AnnotateQuery(char *queryString, char *partitionColumn, int colocationId); extern void InitializeMultiTenantMonitorSMHandleManagement(void); extern ExecutorEnd_hook_type prev_ExecutorEnd; @@ -64,4 +64,4 @@ extern int MultiTenantMonitoringLogLevel; extern int CitusStatsTenantsPeriod; extern int CitusStatsTenantsLimit; -#endif //CITUS_ATTRIBUTE_H +#endif /*CITUS_ATTRIBUTE_H */