mirror of https://github.com/citusdata/citus.git
indent
parent
4ebac440f9
commit
01eb68a7c9
|
@ -149,7 +149,8 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
{
|
{
|
||||||
partitionColumnValue = workerJob->partitionKeyValue->constvalue;
|
partitionColumnValue = workerJob->partitionKeyValue->constvalue;
|
||||||
partitionColumnType = workerJob->partitionKeyValue->consttype;
|
partitionColumnType = workerJob->partitionKeyValue->consttype;
|
||||||
partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType);
|
partitionColumnString = DatumToString(partitionColumnValue,
|
||||||
|
partitionColumnType);
|
||||||
}
|
}
|
||||||
|
|
||||||
task->partitionColumn = partitionColumnString;
|
task->partitionColumn = partitionColumnString;
|
||||||
|
@ -402,7 +403,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), task->partitionColumn, task->colocationId));
|
SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query),
|
||||||
|
task->partitionColumn, task->colocationId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -170,7 +170,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
|
||||||
static List * SingleShardTaskList(Query *query, uint64 jobId,
|
static List * SingleShardTaskList(Query *query, uint64 jobId,
|
||||||
List *relationShardList, List *placementList,
|
List *relationShardList, List *placementList,
|
||||||
uint64 shardId, bool parametersInQueryResolved,
|
uint64 shardId, bool parametersInQueryResolved,
|
||||||
bool isLocalTableModification, char * partitionColumn, int colocationId);
|
bool isLocalTableModification, char *partitionColumn,
|
||||||
|
int colocationId);
|
||||||
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
||||||
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
||||||
TaskAssignmentPolicyType
|
TaskAssignmentPolicyType
|
||||||
|
@ -1961,7 +1962,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
{
|
{
|
||||||
partitionColumnValue = job->partitionKeyValue->constvalue;
|
partitionColumnValue = job->partitionKeyValue->constvalue;
|
||||||
partitionColumnType = job->partitionKeyValue->consttype;
|
partitionColumnType = job->partitionKeyValue->consttype;
|
||||||
partitionColumnString = DatumToString(partitionColumnValue, partitionColumnType);
|
partitionColumnString = DatumToString(partitionColumnValue,
|
||||||
|
partitionColumnType);
|
||||||
}
|
}
|
||||||
|
|
||||||
SetJobColocationId(job);
|
SetJobColocationId(job);
|
||||||
|
@ -1970,7 +1972,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||||
relationShardList, placementList,
|
relationShardList, placementList,
|
||||||
shardId,
|
shardId,
|
||||||
job->parametersInJobQueryResolved,
|
job->parametersInJobQueryResolved,
|
||||||
isLocalTableModification, partitionColumnString, job->colocationId);
|
isLocalTableModification,
|
||||||
|
partitionColumnString, job->colocationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Queries to reference tables, or distributed tables with multiple replica's have
|
* Queries to reference tables, or distributed tables with multiple replica's have
|
||||||
|
@ -2092,7 +2095,8 @@ static List *
|
||||||
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
List *placementList, uint64 shardId,
|
List *placementList, uint64 shardId,
|
||||||
bool parametersInQueryResolved,
|
bool parametersInQueryResolved,
|
||||||
bool isLocalTableModification, char * partitionColumn, int colocationId)
|
bool isLocalTableModification, char *partitionColumn,
|
||||||
|
int colocationId)
|
||||||
{
|
{
|
||||||
TaskType taskType = READ_TASK;
|
TaskType taskType = READ_TASK;
|
||||||
char replicationModel = 0;
|
char replicationModel = 0;
|
||||||
|
|
|
@ -44,8 +44,10 @@ const char *SharedMemoryNameForMultiTenantMonitor =
|
||||||
|
|
||||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||||
|
|
||||||
static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats);
|
static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,
|
||||||
static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime);
|
TenantStats *tenantStats);
|
||||||
|
static void ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats,
|
||||||
|
time_t updateTime);
|
||||||
static void CreateMultiTenantMonitor(void);
|
static void CreateMultiTenantMonitor(void);
|
||||||
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
|
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
|
||||||
static MultiTenantMonitor * GetMultiTenantMonitor(void);
|
static MultiTenantMonitor * GetMultiTenantMonitor(void);
|
||||||
|
@ -68,7 +70,7 @@ PG_FUNCTION_INFO_V1(citus_stats_tenants);
|
||||||
Datum
|
Datum
|
||||||
citus_stats_tenants(PG_FUNCTION_ARGS)
|
citus_stats_tenants(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
//CheckCitusVersion(ERROR);
|
/*CheckCitusVersion(ERROR); */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We keep more than CitusStatsTenantsLimit tenants in our monitor.
|
* We keep more than CitusStatsTenantsLimit tenants in our monitor.
|
||||||
|
@ -93,7 +95,9 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
monitor->periodStart = monitor->periodStart + ((monitoringTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod;
|
monitor->periodStart = monitor->periodStart +
|
||||||
|
((monitoringTime - monitor->periodStart) / CitusStatsTenantsPeriod) *
|
||||||
|
CitusStatsTenantsPeriod;
|
||||||
|
|
||||||
int numberOfRowsToReturn = 0;
|
int numberOfRowsToReturn = 0;
|
||||||
if (returnAllTenants)
|
if (returnAllTenants)
|
||||||
|
@ -119,8 +123,10 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
|
||||||
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
|
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
|
||||||
values[2] = Int32GetDatum(tenantStats->selectsInThisPeriod);
|
values[2] = Int32GetDatum(tenantStats->selectsInThisPeriod);
|
||||||
values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod);
|
values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod);
|
||||||
values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod + tenantStats->insertsInThisPeriod);
|
values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod +
|
||||||
values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod + tenantStats->insertsInLastPeriod);
|
tenantStats->insertsInThisPeriod);
|
||||||
|
values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod +
|
||||||
|
tenantStats->insertsInLastPeriod);
|
||||||
values[6] = Int64GetDatum(tenantStats->score);
|
values[6] = Int64GetDatum(tenantStats->score);
|
||||||
|
|
||||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||||
|
@ -180,7 +186,8 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
||||||
|
|
||||||
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||||
{
|
{
|
||||||
ereport(NOTICE, (errmsg("attributing query to tenant: %s", quote_literal_cstr(tenantId))));
|
ereport(NOTICE, (errmsg("attributing query to tenant: %s",
|
||||||
|
quote_literal_cstr(tenantId))));
|
||||||
}
|
}
|
||||||
|
|
||||||
attributeToTenant = (char *) malloc(strlen(tenantId));
|
attributeToTenant = (char *) malloc(strlen(tenantId));
|
||||||
|
@ -191,7 +198,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
||||||
Assert(attributeToTenant == NULL);
|
Assert(attributeToTenant == NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
//DetachSegment();
|
/*DetachSegment(); */
|
||||||
|
|
||||||
attributeToTenantStart = clock();
|
attributeToTenantStart = clock();
|
||||||
}
|
}
|
||||||
|
@ -254,15 +261,17 @@ AttributeMetricsIfApplicable()
|
||||||
|
|
||||||
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||||
{
|
{
|
||||||
ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", cpu_time_used,
|
ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s",
|
||||||
attributeToTenant)));
|
cpu_time_used, attributeToTenant)));
|
||||||
}
|
}
|
||||||
|
|
||||||
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
|
||||||
|
|
||||||
LWLockAcquire(&monitor->lock, LW_SHARED);
|
LWLockAcquire(&monitor->lock, LW_SHARED);
|
||||||
|
|
||||||
monitor->periodStart = monitor->periodStart + ((queryTime-monitor->periodStart)/CitusStatsTenantsPeriod)*CitusStatsTenantsPeriod;
|
monitor->periodStart = monitor->periodStart +
|
||||||
|
((queryTime - monitor->periodStart) / CitusStatsTenantsPeriod) *
|
||||||
|
CitusStatsTenantsPeriod;
|
||||||
|
|
||||||
int tenantIndex = FindTenantStats(monitor);
|
int tenantIndex = FindTenantStats(monitor);
|
||||||
|
|
||||||
|
@ -288,7 +297,8 @@ AttributeMetricsIfApplicable()
|
||||||
/*
|
/*
|
||||||
* After updating the score we might need to change the rank of the tenant in the monitor
|
* After updating the score we might need to change the rank of the tenant in the monitor
|
||||||
*/
|
*/
|
||||||
while(tenantIndex != 0 && monitor->tenants[tenantIndex-1].score < tenantStats->score)
|
while (tenantIndex != 0 &&
|
||||||
|
monitor->tenants[tenantIndex - 1].score < tenantStats->score)
|
||||||
{
|
{
|
||||||
LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE);
|
LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
@ -334,7 +344,8 @@ AttributeMetricsIfApplicable()
|
||||||
|
|
||||||
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||||
{
|
{
|
||||||
ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s", tenantStats->selectCount, tenantStats->totalSelectTime,
|
ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f to tenant: %s",
|
||||||
|
tenantStats->selectCount, tenantStats->totalSelectTime,
|
||||||
tenantStats->tenantAttribute)));
|
tenantStats->tenantAttribute)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -358,7 +369,8 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats)
|
||||||
* If the last query in this tenant was before the start of current period
|
* If the last query in this tenant was before the start of current period
|
||||||
* but there are some query count for this period we move them to the last period.
|
* but there are some query count for this period we move them to the last period.
|
||||||
*/
|
*/
|
||||||
if (tenantStats->lastQueryTime < monitor->periodStart && (tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod))
|
if (tenantStats->lastQueryTime < monitor->periodStart &&
|
||||||
|
(tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod))
|
||||||
{
|
{
|
||||||
tenantStats->insertsInLastPeriod = tenantStats->insertsInThisPeriod;
|
tenantStats->insertsInLastPeriod = tenantStats->insertsInThisPeriod;
|
||||||
tenantStats->insertsInThisPeriod = 0;
|
tenantStats->insertsInThisPeriod = 0;
|
||||||
|
@ -366,6 +378,7 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats)
|
||||||
tenantStats->selectsInLastPeriod = tenantStats->selectsInThisPeriod;
|
tenantStats->selectsInLastPeriod = tenantStats->selectsInThisPeriod;
|
||||||
tenantStats->selectsInThisPeriod = 0;
|
tenantStats->selectsInThisPeriod = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the last query is more than two periods ago, we clean the last period counts too.
|
* If the last query is more than two periods ago, we clean the last period counts too.
|
||||||
*/
|
*/
|
||||||
|
@ -385,7 +398,8 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,TenantStats *tenantStats)
|
||||||
* periods that passed after the lsat score reduction and reduces the score accordingly.
|
* periods that passed after the lsat score reduction and reduces the score accordingly.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, time_t updateTime)
|
ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats,
|
||||||
|
time_t updateTime)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* With each query we increase the score of tenant by ONE_QUERY_SCORE.
|
* With each query we increase the score of tenant by ONE_QUERY_SCORE.
|
||||||
|
@ -395,7 +409,10 @@ ReduceScoreIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats, ti
|
||||||
* If the latest score reduction was in this period this number should be 0,
|
* If the latest score reduction was in this period this number should be 0,
|
||||||
* if it was in the last period this number should be 1 and so on.
|
* if it was in the last period this number should be 1 and so on.
|
||||||
*/
|
*/
|
||||||
int periodCountAfterLastScoreReduction = (monitor->periodStart - tenantStats->lastScoreReduction + CitusStatsTenantsPeriod -1) / CitusStatsTenantsPeriod;
|
int periodCountAfterLastScoreReduction = (monitor->periodStart -
|
||||||
|
tenantStats->lastScoreReduction +
|
||||||
|
CitusStatsTenantsPeriod - 1) /
|
||||||
|
CitusStatsTenantsPeriod;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This should not happen but let's make sure
|
* This should not happen but let's make sure
|
||||||
|
@ -520,8 +537,10 @@ CreateTenantStats(MultiTenantMonitor *monitor)
|
||||||
|
|
||||||
monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId();
|
monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId();
|
||||||
|
|
||||||
LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId, trancheName);
|
LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId,
|
||||||
LWLockInitialize(&monitor->tenants[tenantIndex].lock, monitor->tenants[tenantIndex].namedLockTranche.trancheId);
|
trancheName);
|
||||||
|
LWLockInitialize(&monitor->tenants[tenantIndex].lock,
|
||||||
|
monitor->tenants[tenantIndex].namedLockTranche.trancheId);
|
||||||
|
|
||||||
monitor->tenantCount++;
|
monitor->tenantCount++;
|
||||||
|
|
||||||
|
@ -538,7 +557,8 @@ FindTenantStats(MultiTenantMonitor *monitor)
|
||||||
for (int i = 0; i < monitor->tenantCount; i++)
|
for (int i = 0; i < monitor->tenantCount; i++)
|
||||||
{
|
{
|
||||||
TenantStats *tenantStats = &monitor->tenants[i];
|
TenantStats *tenantStats = &monitor->tenants[i];
|
||||||
if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && tenantStats->colocationGroupId == colocationGroupId)
|
if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 &&
|
||||||
|
tenantStats->colocationGroupId == colocationGroupId)
|
||||||
{
|
{
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,4 +64,4 @@ extern int MultiTenantMonitoringLogLevel;
|
||||||
extern int CitusStatsTenantsPeriod;
|
extern int CitusStatsTenantsPeriod;
|
||||||
extern int CitusStatsTenantsLimit;
|
extern int CitusStatsTenantsLimit;
|
||||||
|
|
||||||
#endif //CITUS_ATTRIBUTE_H
|
#endif /*CITUS_ATTRIBUTE_H */
|
||||||
|
|
Loading…
Reference in New Issue