diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index f08e716..22038d5 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -76,7 +76,7 @@ static struct pg_hook_stats_t *pg_hook_stats; static void extract_query_comments(const char *query, char *comments, size_t max_len); static int get_histogram_bucket(double q_time); static bool IsSystemInitialized(void); -static void dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len); +static bool dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len); static double time_diff(struct timeval end, struct timeval start); @@ -1701,9 +1701,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (read_query(buf, queryid, query_txt, entry->query_pos) == 0) { - int len; - len = read_query_buffer(bucketid, queryid, query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) + int rc; + rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos); + if (rc != 1) snprintf(query_txt, 32, "%s", ""); } @@ -1726,11 +1726,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.info.parentid != UINT64CONST(0)) { - int len = 0; + int rc = 0; if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0) { - len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) + rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0); + if (rc != 1) snprintf(parent_query_txt, 32, "%s", ""); } } @@ -3092,11 +3092,39 @@ SaveQueryText(uint64 bucketid, return false; case OVERFLOW_TARGET_DISK: { - dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET); + bool dump_ok; + + /* + * If the query buffer is empty, there is nothing to dump, this also + * means that the current query length exceeds MAX_QUERY_BUFFER_BUCKET. + */ + if (buf_len <= sizeof (uint64)) + return false; + + dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET); buf_len = sizeof (uint64); + + /* + * We must check for overflow again, as the query length may + * exceed the size allocated to the buffer (MAX_QUERY_BUFFER_BUCKET). + */ + if (QUERY_BUFFER_OVERFLOW(buf_len, query_len)) + { + /* + * If we successfully dumped the query buffer to disk, then + * reset the buffer, otherwise we could end up dumping the + * same buffer again. + */ + if (dump_ok) + *(uint64 *)buf = 0; + + return false; + } + } break; default: + Assert(false); break; } } @@ -3287,37 +3315,70 @@ IsSystemInitialized(void) return (system_init && IsHashInitialize()); } -static void +static bool dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) { - int fd = 0; + int fd = 0; char file_name[1024]; + bool success = true; + int off = 0; + int tries = 0; snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id); fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY); - if (fd < 0) + if (fd < 0) + { ereport(LOG, (errcode_for_file_access(), errmsg("could not write file \"%s\": %m", file_name))); + return false; + } - if (write(fd, buf, buf_len) != buf_len) + /* Loop until write buf_len bytes to the file. */ + do { + ssize_t nwrite = write(fd, buf + off, buf_len - off); + if (nwrite == -1) + { + if (errno == EINTR && tries++ < 3) + continue; + + success = false; + break; + } + off += nwrite; + } while (off < buf_len); + + if (!success) ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - file_name))); + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", file_name))); + if (fd > 0) CloseTransientFile(fd); + + return success; } +/* + * Try to locate query text in a dumped file for bucket_id. + * + * Returns: + * 1 Query sucessfully read, query_text will contain the query text. + * 0 Query not found. + * -1 I/O Error. + */ int -read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) +read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) { int fd = 0; - int buf_len = 0; char file_name[1024]; unsigned char *buf = NULL; + ssize_t nread = 0; int off = 0; + int tries = 0; + bool done = false; + bool found = false; snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id); fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY); @@ -3325,40 +3386,66 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) goto exit; buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET); - for(;;) + while (!done) { - if (lseek(fd, off, SEEK_SET) != off) - goto exit; + off = 0; + /* read a chunck of MAX_QUERY_BUFFER_BUCKET size. */ + do { + nread = read(fd, buf + off, MAX_QUERY_BUFFER_BUCKET - off); + if (nread == -1) + { + if (errno == EINTR && tries++ < 3) /* read() was interrupted, attempt to read again (max attempts=3) */ + continue; - buf_len = read(fd, buf, MAX_QUERY_BUFFER_BUCKET); - if (buf_len != MAX_QUERY_BUFFER_BUCKET) - { - if (errno != ENOENT) goto exit; - - if (buf_len == 0) + } + else if (nread == 0) /* EOF */ + { + done = true; break; + } + + off += nread; + } while (off < MAX_QUERY_BUFFER_BUCKET); + + if (off == MAX_QUERY_BUFFER_BUCKET) + { + /* we have a chunck, scan it looking for queryid. */ + if (read_query(buf, queryid, query_txt, pos) != 0) + { + + found = true; + /* query was found, don't need to read another chunck. */ + break; + } } - off += buf_len; - if (read_query(buf, queryid, query_txt, 0)) + else + /* + * Either done=true or file has a size not multiple of MAX_QUERY_BUFFER_BUCKET. + * It is safe to assume that the file was truncated or corrupted. + */ break; } - if (fd > 0) - CloseTransientFile(fd); - if (buf) - pfree(buf); - return buf_len; exit: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - file_name))); - if (fd > 0) + if (fd < 0 || nread == -1) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + file_name))); + + if (fd >= 0) CloseTransientFile(fd); + if (buf) pfree(buf); - return buf_len; + + if (found) + return 1; + else if (fd == -1 || nread == -1) + return -1; /* I/O error. */ + else + return 0; /* Not found. */ } static double diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 2ddfe01..be32103 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -390,7 +390,7 @@ void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *que pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void); -int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); +int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos); uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos); pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); @@ -411,7 +411,7 @@ void pgss_startup(void); #define PGSM_HISTOGRAM_MAX get_conf(8)->guc_variable #define PGSM_HISTOGRAM_BUCKETS get_conf(9)->guc_variable #define PGSM_QUERY_SHARED_BUFFER get_conf(10)->guc_variable -#define PGSM_OVERFLOW_TARGET get_conf(12)->guc_variable +#define PGSM_OVERFLOW_TARGET get_conf(11)->guc_variable #define PGSM_QUERY_PLAN get_conf(12)->guc_variable #define PGSM_TRACK_PLANNING get_conf(13)->guc_variable diff --git a/regression/expected/guc.out b/regression/expected/guc.out index 934c719..0da2986 100644 --- a/regression/expected/guc.out +++ b/regression/expected/guc.out @@ -23,7 +23,7 @@ SELECT * FROM pg_stat_monitor_settings ORDER BY name COLLATE "C"; pg_stat_monitor.pgsm_max | 100 | 100 | Sets the maximum size of shared memory in (MB) used for statement's metadata tracked by pg_stat_monitor. | 1 | 1000 | 1 pg_stat_monitor.pgsm_max_buckets | 10 | 10 | Sets the maximum number of buckets. | 1 | 10 | 1 pg_stat_monitor.pgsm_normalized_query | 1 | 1 | Selects whether save query in normalized format. | 0 | 0 | 0 - pg_stat_monitor.pgsm_overflow_target | 0 | 1 | Sets the overflow target for pg_stat_monitor | 0 | 1 | 1 + pg_stat_monitor.pgsm_overflow_target | 1 | 1 | Sets the overflow target for pg_stat_monitor | 0 | 1 | 1 pg_stat_monitor.pgsm_query_max_len | 1024 | 1024 | Sets the maximum length of query. | 1024 | 2147483647 | 1 pg_stat_monitor.pgsm_query_shared_buffer | 20 | 20 | Sets the maximum size of shared memory in (MB) used for query tracked by pg_stat_monitor. | 1 | 10000 | 1 pg_stat_monitor.pgsm_track_planning | 1 | 1 | Selects whether planning statistics are tracked. | 0 | 0 | 0