PG-220: Fix read/write of dumped query buffer to files.
This commit fix some issues when the query buffer overflows and pg_stat_monitor attempts to dump its contents to a file. The dump process is now as follows: 1. The dump will always be a full dump of the current query buffer, meaning pg_stat_monitor will dump MAX_QUERY_BUFFER_BUCKET bytes to the dump file. 2. When scanning the dump file, read chunks of size MAX_QUERY_BUFFER_BUCKET, then look for the query ID using that chunk and the query position metadata, this allows pg_stat_monitor to avoid scanning the whole chunk when looking for a query ID. The code in charge to read from/write to the dump file now takes into account that read() and write() may return less bytes than what it was asked for, the code now ensures that we actually read or write the amount of bytes required (MAX_QUERY_BUFFER_BUCKET), also it handles rare but posssible interrupts when doing those operations.pull/129/head
parent
e593dbccc3
commit
f66b45afd6
|
@ -1701,9 +1701,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
|||
|
||||
if (read_query(buf, queryid, query_txt, entry->query_pos) == 0)
|
||||
{
|
||||
int len;
|
||||
len = read_query_buffer(bucketid, queryid, query_txt);
|
||||
if (len != MAX_QUERY_BUFFER_BUCKET)
|
||||
int rc;
|
||||
rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos);
|
||||
if (rc != 1)
|
||||
snprintf(query_txt, 32, "%s", "<insufficient disk/shared space>");
|
||||
}
|
||||
|
||||
|
@ -1726,11 +1726,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
|||
|
||||
if (tmp.info.parentid != UINT64CONST(0))
|
||||
{
|
||||
int len = 0;
|
||||
int rc = 0;
|
||||
if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0)
|
||||
{
|
||||
len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt);
|
||||
if (len != MAX_QUERY_BUFFER_BUCKET)
|
||||
rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0);
|
||||
if (rc != 1)
|
||||
snprintf(parent_query_txt, 32, "%s", "<insufficient disk/shared space>");
|
||||
}
|
||||
}
|
||||
|
@ -3101,7 +3101,7 @@ SaveQueryText(uint64 bucketid,
|
|||
if (buf_len <= sizeof (uint64))
|
||||
return false;
|
||||
|
||||
dump_ok = dump_queries_buffer(bucketid, buf, buf_len);
|
||||
dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET);
|
||||
buf_len = sizeof (uint64);
|
||||
|
||||
/*
|
||||
|
@ -3321,6 +3321,8 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len)
|
|||
int fd = 0;
|
||||
char file_name[1024];
|
||||
bool success = true;
|
||||
int off = 0;
|
||||
int tries = 0;
|
||||
|
||||
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id);
|
||||
fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY);
|
||||
|
@ -3333,14 +3335,24 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len)
|
|||
return false;
|
||||
}
|
||||
|
||||
if (write(fd, buf, buf_len) != buf_len)
|
||||
{
|
||||
/* Loop until write buf_len bytes to the file. */
|
||||
do {
|
||||
ssize_t nwrite = write(fd, buf + off, buf_len - off);
|
||||
if (nwrite == -1)
|
||||
{
|
||||
if (errno == EINTR && tries++ < 3)
|
||||
continue;
|
||||
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
off += nwrite;
|
||||
} while (off < buf_len);
|
||||
|
||||
if (!success)
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m",
|
||||
file_name)));
|
||||
success = false;
|
||||
}
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m", file_name)));
|
||||
|
||||
if (fd > 0)
|
||||
CloseTransientFile(fd);
|
||||
|
@ -3348,14 +3360,25 @@ dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len)
|
|||
return success;
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to locate query text in a dumped file for bucket_id.
|
||||
*
|
||||
* Returns:
|
||||
* 1 Query sucessfully read, query_text will contain the query text.
|
||||
* 0 Query not found.
|
||||
* -1 I/O Error.
|
||||
*/
|
||||
int
|
||||
read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
|
||||
read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos)
|
||||
{
|
||||
int fd = 0;
|
||||
int buf_len = 0;
|
||||
char file_name[1024];
|
||||
unsigned char *buf = NULL;
|
||||
ssize_t nread = 0;
|
||||
int off = 0;
|
||||
int tries = 0;
|
||||
bool done = false;
|
||||
bool found = false;
|
||||
|
||||
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id);
|
||||
fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY);
|
||||
|
@ -3363,40 +3386,66 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
|
|||
goto exit;
|
||||
|
||||
buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET);
|
||||
for(;;)
|
||||
while (!done)
|
||||
{
|
||||
if (lseek(fd, off, SEEK_SET) != off)
|
||||
goto exit;
|
||||
off = 0;
|
||||
/* read a chunck of MAX_QUERY_BUFFER_BUCKET size. */
|
||||
do {
|
||||
nread = read(fd, buf + off, MAX_QUERY_BUFFER_BUCKET - off);
|
||||
if (nread == -1)
|
||||
{
|
||||
if (errno == EINTR && tries++ < 3) /* read() was interrupted, attempt to read again (max attempts=3) */
|
||||
continue;
|
||||
|
||||
buf_len = read(fd, buf, MAX_QUERY_BUFFER_BUCKET);
|
||||
if (buf_len != MAX_QUERY_BUFFER_BUCKET)
|
||||
{
|
||||
if (errno != ENOENT)
|
||||
goto exit;
|
||||
|
||||
if (buf_len == 0)
|
||||
}
|
||||
else if (nread == 0) /* EOF */
|
||||
{
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
|
||||
off += nread;
|
||||
} while (off < MAX_QUERY_BUFFER_BUCKET);
|
||||
|
||||
if (off == MAX_QUERY_BUFFER_BUCKET)
|
||||
{
|
||||
/* we have a chunck, scan it looking for queryid. */
|
||||
if (read_query(buf, queryid, query_txt, pos) != 0)
|
||||
{
|
||||
|
||||
found = true;
|
||||
/* query was found, don't need to read another chunck. */
|
||||
break;
|
||||
}
|
||||
}
|
||||
off += buf_len;
|
||||
if (read_query(buf, queryid, query_txt, 0))
|
||||
else
|
||||
/*
|
||||
* Either done=true or file has a size not multiple of MAX_QUERY_BUFFER_BUCKET.
|
||||
* It is safe to assume that the file was truncated or corrupted.
|
||||
*/
|
||||
break;
|
||||
}
|
||||
if (fd > 0)
|
||||
CloseTransientFile(fd);
|
||||
if (buf)
|
||||
pfree(buf);
|
||||
return buf_len;
|
||||
|
||||
exit:
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read file \"%s\": %m",
|
||||
file_name)));
|
||||
if (fd > 0)
|
||||
if (fd < 0 || nread == -1)
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read file \"%s\": %m",
|
||||
file_name)));
|
||||
|
||||
if (fd >= 0)
|
||||
CloseTransientFile(fd);
|
||||
|
||||
if (buf)
|
||||
pfree(buf);
|
||||
return buf_len;
|
||||
|
||||
if (found)
|
||||
return 1;
|
||||
else if (fd == -1 || nread == -1)
|
||||
return -1; /* I/O error. */
|
||||
else
|
||||
return 0; /* Not found. */
|
||||
}
|
||||
|
||||
static double
|
||||
|
|
|
@ -390,7 +390,7 @@ void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *que
|
|||
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
||||
Size hash_memsize(void);
|
||||
|
||||
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
|
||||
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos);
|
||||
uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos);
|
||||
pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
|
||||
pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
|
||||
|
|
Loading…
Reference in New Issue