diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index b5471a3d8..119ebe65e 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -16,6 +16,7 @@ #include "distributed/tuplestore.h" #include "executor/execdesc.h" #include "storage/ipc.h" +#include "storage/lwlock.h" #include "storage/shmem.h" #include "utils/builtins.h" @@ -38,24 +39,20 @@ CmdType attributeCommandType = CMD_UNKNOWN; int colocationGroupId = -1; clock_t attributeToTenantStart = { 0 }; -const char *SharedMemoryNameForMultiTenantMonitorHandleManagement = - "Shared memory handle for multi tenant monitor"; +const char *SharedMemoryNameForMultiTenantMonitor = + "Shared memory for multi tenant monitor"; static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats); static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime); static void CreateMultiTenantMonitor(void); -static dsm_handle CreateSharedMemoryForMultiTenantMonitor(void); -static void StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle); +static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void); static MultiTenantMonitor * GetMultiTenantMonitor(void); -static dsm_handle GetMultiTenantMonitorDSMHandle(void); -static void DetachSegment(void); static void MultiTenantMonitorSMInit(void); -static dsm_handle CreateTenantStats(MultiTenantMonitor *monitor); -static dsm_handle CreateSharedMemoryForTenantStats(void); -static TenantStats * GetTenantStatsFromDSMHandle(dsm_handle dsmHandle); -static dsm_handle FindTenantStats(MultiTenantMonitor *monitor); +static int CreateTenantStats(MultiTenantMonitor *monitor); +static int FindTenantStats(MultiTenantMonitor *monitor); +static size_t MultiTenantMonitorshmemSize(void); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -93,7 +90,9 @@ citus_stats_tenants(PG_FUNCTION_ARGS) { PG_RETURN_VOID(); } - + + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + monitor->periodStart = monitor->periodStart + ((monitoringTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; int numberOfRowsToReturn = 0; @@ -103,7 +102,7 @@ citus_stats_tenants(PG_FUNCTION_ARGS) } else { - numberOfRowsToReturn = min (monitor->tenantCount, CitusStatsTenantsLimit); + numberOfRowsToReturn = Min (monitor->tenantCount, CitusStatsTenantsLimit); } for (int i=0; itenants[i]); + TenantStats *tenantStats = &monitor->tenants[i]; UpdatePeriodsIfNecessary(monitor, tenantStats); ReduceScoreIfNecessary(monitor, tenantStats, monitoringTime); @@ -122,12 +121,13 @@ citus_stats_tenants(PG_FUNCTION_ARGS) values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod); values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + tenantStats->insertsInThisPeriod); values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + tenantStats->insertsInLastPeriod); - values[6] = Int64GetDatum(monitor->scores[tenantStats->rank]); - + values[6] = Int64GetDatum(tenantStats->score); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } + LWLockRelease(&monitor->lock); + PG_RETURN_VOID(); } @@ -258,24 +258,21 @@ AttributeMetricsIfApplicable() attributeToTenant))); } - if (GetMultiTenantMonitorDSMHandle() == DSM_HANDLE_INVALID) - { - CreateMultiTenantMonitor(); - } - MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + LWLockAcquire(&monitor->lock, LW_SHARED); monitor->periodStart = monitor->periodStart + ((queryTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; - dsm_handle tenantDSMHandle = FindTenantStats(monitor); + int tenantIndex = FindTenantStats(monitor); - if (tenantDSMHandle == DSM_HANDLE_INVALID) + if (tenantIndex == -1) { - tenantDSMHandle = CreateTenantStats(monitor); + tenantIndex = CreateTenantStats(monitor); } - TenantStats * tenantStats = GetTenantStatsFromDSMHandle(tenantDSMHandle); - strcpy(tenantStats->tenantAttribute, attributeToTenant); - tenantStats->colocationGroupId = colocationGroupId; + TenantStats * tenantStats = &monitor->tenants[tenantIndex]; + + LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); UpdatePeriodsIfNecessary(monitor, tenantStats); tenantStats->lastQueryTime = queryTime; @@ -285,38 +282,25 @@ AttributeMetricsIfApplicable() /* * We do this after the reducing the scores so the scores in this period are not affected by the reduction. */ - monitor->scores[tenantStats->rank] += ONE_QUERY_SCORE; + tenantStats->score += ONE_QUERY_SCORE; /* * After updating the score we might need to change the rank of the tenant in the monitor */ - while(tenantStats->rank != 0 && monitor->scores[tenantStats->rank-1] < monitor->scores[tenantStats->rank]) + while(tenantIndex != 0 && monitor->tenants[tenantIndex-1].score < tenantStats->score) { + LWLockAcquire(&monitor->tenants[tenantIndex-1].lock, LW_EXCLUSIVE); // we need to reduce previous tenants score too !!!!!!!! - TenantStats *previousTenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[tenantStats->rank-1]); + elog(WARNING, "swap = %d, %d", tenantIndex, tenantIndex-1); - dsm_handle tempTenant = monitor->tenants[tenantStats->rank]; - monitor->tenants[tenantStats->rank] = monitor->tenants[previousTenantStats->rank]; - monitor->tenants[previousTenantStats->rank] = tempTenant; + TenantStats tempTenant = monitor->tenants[tenantIndex]; + monitor->tenants[tenantIndex] = monitor->tenants[tenantIndex - 1]; + monitor->tenants[tenantIndex - 1] = tempTenant; - long long tempScore = monitor->scores[tenantStats->rank]; - monitor->scores[tenantStats->rank] = monitor->scores[previousTenantStats->rank]; - monitor->scores[previousTenantStats->rank] = tempScore; + LWLockRelease(&monitor->tenants[tenantIndex-1].lock); - previousTenantStats->rank++; - tenantStats->rank--; - } - - /* - * We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit, - * so we don't lose data immediately after a tenant is out of top CitusStatsTenantsLimit - * - * Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2. - */ - if (monitor->tenantCount >= CitusStatsTenantsLimit * 3) - { - monitor->tenantCount = CitusStatsTenantsLimit * 2; + tenantIndex--; } if (attributeCommandType == CMD_SELECT) @@ -332,6 +316,22 @@ AttributeMetricsIfApplicable() tenantStats->totalInsertTime+=cpu_time_used; } + LWLockRelease(&tenantStats->lock); + LWLockRelease(&monitor->lock); + + /* + * We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit, + * so we don't lose data immediately after a tenant is out of top CitusStatsTenantsLimit + * + * Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2. + */ + if (monitor->tenantCount >= CitusStatsTenantsLimit * 3) + { + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + monitor->tenantCount = CitusStatsTenantsLimit * 2; + LWLockRelease(&monitor->lock); + } + if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) { ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", tenantStats->selectCount, tenantStats->totalSelectTime, @@ -410,7 +410,7 @@ ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, ti */ if (periodCountAfterLastScoreReduction > 0) { - monitor->scores[tenantStats->rank] >>= periodCountAfterLastScoreReduction; + tenantStats->score >>= periodCountAfterLastScoreReduction; tenantStats->lastScoreReduction = updateTime; } } @@ -422,9 +422,7 @@ ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, ti static void CreateMultiTenantMonitor() { - dsm_handle dsmHandle = CreateSharedMemoryForMultiTenantMonitor(); - StoreMultiTenantMonitorSMHandle(dsmHandle); - MultiTenantMonitor * monitor = GetMultiTenantMonitor(); + MultiTenantMonitor * monitor = CreateSharedMemoryForMultiTenantMonitor(); monitor->tenantCount = 0; monitor->periodStart = time(0); } @@ -433,28 +431,26 @@ CreateMultiTenantMonitor() /* * CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor. */ -static dsm_handle +static MultiTenantMonitor* CreateSharedMemoryForMultiTenantMonitor() -{ - struct dsm_segment *dsmSegment = dsm_create(sizeof(MultiTenantMonitor), DSM_CREATE_NULL_IF_MAXSEGMENTS); - dsm_pin_segment(dsmSegment); - dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!! - return dsm_segment_handle(dsmSegment); -} - -/* - * StoreMultiTenantMonitorSMHandle stores the dsm (dynamic shared memory) handle for multi tenant monitor - * in a non-dynamic shared memory location, so we don't lose it. - */ -static void -StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle) { bool found = false; - MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, - sizeof(MultiTenantMonitorSMData), - &found); + MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor, + MultiTenantMonitorshmemSize(), + &found); + if (found) + { + return monitor; + } - smData->dsmHandle = dsmHandle; + char * trancheName = "Multi Tenant Monitor Tranche"; + + monitor->namedLockTranche.trancheId = LWLockNewTrancheId(); + + LWLockRegisterTranche(monitor->namedLockTranche.trancheId, trancheName); + LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId); + + return monitor; } @@ -463,54 +459,19 @@ StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle) */ static MultiTenantMonitor * GetMultiTenantMonitor() -{ - dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle(); - if (dsmHandle == DSM_HANDLE_INVALID) - { - return NULL; - } - dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); - if (dsmSegment == NULL) - { - dsmSegment = dsm_attach(dsmHandle); - } - MultiTenantMonitor *monitor = (MultiTenantMonitor *) dsm_segment_address(dsmSegment); - dsm_pin_mapping(dsmSegment); - return monitor; -} - -/* - * GetMultiTenantMonitorDSMHandle fetches the dsm (dynamic shared memory) handle for multi tenant monitor. - */ -static dsm_handle -GetMultiTenantMonitorDSMHandle() { bool found = false; - MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, - sizeof(MultiTenantMonitorSMData), - &found); + MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor, + MultiTenantMonitorshmemSize(), + &found); if (!found) { - elog(WARNING, "dsm handle not found"); - return DSM_HANDLE_INVALID; + elog(WARNING, "monitor not found"); + return NULL; } - dsm_handle dsmHandle = smData->dsmHandle; - - return dsmHandle; -} - - -static void -DetachSegment() -{ - dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle(); - dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); - if (dsmSegment != NULL) - { - dsm_detach(dsmSegment); - } + return monitor; } @@ -535,14 +496,7 @@ InitializeMultiTenantMonitorSMHandleManagement() static void MultiTenantMonitorSMInit() { - bool alreadyInitialized = false; - MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, - sizeof(MultiTenantMonitorSMData), - &alreadyInitialized); - if (!alreadyInitialized) - { - smData->dsmHandle = DSM_HANDLE_INVALID; - } + CreateMultiTenantMonitor(); if (prev_shmem_startup_hook != NULL) { @@ -554,64 +508,51 @@ MultiTenantMonitorSMInit() /* * CreateTenantStats creates the data structure for a tenant's statistics. */ -static dsm_handle +static int CreateTenantStats(MultiTenantMonitor *monitor) { - dsm_handle dsmHandle = CreateSharedMemoryForTenantStats(); - monitor->tenants[monitor->tenantCount] = dsmHandle; - TenantStats *tenantStats = GetTenantStatsFromDSMHandle(dsmHandle); - tenantStats->rank = monitor->tenantCount; + int tenantIndex = monitor->tenantCount; + + strcpy(monitor->tenants[tenantIndex].tenantAttribute, attributeToTenant); + monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId; + + char * trancheName = "Tenant Tranche"; + + monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId(); + + LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId, trancheName); + LWLockInitialize(&monitor->tenants[tenantIndex].lock, monitor->tenants[tenantIndex].namedLockTranche.trancheId); + monitor->tenantCount++; - return dsmHandle; -} - -/* - * CreateSharedMemoryForTenantStats creates a dynamic shared memory segment for a tenant's statistics. - */ -static dsm_handle -CreateSharedMemoryForTenantStats() -{ - struct dsm_segment *dsmSegment = dsm_create(sizeof(TenantStats), DSM_CREATE_NULL_IF_MAXSEGMENTS); - dsm_pin_segment(dsmSegment); - dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!! - return dsm_segment_handle(dsmSegment); -} - - -/* - * GetTenantStatsFromDSMHandle returns the data structure for a tenant's statistics with the dsm (dynamic shared memory) handle. - */ -static TenantStats * -GetTenantStatsFromDSMHandle(dsm_handle dsmHandle) -{ - dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle); - if (dsmSegment == NULL) - { - dsmSegment = dsm_attach(dsmHandle); - } - TenantStats *stats = (TenantStats *) dsm_segment_address(dsmSegment); - dsm_pin_mapping(dsmSegment); - - return stats; + return tenantIndex; } /* * FindTenantStats finds the dsm (dynamic shared memory) handle for the current tenant's statistics. */ -static dsm_handle +static int FindTenantStats(MultiTenantMonitor *monitor) { for(int i=0; itenantCount; i++) { - TenantStats * tenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[i]); + TenantStats * tenantStats = &monitor->tenants[i]; if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && tenantStats->colocationGroupId == colocationGroupId) { - return monitor->tenants[i]; + return i; } } - return DSM_HANDLE_INVALID; + return -1; } + +static size_t +MultiTenantMonitorshmemSize(void) +{ + Size size = sizeof(MultiTenantMonitor); + size = add_size(size, mul_size(sizeof(TenantStats), CitusStatsTenantsLimit * 3)); + + return size; +} diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h index 6c7db6da8..5200ac63b 100644 --- a/src/include/distributed/utils/attribute.h +++ b/src/include/distributed/utils/attribute.h @@ -7,15 +7,7 @@ #include "executor/execdesc.h" #include "executor/executor.h" - -typedef struct MultiTenantMonitor -{ - int tenantCount; - dsm_handle tenants[300]; - long long scores[300]; - - time_t periodStart; -} MultiTenantMonitor; +#include "storage/lwlock.h" typedef struct TenantStats { @@ -35,14 +27,25 @@ typedef struct TenantStats time_t lastQueryTime; + long long score; time_t lastScoreReduction; int rank; + + NamedLWLockTranche namedLockTranche; + LWLock lock; } TenantStats; -typedef struct MultiTenantMonitorSMData +typedef struct MultiTenantMonitor { - dsm_handle dsmHandle; -} MultiTenantMonitorSMData; + time_t periodStart; + + NamedLWLockTranche namedLockTranche; + LWLock lock; + + int tenantCount; + TenantStats tenants[FLEXIBLE_ARRAY_MEMBER]; +} MultiTenantMonitor; + extern void CitusAttributeToEnd(QueryDesc *queryDesc); extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);