PG-286: Fix query buffer overflow management.

If pgsm_overflow_target is ON (default, 1) and the query buffer
overflows, we now dump the buffer and keep track of how many times
pg_stat_monitor changed bucket since that.

If an overflow happen again before pg_stat_monitor cycle through
pgsm_max_buckets buckets (default 10), then we don't dump the buffer
again, but instead report an error, this ensures that only one dump file
of size pgsm_query_shared_buffer will be in disk at any time, avoiding
slowing down queries to the pg_stat_monitor view.

As soon as pg_stat_monitor cycles through all buckets, we remove the
dump file and reset the counter (pgss->n_bucket_cycles).
pull/184/head
Diego Fronza 2021-11-29 15:25:56 -03:00 committed by Hamid Akhtar
parent df89c3f4a3
commit 8c61e24f95
2 changed files with 50 additions and 11 deletions

View File

@ -215,7 +215,8 @@ static uint64 djb2_hash(unsigned char *str, size_t len);
void
_PG_init(void)
{
int i, rc;
int rc;
char file_name[1024];
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
/*
@ -240,12 +241,8 @@ _PG_init(void)
EnableQueryId();
#endif
for (i = 0; i < PGSM_MAX_BUCKETS; i++)
{
char file_name[1024];
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, i);
unlink(file_name);
}
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
unlink(file_name);
EmitWarningsOnPlaceholders("pg_stat_monitor");
@ -2085,8 +2082,22 @@ get_next_wbucket(pgssSharedState *pgss)
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id);
unlink(file_name);
if (pgss->overflow)
{
pgss->n_bucket_cycles += 1;
if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS)
{
/*
* A full rotation of PGSM_MAX_BUCKETS buckets happened since
* we detected a query buffer overflow.
* Reset overflow state and remove the dump file.
*/
pgss->overflow = false;
pgss->n_bucket_cycles = 0;
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
unlink(file_name);
}
}
LWLockRelease(pgss->lock);
@ -3131,6 +3142,13 @@ SaveQueryText(uint64 bucketid,
case OVERFLOW_TARGET_DISK:
{
bool dump_ok;
pgssSharedState *pgss = pgsm_get_ss();
if (pgss->overflow)
{
pgsm_log_error("query buffer overflowed twice");
return false;
}
/*
* If the query buffer is empty, there is nothing to dump, this also
@ -3142,6 +3160,12 @@ SaveQueryText(uint64 bucketid,
dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUF);
buf_len = sizeof (uint64);
if (dump_ok)
{
pgss->overflow = true;
pgss->n_bucket_cycles = 0;
}
/*
* We must check for overflow again, as the query length may
* exceed the total size allocated to the buffer (MAX_QUERY_BUF).
@ -3367,7 +3391,7 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len)
int off = 0;
int tries = 0;
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id);
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY);
if (fd < 0)
{
@ -3423,7 +3447,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos)
bool done = false;
bool found = false;
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id);
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY);
if (fd < 0)
goto exit;

View File

@ -306,6 +306,21 @@ typedef struct pgssSharedState
pg_atomic_uint64 prev_bucket_usec;
uint64 bucket_entry[MAX_BUCKETS];
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
LWLock *errors_lock; /* protects errors hashtable search/modification */
/*
* These variables are used when pgsm_overflow_target is ON.
*
* overflow is set to true when the query buffer overflows.
*
* n_bucket_cycles counts the number of times we changed bucket
* since the query buffer overflowed. When it reaches pgsm_max_buckets
* we remove the dump file, also reset the counter.
*
* This allows us to avoid having a large file on disk that would also
* slowdown queries to the pg_stat_monitor view.
*/
bool overflow;
size_t n_bucket_cycles;
} pgssSharedState;
#define ResetSharedState(x) \