Merge pull request #131 from percona/devel

Devel
pull/138/head
Ibrar Ahmed 2021-11-12 20:41:37 +05:00 committed by GitHub
commit 0148409b33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 129 additions and 42 deletions

View File

@ -76,7 +76,7 @@ static struct pg_hook_stats_t *pg_hook_stats;
static void extract_query_comments(const char *query, char *comments, size_t max_len);
static int get_histogram_bucket(double q_time);
static bool IsSystemInitialized(void);
static void dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len);
static bool dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len);
static double time_diff(struct timeval end, struct timeval start);
@ -1701,9 +1701,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
if (read_query(buf, queryid, query_txt, entry->query_pos) == 0)
{
int len;
len = read_query_buffer(bucketid, queryid, query_txt);
if (len != MAX_QUERY_BUFFER_BUCKET)
int rc;
rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos);
if (rc != 1)
snprintf(query_txt, 32, "%s", "<insufficient disk/shared space>");
}
@ -1726,11 +1726,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
if (tmp.info.parentid != UINT64CONST(0))
{
int len = 0;
int rc = 0;
if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0)
{
len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt);
if (len != MAX_QUERY_BUFFER_BUCKET)
rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0);
if (rc != 1)
snprintf(parent_query_txt, 32, "%s", "<insufficient disk/shared space>");
}
}
@ -3092,11 +3092,39 @@ SaveQueryText(uint64 bucketid,
return false;
case OVERFLOW_TARGET_DISK:
{
dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET);
bool dump_ok;
/*
* If the query buffer is empty, there is nothing to dump, this also
* means that the current query length exceeds MAX_QUERY_BUFFER_BUCKET.
*/
if (buf_len <= sizeof (uint64))
return false;
dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET);
buf_len = sizeof (uint64);
/*
* We must check for overflow again, as the query length may
* exceed the size allocated to the buffer (MAX_QUERY_BUFFER_BUCKET).
*/
if (QUERY_BUFFER_OVERFLOW(buf_len, query_len))
{
/*
* If we successfully dumped the query buffer to disk, then
* reset the buffer, otherwise we could end up dumping the
* same buffer again.
*/
if (dump_ok)
*(uint64 *)buf = 0;
return false;
}
}
break;
default:
Assert(false);
break;
}
}
@ -3287,37 +3315,70 @@ IsSystemInitialized(void)
return (system_init && IsHashInitialize());
}
static void
static bool
dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len)
{
int fd = 0;
int fd = 0;
char file_name[1024];
bool success = true;
int off = 0;
int tries = 0;
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id);
fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY);
if (fd < 0)
if (fd < 0)
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not write file \"%s\": %m",
file_name)));
return false;
}
if (write(fd, buf, buf_len) != buf_len)
/* Loop until write buf_len bytes to the file. */
do {
ssize_t nwrite = write(fd, buf + off, buf_len - off);
if (nwrite == -1)
{
if (errno == EINTR && tries++ < 3)
continue;
success = false;
break;
}
off += nwrite;
} while (off < buf_len);
if (!success)
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not write file \"%s\": %m",
file_name)));
(errcode_for_file_access(),
errmsg("could not write file \"%s\": %m", file_name)));
if (fd > 0)
CloseTransientFile(fd);
return success;
}
/*
* Try to locate query text in a dumped file for bucket_id.
*
* Returns:
* 1 Query sucessfully read, query_text will contain the query text.
* 0 Query not found.
* -1 I/O Error.
*/
int
read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos)
{
int fd = 0;
int buf_len = 0;
char file_name[1024];
unsigned char *buf = NULL;
ssize_t nread = 0;
int off = 0;
int tries = 0;
bool done = false;
bool found = false;
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, bucket_id);
fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY);
@ -3325,40 +3386,66 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
goto exit;
buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET);
for(;;)
while (!done)
{
if (lseek(fd, off, SEEK_SET) != off)
goto exit;
off = 0;
/* read a chunck of MAX_QUERY_BUFFER_BUCKET size. */
do {
nread = read(fd, buf + off, MAX_QUERY_BUFFER_BUCKET - off);
if (nread == -1)
{
if (errno == EINTR && tries++ < 3) /* read() was interrupted, attempt to read again (max attempts=3) */
continue;
buf_len = read(fd, buf, MAX_QUERY_BUFFER_BUCKET);
if (buf_len != MAX_QUERY_BUFFER_BUCKET)
{
if (errno != ENOENT)
goto exit;
if (buf_len == 0)
}
else if (nread == 0) /* EOF */
{
done = true;
break;
}
off += nread;
} while (off < MAX_QUERY_BUFFER_BUCKET);
if (off == MAX_QUERY_BUFFER_BUCKET)
{
/* we have a chunck, scan it looking for queryid. */
if (read_query(buf, queryid, query_txt, pos) != 0)
{
found = true;
/* query was found, don't need to read another chunck. */
break;
}
}
off += buf_len;
if (read_query(buf, queryid, query_txt, 0))
else
/*
* Either done=true or file has a size not multiple of MAX_QUERY_BUFFER_BUCKET.
* It is safe to assume that the file was truncated or corrupted.
*/
break;
}
if (fd > 0)
CloseTransientFile(fd);
if (buf)
pfree(buf);
return buf_len;
exit:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m",
file_name)));
if (fd > 0)
if (fd < 0 || nread == -1)
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m",
file_name)));
if (fd >= 0)
CloseTransientFile(fd);
if (buf)
pfree(buf);
return buf_len;
if (found)
return 1;
else if (fd == -1 || nread == -1)
return -1; /* I/O error. */
else
return 0; /* Not found. */
}
static double

View File

@ -390,7 +390,7 @@ void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *que
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
Size hash_memsize(void);
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos);
uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos);
pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
@ -411,7 +411,7 @@ void pgss_startup(void);
#define PGSM_HISTOGRAM_MAX get_conf(8)->guc_variable
#define PGSM_HISTOGRAM_BUCKETS get_conf(9)->guc_variable
#define PGSM_QUERY_SHARED_BUFFER get_conf(10)->guc_variable
#define PGSM_OVERFLOW_TARGET get_conf(12)->guc_variable
#define PGSM_OVERFLOW_TARGET get_conf(11)->guc_variable
#define PGSM_QUERY_PLAN get_conf(12)->guc_variable
#define PGSM_TRACK_PLANNING get_conf(13)->guc_variable

View File

@ -23,7 +23,7 @@ SELECT * FROM pg_stat_monitor_settings ORDER BY name COLLATE "C";
pg_stat_monitor.pgsm_max | 100 | 100 | Sets the maximum size of shared memory in (MB) used for statement's metadata tracked by pg_stat_monitor. | 1 | 1000 | 1
pg_stat_monitor.pgsm_max_buckets | 10 | 10 | Sets the maximum number of buckets. | 1 | 10 | 1
pg_stat_monitor.pgsm_normalized_query | 1 | 1 | Selects whether save query in normalized format. | 0 | 0 | 0
pg_stat_monitor.pgsm_overflow_target | 0 | 1 | Sets the overflow target for pg_stat_monitor | 0 | 1 | 1
pg_stat_monitor.pgsm_overflow_target | 1 | 1 | Sets the overflow target for pg_stat_monitor | 0 | 1 | 1
pg_stat_monitor.pgsm_query_max_len | 1024 | 1024 | Sets the maximum length of query. | 1024 | 2147483647 | 1
pg_stat_monitor.pgsm_query_shared_buffer | 20 | 20 | Sets the maximum size of shared memory in (MB) used for query tracked by pg_stat_monitor. | 1 | 10000 | 1
pg_stat_monitor.pgsm_track_planning | 1 | 1 | Selects whether planning statistics are tracked. | 0 | 0 | 0