From f66b45afd60756c5c6604664984b754fcf0f3e4f Mon Sep 17 00:00:00 2001 From: Diego Fronza Date: Mon, 25 Oct 2021 16:55:30 -0300 Subject: [PATCH] PG-220: Fix read/write of dumped query buffer to files. This commit fixes some issues when the query buffer overflows and pg_stat_monitor attempts to dump its contents to a file. The dump process is now as follows: 1. The dump will always be a full dump of the current query buffer, meaning pg_stat_monitor will dump MAX_QUERY_BUFFER_BUCKET bytes to the dump file. 2. When scanning the dump file, read chunks of size MAX_QUERY_BUFFER_BUCKET, then look for the query ID using that chunk and the query position metadata; this allows pg_stat_monitor to avoid scanning the whole chunk when looking for a query ID. The code in charge of reading from/writing to the dump file now takes into account that read() and write() may return fewer bytes than were asked for; the code now ensures that we actually read or write the number of bytes required (MAX_QUERY_BUFFER_BUCKET), and it also handles rare but possible interrupts when doing those operations. 
--- pg_stat_monitor.c | 125 ++++++++++++++++++++++++++++++++-------------- pg_stat_monitor.h | 2 +- 2 files changed, 88 insertions(+), 39 deletions(-) diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 127d6a3..93789c3 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1701,9 +1701,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (read_query(buf, queryid, query_txt, entry->query_pos) == 0) { - int len; - len = read_query_buffer(bucketid, queryid, query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) + int rc; + rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos); + if (rc != 1) snprintf(query_txt, 32, "%s", ""); } @@ -1726,11 +1726,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.info.parentid != UINT64CONST(0)) { - int len = 0; + int rc = 0; if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0) { - len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) + rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0); + if (rc != 1) snprintf(parent_query_txt, 32, "%s", ""); } } @@ -3101,7 +3101,7 @@ SaveQueryText(uint64 bucketid, if (buf_len <= sizeof (uint64)) return false; - dump_ok = dump_queries_buffer(bucketid, buf, buf_len); + dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET); buf_len = sizeof (uint64); /* @@ -3321,6 +3321,8 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) int fd = 0; char file_name[1024]; bool success = true; + int off = 0; + int tries = 0; snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id); fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY); @@ -3333,14 +3335,24 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) return false; } - if (write(fd, buf, buf_len) != buf_len) - { + /* Loop until write buf_len bytes to the file. 
*/ + do { + ssize_t nwrite = write(fd, buf + off, buf_len - off); + if (nwrite == -1) + { + if (errno == EINTR && tries++ < 3) + continue; + + success = false; + break; + } + off += nwrite; + } while (off < buf_len); + + if (!success) ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - file_name))); - success = false; - } + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", file_name))); if (fd > 0) CloseTransientFile(fd); @@ -3348,14 +3360,25 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) return success; } +/* + * Try to locate query text in a dumped file for bucket_id. + * + * Returns: + * 1 Query sucessfully read, query_text will contain the query text. + * 0 Query not found. + * -1 I/O Error. + */ int -read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) +read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) { int fd = 0; - int buf_len = 0; char file_name[1024]; unsigned char *buf = NULL; + ssize_t nread = 0; int off = 0; + int tries = 0; + bool done = false; + bool found = false; snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id); fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY); @@ -3363,40 +3386,66 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) goto exit; buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET); - for(;;) + while (!done) { - if (lseek(fd, off, SEEK_SET) != off) - goto exit; + off = 0; + /* read a chunck of MAX_QUERY_BUFFER_BUCKET size. 
*/ + do { + nread = read(fd, buf + off, MAX_QUERY_BUFFER_BUCKET - off); + if (nread == -1) + { + if (errno == EINTR && tries++ < 3) /* read() was interrupted, attempt to read again (max attempts=3) */ + continue; - buf_len = read(fd, buf, MAX_QUERY_BUFFER_BUCKET); - if (buf_len != MAX_QUERY_BUFFER_BUCKET) - { - if (errno != ENOENT) goto exit; - - if (buf_len == 0) + } + else if (nread == 0) /* EOF */ + { + done = true; break; + } + + off += nread; + } while (off < MAX_QUERY_BUFFER_BUCKET); + + if (off == MAX_QUERY_BUFFER_BUCKET) + { + /* we have a chunck, scan it looking for queryid. */ + if (read_query(buf, queryid, query_txt, pos) != 0) + { + + found = true; + /* query was found, don't need to read another chunck. */ + break; + } } - off += buf_len; - if (read_query(buf, queryid, query_txt, 0)) + else + /* + * Either done=true or file has a size not multiple of MAX_QUERY_BUFFER_BUCKET. + * It is safe to assume that the file was truncated or corrupted. + */ break; } - if (fd > 0) - CloseTransientFile(fd); - if (buf) - pfree(buf); - return buf_len; exit: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - file_name))); - if (fd > 0) + if (fd < 0 || nread == -1) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + file_name))); + + if (fd >= 0) CloseTransientFile(fd); + if (buf) pfree(buf); - return buf_len; + + if (found) + return 1; + else if (fd == -1 || nread == -1) + return -1; /* I/O error. */ + else + return 0; /* Not found. 
*/ } static double diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index a1ab4d5..be32103 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -390,7 +390,7 @@ void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *que pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void); -int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); +int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos); uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos); pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);