From 7efc3fa50e5a42d32e5a67ceb1aac6f115f6b575 Mon Sep 17 00:00:00 2001 From: Hamid Akhtar Date: Mon, 20 Jun 2022 17:53:40 +0500 Subject: [PATCH] [PG-159] pg_stat_monitor: Bucket start time should be aligned with fix number of seconds The buckets are now created with the start time a modulus of the bucket time size. So if we have a 10 second bucket, the start times would reflect that: - Bucket1: 00:00:00 - Bucket2: 00:00:10 - Bucket3: 00:00:20 ... Previously, the start time of the bucket was aligned with the first query that arrives in that bucket. However, now the behaviour is changed. So, even if the first query for bucket 2 arrives at 00:00:13, the start time would still be set to 00:00:10. This change now makes the bucketing separated out by fixed time windows so that external applications can easily consume that data and chart it. Also, as part of this change, locking of pgss is updated now and extended to last the bucket related changes. --- pg_stat_monitor.c | 106 +++++++++++++++++++++------------------------- pg_stat_monitor.h | 4 +- 2 files changed, 51 insertions(+), 59 deletions(-) diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index b6ba46b..8bb74bf 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -2085,87 +2085,79 @@ static uint64 get_next_wbucket(pgssSharedState *pgss) { struct timeval tv; - uint64 current_usec; - uint64 current_bucket_usec; + uint64 current_sec; + uint64 current_bucket_sec; uint64 new_bucket_id; uint64 prev_bucket_id; struct tm *lt; - bool update_bucket = false; + char file_name[1024]; gettimeofday(&tv,NULL); - current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); - current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec; - current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec); + current_sec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + current_bucket_sec = pg_atomic_read_u64(&pgss->prev_bucket_sec); /* - * If current bucket expired we loop attempting to update prev_bucket_usec. + * If current bucket expired we loop attempting to update prev_bucket_sec. * * pg_atomic_compare_exchange_u64 may fail in two possible ways: * 1. Another thread/process updated the variable before us. * 2. A spurious failure / hardware event. * - * In both failure cases we read prev_bucket_usec from memory again, if it was - * a spurious failure then the value of prev_bucket_usec must be the same as + * In both failure cases we read prev_bucket_sec from memory again, if it was + * a spurious failure then the value of prev_bucket_sec must be the same as * before, which will cause the while loop to execute again. * - * If another thread updated prev_bucket_usec, then its current value will + * If another thread updated prev_bucket_sec, then its current value will * definitely make the while condition to fail, we can stop the loop as another - * thread has already updated prev_bucket_usec. + * thread has already updated prev_bucket_sec. */ - while ((current_usec - current_bucket_usec) > ((uint64)PGSM_BUCKET_TIME * 1000LU * 1000LU)) + if ((current_sec - current_bucket_sec) < ((uint64)PGSM_BUCKET_TIME * 1000LU * 1000LU)) { - if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, ¤t_bucket_usec, current_usec)) - { - update_bucket = true; - break; - } - - current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec); + return pg_atomic_read_u64(&pgss->current_wbucket); } - if (update_bucket) + new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; + + /* Update bucket id and retrieve the previous one. */ + prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); + + tv.tv_sec = (tv.tv_sec) - (tv.tv_sec % PGSM_BUCKET_TIME); + lt = localtime(&tv.tv_sec); + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + /* Reconfirm that no other backend has created the bucket while we waited */ + if (new_bucket_id == prev_bucket_id) { - char file_name[1024]; - int sec = 0; - - new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; - - /* Update bucket id and retrieve the previous one. */ - prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); - - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); - - if (pgss->overflow) - { - pgss->n_bucket_cycles += 1; - if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS) - { - /* - * A full rotation of PGSM_MAX_BUCKETS buckets happened since - * we detected a query buffer overflow. - * Reset overflow state and remove the dump file. - */ - pgss->overflow = false; - pgss->n_bucket_cycles = 0; - snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); - unlink(file_name); - } - } - LWLockRelease(pgss->lock); - - lt = localtime(&tv.tv_sec); - sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME); - if (sec < 0) - sec = 0; - snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]), - "%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec); - return new_bucket_id; } - return pg_atomic_read_u64(&pgss->current_wbucket); + hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); + + if (pgss->overflow) + { + pgss->n_bucket_cycles += 1; + if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS) + { + /* + * A full rotation of PGSM_MAX_BUCKETS buckets happened since + * we detected a query buffer overflow. + * Reset overflow state and remove the dump file. + */ + pgss->overflow = false; + pgss->n_bucket_cycles = 0; + snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); + unlink(file_name); + } + } + + snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]), + "%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec); + + LWLockRelease(pgss->lock); + + return new_bucket_id; } #if PG_VERSION_NUM < 140000 diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index f729cbd..cb47663 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -310,7 +310,7 @@ typedef struct pgssSharedState Size extent; /* current extent of query file */ int64 n_writers; /* number of active writers to query file */ pg_atomic_uint64 current_wbucket; - pg_atomic_uint64 prev_bucket_usec; + pg_atomic_uint64 prev_bucket_sec; uint64 bucket_entry[MAX_BUCKETS]; char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ LWLock *errors_lock; /* protects errors hashtable search/modification */ @@ -336,7 +336,7 @@ do { \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->n_writers = 0; \ pg_atomic_init_u64(&x->current_wbucket, 0); \ - pg_atomic_init_u64(&x->prev_bucket_usec, 0); \ + pg_atomic_init_u64(&x->prev_bucket_sec, 0); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ } while(0)