Merge pull request #266 from EngineeredVirus/main
[PG-159] pg_stat_monitor: Bucket start time should be aligned with fi…pull/268/head
commit
2ba1960c2f
|
@ -2085,87 +2085,79 @@ static uint64
|
||||||
get_next_wbucket(pgssSharedState *pgss)
|
get_next_wbucket(pgssSharedState *pgss)
|
||||||
{
|
{
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
uint64 current_usec;
|
uint64 current_sec;
|
||||||
uint64 current_bucket_usec;
|
uint64 current_bucket_sec;
|
||||||
uint64 new_bucket_id;
|
uint64 new_bucket_id;
|
||||||
uint64 prev_bucket_id;
|
uint64 prev_bucket_id;
|
||||||
struct tm *lt;
|
struct tm *lt;
|
||||||
bool update_bucket = false;
|
char file_name[1024];
|
||||||
|
|
||||||
gettimeofday(&tv,NULL);
|
gettimeofday(&tv,NULL);
|
||||||
current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
|
current_sec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
|
||||||
current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec;
|
current_bucket_sec = pg_atomic_read_u64(&pgss->prev_bucket_sec);
|
||||||
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If current bucket expired we loop attempting to update prev_bucket_usec.
|
* If current bucket expired we loop attempting to update prev_bucket_sec.
|
||||||
*
|
*
|
||||||
* pg_atomic_compare_exchange_u64 may fail in two possible ways:
|
* pg_atomic_compare_exchange_u64 may fail in two possible ways:
|
||||||
* 1. Another thread/process updated the variable before us.
|
* 1. Another thread/process updated the variable before us.
|
||||||
* 2. A spurious failure / hardware event.
|
* 2. A spurious failure / hardware event.
|
||||||
*
|
*
|
||||||
* In both failure cases we read prev_bucket_usec from memory again, if it was
|
* In both failure cases we read prev_bucket_sec from memory again, if it was
|
||||||
* a spurious failure then the value of prev_bucket_usec must be the same as
|
* a spurious failure then the value of prev_bucket_sec must be the same as
|
||||||
* before, which will cause the while loop to execute again.
|
* before, which will cause the while loop to execute again.
|
||||||
*
|
*
|
||||||
* If another thread updated prev_bucket_usec, then its current value will
|
* If another thread updated prev_bucket_sec, then its current value will
|
||||||
* definitely make the while condition to fail, we can stop the loop as another
|
* definitely make the while condition to fail, we can stop the loop as another
|
||||||
* thread has already updated prev_bucket_usec.
|
* thread has already updated prev_bucket_sec.
|
||||||
*/
|
*/
|
||||||
while ((current_usec - current_bucket_usec) > ((uint64)PGSM_BUCKET_TIME * 1000LU * 1000LU))
|
if ((current_sec - current_bucket_sec) < ((uint64)PGSM_BUCKET_TIME * 1000LU * 1000LU))
|
||||||
{
|
{
|
||||||
if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, ¤t_bucket_usec, current_usec))
|
return pg_atomic_read_u64(&pgss->current_wbucket);
|
||||||
{
|
|
||||||
update_bucket = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (update_bucket)
|
new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
|
||||||
|
|
||||||
|
/* Update bucket id and retrieve the previous one. */
|
||||||
|
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
|
||||||
|
|
||||||
|
tv.tv_sec = (tv.tv_sec) - (tv.tv_sec % PGSM_BUCKET_TIME);
|
||||||
|
lt = localtime(&tv.tv_sec);
|
||||||
|
|
||||||
|
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
|
/* Reconfirm that no other backend has created the bucket while we waited */
|
||||||
|
if (new_bucket_id == prev_bucket_id)
|
||||||
{
|
{
|
||||||
char file_name[1024];
|
|
||||||
int sec = 0;
|
|
||||||
|
|
||||||
new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
|
|
||||||
|
|
||||||
/* Update bucket id and retrieve the previous one. */
|
|
||||||
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
|
|
||||||
|
|
||||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
|
||||||
hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
|
|
||||||
|
|
||||||
if (pgss->overflow)
|
|
||||||
{
|
|
||||||
pgss->n_bucket_cycles += 1;
|
|
||||||
if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* A full rotation of PGSM_MAX_BUCKETS buckets happened since
|
|
||||||
* we detected a query buffer overflow.
|
|
||||||
* Reset overflow state and remove the dump file.
|
|
||||||
*/
|
|
||||||
pgss->overflow = false;
|
|
||||||
pgss->n_bucket_cycles = 0;
|
|
||||||
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
|
|
||||||
unlink(file_name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LWLockRelease(pgss->lock);
|
LWLockRelease(pgss->lock);
|
||||||
|
|
||||||
lt = localtime(&tv.tv_sec);
|
|
||||||
sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME);
|
|
||||||
if (sec < 0)
|
|
||||||
sec = 0;
|
|
||||||
snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]),
|
|
||||||
"%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec);
|
|
||||||
|
|
||||||
return new_bucket_id;
|
return new_bucket_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pg_atomic_read_u64(&pgss->current_wbucket);
|
hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
|
||||||
|
|
||||||
|
if (pgss->overflow)
|
||||||
|
{
|
||||||
|
pgss->n_bucket_cycles += 1;
|
||||||
|
if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* A full rotation of PGSM_MAX_BUCKETS buckets happened since
|
||||||
|
* we detected a query buffer overflow.
|
||||||
|
* Reset overflow state and remove the dump file.
|
||||||
|
*/
|
||||||
|
pgss->overflow = false;
|
||||||
|
pgss->n_bucket_cycles = 0;
|
||||||
|
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
|
||||||
|
unlink(file_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]),
|
||||||
|
"%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec);
|
||||||
|
|
||||||
|
LWLockRelease(pgss->lock);
|
||||||
|
|
||||||
|
return new_bucket_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if PG_VERSION_NUM < 140000
|
#if PG_VERSION_NUM < 140000
|
||||||
|
|
|
@ -310,7 +310,7 @@ typedef struct pgssSharedState
|
||||||
Size extent; /* current extent of query file */
|
Size extent; /* current extent of query file */
|
||||||
int64 n_writers; /* number of active writers to query file */
|
int64 n_writers; /* number of active writers to query file */
|
||||||
pg_atomic_uint64 current_wbucket;
|
pg_atomic_uint64 current_wbucket;
|
||||||
pg_atomic_uint64 prev_bucket_usec;
|
pg_atomic_uint64 prev_bucket_sec;
|
||||||
uint64 bucket_entry[MAX_BUCKETS];
|
uint64 bucket_entry[MAX_BUCKETS];
|
||||||
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
|
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
|
||||||
LWLock *errors_lock; /* protects errors hashtable search/modification */
|
LWLock *errors_lock; /* protects errors hashtable search/modification */
|
||||||
|
@ -336,7 +336,7 @@ do { \
|
||||||
x->cur_median_usage = ASSUMED_MEDIAN_INIT; \
|
x->cur_median_usage = ASSUMED_MEDIAN_INIT; \
|
||||||
x->n_writers = 0; \
|
x->n_writers = 0; \
|
||||||
pg_atomic_init_u64(&x->current_wbucket, 0); \
|
pg_atomic_init_u64(&x->current_wbucket, 0); \
|
||||||
pg_atomic_init_u64(&x->prev_bucket_usec, 0); \
|
pg_atomic_init_u64(&x->prev_bucket_sec, 0); \
|
||||||
memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \
|
memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \
|
||||||
} while(0)
|
} while(0)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue