Add locks and convert to static shared memory

pull/6763/head
Halil Ozan Akgul 2023-03-06 16:20:42 +03:00
parent 4dfb9c6694
commit f7a479d2e4
2 changed files with 115 additions and 171 deletions

View File

@ -16,6 +16,7 @@
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#include "utils/builtins.h" #include "utils/builtins.h"
@ -38,24 +39,20 @@ CmdType attributeCommandType = CMD_UNKNOWN;
int colocationGroupId = -1; int colocationGroupId = -1;
clock_t attributeToTenantStart = { 0 }; clock_t attributeToTenantStart = { 0 };
const char *SharedMemoryNameForMultiTenantMonitorHandleManagement = const char *SharedMemoryNameForMultiTenantMonitor =
"Shared memory handle for multi tenant monitor"; "Shared memory for multi tenant monitor";
static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats); static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats);
static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime); static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime);
static void CreateMultiTenantMonitor(void); static void CreateMultiTenantMonitor(void);
static dsm_handle CreateSharedMemoryForMultiTenantMonitor(void); static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
static void StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle);
static MultiTenantMonitor * GetMultiTenantMonitor(void); static MultiTenantMonitor * GetMultiTenantMonitor(void);
static dsm_handle GetMultiTenantMonitorDSMHandle(void);
static void DetachSegment(void);
static void MultiTenantMonitorSMInit(void); static void MultiTenantMonitorSMInit(void);
static dsm_handle CreateTenantStats(MultiTenantMonitor *monitor); static int CreateTenantStats(MultiTenantMonitor *monitor);
static dsm_handle CreateSharedMemoryForTenantStats(void); static int FindTenantStats(MultiTenantMonitor *monitor);
static TenantStats * GetTenantStatsFromDSMHandle(dsm_handle dsmHandle); static size_t MultiTenantMonitorshmemSize(void);
static dsm_handle FindTenantStats(MultiTenantMonitor *monitor);
int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;
int CitusStatsTenantsPeriod = (time_t) 60; int CitusStatsTenantsPeriod = (time_t) 60;
@ -93,7 +90,9 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
{ {
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
monitor->periodStart = monitor->periodStart + ((monitoringTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; monitor->periodStart = monitor->periodStart + ((monitoringTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod;
int numberOfRowsToReturn = 0; int numberOfRowsToReturn = 0;
@ -103,7 +102,7 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
} }
else else
{ {
numberOfRowsToReturn = min (monitor->tenantCount, CitusStatsTenantsLimit); numberOfRowsToReturn = Min (monitor->tenantCount, CitusStatsTenantsLimit);
} }
for (int i=0; i<numberOfRowsToReturn; i++) for (int i=0; i<numberOfRowsToReturn; i++)
@ -111,7 +110,7 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
TenantStats *tenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[i]); TenantStats *tenantStats = &monitor->tenants[i];
UpdatePeriodsIfNecessary(monitor, tenantStats); UpdatePeriodsIfNecessary(monitor, tenantStats);
ReduceScoreIfNecessary(monitor, tenantStats, monitoringTime); ReduceScoreIfNecessary(monitor, tenantStats, monitoringTime);
@ -122,12 +121,13 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod); values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod);
values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + tenantStats->insertsInThisPeriod); values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + tenantStats->insertsInThisPeriod);
values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + tenantStats->insertsInLastPeriod); values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + tenantStats->insertsInLastPeriod);
values[6] = Int64GetDatum(monitor->scores[tenantStats->rank]); values[6] = Int64GetDatum(tenantStats->score);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
} }
LWLockRelease(&monitor->lock);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -258,24 +258,21 @@ AttributeMetricsIfApplicable()
attributeToTenant))); attributeToTenant)));
} }
if (GetMultiTenantMonitorDSMHandle() == DSM_HANDLE_INVALID)
{
CreateMultiTenantMonitor();
}
MultiTenantMonitor *monitor = GetMultiTenantMonitor(); MultiTenantMonitor *monitor = GetMultiTenantMonitor();
LWLockAcquire(&monitor->lock, LW_SHARED);
monitor->periodStart = monitor->periodStart + ((queryTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod; monitor->periodStart = monitor->periodStart + ((queryTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod;
dsm_handle tenantDSMHandle = FindTenantStats(monitor); int tenantIndex = FindTenantStats(monitor);
if (tenantDSMHandle == DSM_HANDLE_INVALID) if (tenantIndex == -1)
{ {
tenantDSMHandle = CreateTenantStats(monitor); tenantIndex = CreateTenantStats(monitor);
} }
TenantStats * tenantStats = GetTenantStatsFromDSMHandle(tenantDSMHandle); TenantStats * tenantStats = &monitor->tenants[tenantIndex];
strcpy(tenantStats->tenantAttribute, attributeToTenant);
tenantStats->colocationGroupId = colocationGroupId; LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
UpdatePeriodsIfNecessary(monitor, tenantStats); UpdatePeriodsIfNecessary(monitor, tenantStats);
tenantStats->lastQueryTime = queryTime; tenantStats->lastQueryTime = queryTime;
@ -285,38 +282,25 @@ AttributeMetricsIfApplicable()
/* /*
* We do this after the reducing the scores so the scores in this period are not affected by the reduction. * We do this after the reducing the scores so the scores in this period are not affected by the reduction.
*/ */
monitor->scores[tenantStats->rank] += ONE_QUERY_SCORE; tenantStats->score += ONE_QUERY_SCORE;
/* /*
* After updating the score we might need to change the rank of the tenant in the monitor * After updating the score we might need to change the rank of the tenant in the monitor
*/ */
while(tenantStats->rank != 0 && monitor->scores[tenantStats->rank-1] < monitor->scores[tenantStats->rank]) while(tenantIndex != 0 && monitor->tenants[tenantIndex-1].score < tenantStats->score)
{ {
LWLockAcquire(&monitor->tenants[tenantIndex-1].lock, LW_EXCLUSIVE);
// we need to reduce previous tenants score too !!!!!!!! // we need to reduce previous tenants score too !!!!!!!!
TenantStats *previousTenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[tenantStats->rank-1]); elog(WARNING, "swap = %d, %d", tenantIndex, tenantIndex-1);
dsm_handle tempTenant = monitor->tenants[tenantStats->rank]; TenantStats tempTenant = monitor->tenants[tenantIndex];
monitor->tenants[tenantStats->rank] = monitor->tenants[previousTenantStats->rank]; monitor->tenants[tenantIndex] = monitor->tenants[tenantIndex - 1];
monitor->tenants[previousTenantStats->rank] = tempTenant; monitor->tenants[tenantIndex - 1] = tempTenant;
long long tempScore = monitor->scores[tenantStats->rank]; LWLockRelease(&monitor->tenants[tenantIndex-1].lock);
monitor->scores[tenantStats->rank] = monitor->scores[previousTenantStats->rank];
monitor->scores[previousTenantStats->rank] = tempScore;
previousTenantStats->rank++; tenantIndex--;
tenantStats->rank--;
}
/*
* We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit,
* so we don't lose data immediately after a tenant is out of top CitusStatsTenantsLimit
*
* Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2.
*/
if (monitor->tenantCount >= CitusStatsTenantsLimit * 3)
{
monitor->tenantCount = CitusStatsTenantsLimit * 2;
} }
if (attributeCommandType == CMD_SELECT) if (attributeCommandType == CMD_SELECT)
@ -332,6 +316,22 @@ AttributeMetricsIfApplicable()
tenantStats->totalInsertTime+=cpu_time_used; tenantStats->totalInsertTime+=cpu_time_used;
} }
LWLockRelease(&tenantStats->lock);
LWLockRelease(&monitor->lock);
/*
* We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit,
* so we don't lose data immediately after a tenant is out of top CitusStatsTenantsLimit
*
* Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2.
*/
if (monitor->tenantCount >= CitusStatsTenantsLimit * 3)
{
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
monitor->tenantCount = CitusStatsTenantsLimit * 2;
LWLockRelease(&monitor->lock);
}
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
{ {
ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", tenantStats->selectCount, tenantStats->totalSelectTime, ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", tenantStats->selectCount, tenantStats->totalSelectTime,
@ -410,7 +410,7 @@ ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, ti
*/ */
if (periodCountAfterLastScoreReduction > 0) if (periodCountAfterLastScoreReduction > 0)
{ {
monitor->scores[tenantStats->rank] >>= periodCountAfterLastScoreReduction; tenantStats->score >>= periodCountAfterLastScoreReduction;
tenantStats->lastScoreReduction = updateTime; tenantStats->lastScoreReduction = updateTime;
} }
} }
@ -422,9 +422,7 @@ ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, ti
static void static void
CreateMultiTenantMonitor() CreateMultiTenantMonitor()
{ {
dsm_handle dsmHandle = CreateSharedMemoryForMultiTenantMonitor(); MultiTenantMonitor * monitor = CreateSharedMemoryForMultiTenantMonitor();
StoreMultiTenantMonitorSMHandle(dsmHandle);
MultiTenantMonitor * monitor = GetMultiTenantMonitor();
monitor->tenantCount = 0; monitor->tenantCount = 0;
monitor->periodStart = time(0); monitor->periodStart = time(0);
} }
@ -433,28 +431,26 @@ CreateMultiTenantMonitor()
/* /*
* CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor. * CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor.
*/ */
static dsm_handle static MultiTenantMonitor*
CreateSharedMemoryForMultiTenantMonitor() CreateSharedMemoryForMultiTenantMonitor()
{
struct dsm_segment *dsmSegment = dsm_create(sizeof(MultiTenantMonitor), DSM_CREATE_NULL_IF_MAXSEGMENTS);
dsm_pin_segment(dsmSegment);
dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!!
return dsm_segment_handle(dsmSegment);
}
/*
* StoreMultiTenantMonitorSMHandle stores the dsm (dynamic shared memory) handle for multi tenant monitor
* in a non-dynamic shared memory location, so we don't lose it.
*/
static void
StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle)
{ {
bool found = false; bool found = false;
MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor,
sizeof(MultiTenantMonitorSMData), MultiTenantMonitorshmemSize(),
&found); &found);
if (found)
{
return monitor;
}
smData->dsmHandle = dsmHandle; char * trancheName = "Multi Tenant Monitor Tranche";
monitor->namedLockTranche.trancheId = LWLockNewTrancheId();
LWLockRegisterTranche(monitor->namedLockTranche.trancheId, trancheName);
LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);
return monitor;
} }
@ -463,54 +459,19 @@ StoreMultiTenantMonitorSMHandle(dsm_handle dsmHandle)
*/ */
static MultiTenantMonitor * static MultiTenantMonitor *
GetMultiTenantMonitor() GetMultiTenantMonitor()
{
dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle();
if (dsmHandle == DSM_HANDLE_INVALID)
{
return NULL;
}
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
if (dsmSegment == NULL)
{
dsmSegment = dsm_attach(dsmHandle);
}
MultiTenantMonitor *monitor = (MultiTenantMonitor *) dsm_segment_address(dsmSegment);
dsm_pin_mapping(dsmSegment);
return monitor;
}
/*
* GetMultiTenantMonitorDSMHandle fetches the dsm (dynamic shared memory) handle for multi tenant monitor.
*/
static dsm_handle
GetMultiTenantMonitorDSMHandle()
{ {
bool found = false; bool found = false;
MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement, MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor,
sizeof(MultiTenantMonitorSMData), MultiTenantMonitorshmemSize(),
&found); &found);
if (!found) if (!found)
{ {
elog(WARNING, "dsm handle not found"); elog(WARNING, "monitor not found");
return DSM_HANDLE_INVALID; return NULL;
} }
dsm_handle dsmHandle = smData->dsmHandle; return monitor;
return dsmHandle;
}
static void
DetachSegment()
{
dsm_handle dsmHandle = GetMultiTenantMonitorDSMHandle();
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
if (dsmSegment != NULL)
{
dsm_detach(dsmSegment);
}
} }
@ -535,14 +496,7 @@ InitializeMultiTenantMonitorSMHandleManagement()
static void static void
MultiTenantMonitorSMInit() MultiTenantMonitorSMInit()
{ {
bool alreadyInitialized = false; CreateMultiTenantMonitor();
MultiTenantMonitorSMData *smData = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitorHandleManagement,
sizeof(MultiTenantMonitorSMData),
&alreadyInitialized);
if (!alreadyInitialized)
{
smData->dsmHandle = DSM_HANDLE_INVALID;
}
if (prev_shmem_startup_hook != NULL) if (prev_shmem_startup_hook != NULL)
{ {
@ -554,64 +508,51 @@ MultiTenantMonitorSMInit()
/* /*
* CreateTenantStats creates the data structure for a tenant's statistics. * CreateTenantStats creates the data structure for a tenant's statistics.
*/ */
static dsm_handle static int
CreateTenantStats(MultiTenantMonitor *monitor) CreateTenantStats(MultiTenantMonitor *monitor)
{ {
dsm_handle dsmHandle = CreateSharedMemoryForTenantStats(); int tenantIndex = monitor->tenantCount;
monitor->tenants[monitor->tenantCount] = dsmHandle;
TenantStats *tenantStats = GetTenantStatsFromDSMHandle(dsmHandle); strcpy(monitor->tenants[tenantIndex].tenantAttribute, attributeToTenant);
tenantStats->rank = monitor->tenantCount; monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId;
char * trancheName = "Tenant Tranche";
monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId();
LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId, trancheName);
LWLockInitialize(&monitor->tenants[tenantIndex].lock, monitor->tenants[tenantIndex].namedLockTranche.trancheId);
monitor->tenantCount++; monitor->tenantCount++;
return dsmHandle;
}
return tenantIndex;
/*
* CreateSharedMemoryForTenantStats creates a dynamic shared memory segment for a tenant's statistics.
*/
static dsm_handle
CreateSharedMemoryForTenantStats()
{
struct dsm_segment *dsmSegment = dsm_create(sizeof(TenantStats), DSM_CREATE_NULL_IF_MAXSEGMENTS);
dsm_pin_segment(dsmSegment);
dsm_pin_mapping(dsmSegment); // don't know why we do both !!!!!!!!!!!!!!!!!
return dsm_segment_handle(dsmSegment);
}
/*
* GetTenantStatsFromDSMHandle returns the data structure for a tenant's statistics with the dsm (dynamic shared memory) handle.
*/
static TenantStats *
GetTenantStatsFromDSMHandle(dsm_handle dsmHandle)
{
dsm_segment *dsmSegment = dsm_find_mapping(dsmHandle);
if (dsmSegment == NULL)
{
dsmSegment = dsm_attach(dsmHandle);
}
TenantStats *stats = (TenantStats *) dsm_segment_address(dsmSegment);
dsm_pin_mapping(dsmSegment);
return stats;
} }
/* /*
* FindTenantStats finds the dsm (dynamic shared memory) handle for the current tenant's statistics. * FindTenantStats finds the dsm (dynamic shared memory) handle for the current tenant's statistics.
*/ */
static dsm_handle static int
FindTenantStats(MultiTenantMonitor *monitor) FindTenantStats(MultiTenantMonitor *monitor)
{ {
for(int i=0; i<monitor->tenantCount; i++) for(int i=0; i<monitor->tenantCount; i++)
{ {
TenantStats * tenantStats = GetTenantStatsFromDSMHandle(monitor->tenants[i]); TenantStats * tenantStats = &monitor->tenants[i];
if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && tenantStats->colocationGroupId == colocationGroupId) if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && tenantStats->colocationGroupId == colocationGroupId)
{ {
return monitor->tenants[i]; return i;
} }
} }
return DSM_HANDLE_INVALID; return -1;
} }
static size_t
MultiTenantMonitorshmemSize(void)
{
Size size = sizeof(MultiTenantMonitor);
size = add_size(size, mul_size(sizeof(TenantStats), CitusStatsTenantsLimit * 3));
return size;
}

View File

@ -7,15 +7,7 @@
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "storage/lwlock.h"
typedef struct MultiTenantMonitor
{
int tenantCount;
dsm_handle tenants[300];
long long scores[300];
time_t periodStart;
} MultiTenantMonitor;
typedef struct TenantStats typedef struct TenantStats
{ {
@ -35,14 +27,25 @@ typedef struct TenantStats
time_t lastQueryTime; time_t lastQueryTime;
long long score;
time_t lastScoreReduction; time_t lastScoreReduction;
int rank; int rank;
NamedLWLockTranche namedLockTranche;
LWLock lock;
} TenantStats; } TenantStats;
typedef struct MultiTenantMonitorSMData typedef struct MultiTenantMonitor
{ {
dsm_handle dsmHandle; time_t periodStart;
} MultiTenantMonitorSMData;
NamedLWLockTranche namedLockTranche;
LWLock lock;
int tenantCount;
TenantStats tenants[FLEXIBLE_ARRAY_MEMBER];
} MultiTenantMonitor;
extern void CitusAttributeToEnd(QueryDesc *queryDesc); extern void CitusAttributeToEnd(QueryDesc *queryDesc);
extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType); extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);