[PG-159] pg_stat_monitor: Bucket start time should be aligned with a fixed number of seconds

Buckets are now created with a start time that is a multiple of the bucket time size.
So if we have a 10-second bucket, the start times would reflect that:
- Bucket1: 00:00:00
- Bucket2: 00:00:10
- Bucket3: 00:00:20
...

Previously, the start time of a bucket was aligned with the first query that
arrived in that bucket. That behaviour has now changed: even if the
first query for bucket 2 arrives at 00:00:13, the start time is still set
to 00:00:10.

This change separates the buckets into fixed time windows so that
external applications can easily consume the data and chart it.

Also, as part of this change, the locking of pgss is updated and extended
to last through the bucket-related changes.
pull/266/head
Hamid Akhtar 2022-06-20 17:53:40 +05:00
parent 053f1d6e56
commit 7efc3fa50e
2 changed files with 51 additions and 59 deletions

View File

@ -2085,55 +2085,54 @@ static uint64
get_next_wbucket(pgssSharedState *pgss) get_next_wbucket(pgssSharedState *pgss)
{ {
struct timeval tv; struct timeval tv;
uint64 current_usec; uint64 current_sec;
uint64 current_bucket_usec; uint64 current_bucket_sec;
uint64 new_bucket_id; uint64 new_bucket_id;
uint64 prev_bucket_id; uint64 prev_bucket_id;
struct tm *lt; struct tm *lt;
bool update_bucket = false; char file_name[1024];
gettimeofday(&tv,NULL); gettimeofday(&tv,NULL);
current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); current_sec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec; current_bucket_sec = pg_atomic_read_u64(&pgss->prev_bucket_sec);
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
/* /*
* If current bucket expired we loop attempting to update prev_bucket_usec. * If current bucket expired we loop attempting to update prev_bucket_sec.
* *
* pg_atomic_compare_exchange_u64 may fail in two possible ways: * pg_atomic_compare_exchange_u64 may fail in two possible ways:
* 1. Another thread/process updated the variable before us. * 1. Another thread/process updated the variable before us.
* 2. A spurious failure / hardware event. * 2. A spurious failure / hardware event.
* *
* In both failure cases we read prev_bucket_usec from memory again, if it was * In both failure cases we read prev_bucket_sec from memory again, if it was
* a spurious failure then the value of prev_bucket_usec must be the same as * a spurious failure then the value of prev_bucket_sec must be the same as
* before, which will cause the while loop to execute again. * before, which will cause the while loop to execute again.
* *
* If another thread updated prev_bucket_usec, then its current value will * If another thread updated prev_bucket_sec, then its current value will
* definitely make the while condition to fail, we can stop the loop as another * definitely make the while condition to fail, we can stop the loop as another
* thread has already updated prev_bucket_usec. * thread has already updated prev_bucket_sec.
*/ */
while ((current_usec - current_bucket_usec) > ((uint64)PGSM_BUCKET_TIME * 1000LU * 1000LU)) if ((current_sec - current_bucket_sec) < ((uint64)PGSM_BUCKET_TIME * 1000LU * 1000LU))
{ {
if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, &current_bucket_usec, current_usec)) return pg_atomic_read_u64(&pgss->current_wbucket);
{
update_bucket = true;
break;
} }
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
}
if (update_bucket)
{
char file_name[1024];
int sec = 0;
new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
/* Update bucket id and retrieve the previous one. */ /* Update bucket id and retrieve the previous one. */
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
tv.tv_sec = (tv.tv_sec) - (tv.tv_sec % PGSM_BUCKET_TIME);
lt = localtime(&tv.tv_sec);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE); LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
/* Reconfirm that no other backend has created the bucket while we waited */
if (new_bucket_id == prev_bucket_id)
{
LWLockRelease(pgss->lock);
return new_bucket_id;
}
hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
if (pgss->overflow) if (pgss->overflow)
@ -2153,21 +2152,14 @@ get_next_wbucket(pgssSharedState *pgss)
} }
} }
snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]),
"%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec);
LWLockRelease(pgss->lock); LWLockRelease(pgss->lock);
lt = localtime(&tv.tv_sec);
sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME);
if (sec < 0)
sec = 0;
snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]),
"%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec);
return new_bucket_id; return new_bucket_id;
} }
return pg_atomic_read_u64(&pgss->current_wbucket);
}
#if PG_VERSION_NUM < 140000 #if PG_VERSION_NUM < 140000
/* /*
* AppendJumble: Append a value that is substantive in a given query to * AppendJumble: Append a value that is substantive in a given query to

View File

@ -310,7 +310,7 @@ typedef struct pgssSharedState
Size extent; /* current extent of query file */ Size extent; /* current extent of query file */
int64 n_writers; /* number of active writers to query file */ int64 n_writers; /* number of active writers to query file */
pg_atomic_uint64 current_wbucket; pg_atomic_uint64 current_wbucket;
pg_atomic_uint64 prev_bucket_usec; pg_atomic_uint64 prev_bucket_sec;
uint64 bucket_entry[MAX_BUCKETS]; uint64 bucket_entry[MAX_BUCKETS];
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
LWLock *errors_lock; /* protects errors hashtable search/modification */ LWLock *errors_lock; /* protects errors hashtable search/modification */
@ -336,7 +336,7 @@ do { \
x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \
x->n_writers = 0; \ x->n_writers = 0; \
pg_atomic_init_u64(&x->current_wbucket, 0); \ pg_atomic_init_u64(&x->current_wbucket, 0); \
pg_atomic_init_u64(&x->prev_bucket_usec, 0); \ pg_atomic_init_u64(&x->prev_bucket_sec, 0); \
memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \
} while(0) } while(0)