diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 127d6a3..93789c3 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1701,9 +1701,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (read_query(buf, queryid, query_txt, entry->query_pos) == 0) { - int len; - len = read_query_buffer(bucketid, queryid, query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) + int rc; + rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos); + if (rc != 1) snprintf(query_txt, 32, "%s", ""); } @@ -1726,11 +1726,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.info.parentid != UINT64CONST(0)) { - int len = 0; + int rc = 0; if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0) { - len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) + rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0); + if (rc != 1) snprintf(parent_query_txt, 32, "%s", ""); } } @@ -3101,7 +3101,7 @@ SaveQueryText(uint64 bucketid, if (buf_len <= sizeof (uint64)) return false; - dump_ok = dump_queries_buffer(bucketid, buf, buf_len); + dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET); buf_len = sizeof (uint64); /* @@ -3321,6 +3321,8 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) int fd = 0; char file_name[1024]; bool success = true; + int off = 0; + int tries = 0; snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id); fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY); @@ -3333,14 +3335,24 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) return false; } - if (write(fd, buf, buf_len) != buf_len) - { + /* Loop until write buf_len bytes to the file. */ + do { + ssize_t nwrite = write(fd, buf + off, buf_len - off); + if (nwrite == -1) + { + if (errno == EINTR && tries++ < 3) + continue; + + success = false; + break; + } + off += nwrite; + } while (off < buf_len); + + if (!success) ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - file_name))); - success = false; - } + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", file_name))); if (fd > 0) CloseTransientFile(fd); @@ -3348,14 +3360,25 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) return success; } +/* + * Try to locate query text in a dumped file for bucket_id. + * + * Returns: + * 1 Query sucessfully read, query_text will contain the query text. + * 0 Query not found. + * -1 I/O Error. + */ int -read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) +read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) { int fd = 0; - int buf_len = 0; char file_name[1024]; unsigned char *buf = NULL; + ssize_t nread = 0; int off = 0; + int tries = 0; + bool done = false; + bool found = false; snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id); fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY); @@ -3363,40 +3386,66 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) goto exit; buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET); - for(;;) + while (!done) { - if (lseek(fd, off, SEEK_SET) != off) - goto exit; + off = 0; + /* read a chunck of MAX_QUERY_BUFFER_BUCKET size. */ + do { + nread = read(fd, buf + off, MAX_QUERY_BUFFER_BUCKET - off); + if (nread == -1) + { + if (errno == EINTR && tries++ < 3) /* read() was interrupted, attempt to read again (max attempts=3) */ + continue; - buf_len = read(fd, buf, MAX_QUERY_BUFFER_BUCKET); - if (buf_len != MAX_QUERY_BUFFER_BUCKET) - { - if (errno != ENOENT) goto exit; - - if (buf_len == 0) + } + else if (nread == 0) /* EOF */ + { + done = true; break; + } + + off += nread; + } while (off < MAX_QUERY_BUFFER_BUCKET); + + if (off == MAX_QUERY_BUFFER_BUCKET) + { + /* we have a chunck, scan it looking for queryid. */ + if (read_query(buf, queryid, query_txt, pos) != 0) + { + + found = true; + /* query was found, don't need to read another chunck. */ + break; + } } - off += buf_len; - if (read_query(buf, queryid, query_txt, 0)) + else + /* + * Either done=true or file has a size not multiple of MAX_QUERY_BUFFER_BUCKET. + * It is safe to assume that the file was truncated or corrupted. + */ break; } - if (fd > 0) - CloseTransientFile(fd); - if (buf) - pfree(buf); - return buf_len; exit: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - file_name))); - if (fd > 0) + if (fd < 0 || nread == -1) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + file_name))); + + if (fd >= 0) CloseTransientFile(fd); + if (buf) pfree(buf); - return buf_len; + + if (found) + return 1; + else if (fd == -1 || nread == -1) + return -1; /* I/O error. */ + else + return 0; /* Not found. */ } static double diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index a1ab4d5..be32103 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -390,7 +390,7 @@ void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *que pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void); -int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); +int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos); uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos); pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);