mirror of
https://github.com/percona/pg_stat_monitor.git
synced 2026-02-04 14:06:20 +00:00
PG-488: pg_stat_monitor: Overflow management.
Reimplement the storage mechanism for buckets and query texts using dynamic shared memory. Since dynamic shared memory can grow into the swap area, we get overflow handling out of the box. Moreover, the new design saves the query pointer inside the bucket, so the query text is eventually evicted when the bucket is recycled. Finally, the dynamic shared memory hash has a built-in locking mechanism, so we can revisit the whole locking scheme in pg_stat_monitor, which has potential for lots of performance improvements.
This commit is contained in:
@@ -32,7 +32,6 @@
|
||||
PGSM_V2_0
|
||||
} pgsmVersion;
|
||||
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
#define BUILD_VERSION "2.0.0-dev"
|
||||
@@ -78,6 +77,7 @@ static int plan_nested_level = 0;
|
||||
|
||||
/* The array to store outer layer query id*/
|
||||
uint64 *nested_queryids;
|
||||
char **nested_query_txts;
|
||||
|
||||
/* Regex object used to extract query comments. */
|
||||
static regex_t preg_query_comments;
|
||||
@@ -88,13 +88,11 @@ static struct rusage rusage_start;
|
||||
static struct rusage rusage_end;
|
||||
|
||||
/* Query buffer, store queries' text. */
|
||||
static unsigned char *pgss_qbuf = NULL;
|
||||
static char *pgss_explain(QueryDesc *queryDesc);
|
||||
|
||||
static void extract_query_comments(const char *query, char *comments, size_t max_len);
|
||||
static int get_histogram_bucket(double q_time);
|
||||
static bool IsSystemInitialized(void);
|
||||
static bool dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len);
|
||||
static double time_diff(struct timeval end, struct timeval start);
|
||||
static void request_additional_shared_resources(void);
|
||||
|
||||
@@ -230,7 +228,6 @@ static uint64 djb2_hash(unsigned char *str, size_t len);
|
||||
|
||||
/* Same as above, but stores the calculated string length into *out_len (small optimization) */
|
||||
static uint64 djb2_hash_str(unsigned char *str, int *out_len);
|
||||
|
||||
/*
|
||||
* Module load callback
|
||||
*/
|
||||
@@ -239,7 +236,6 @@ void
|
||||
_PG_init(void)
|
||||
{
|
||||
int rc;
|
||||
char file_name[1024];
|
||||
|
||||
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
|
||||
|
||||
@@ -266,8 +262,6 @@ _PG_init(void)
|
||||
EnableQueryId();
|
||||
#endif
|
||||
|
||||
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
|
||||
unlink(file_name);
|
||||
|
||||
EmitWarningsOnPlaceholders("pg_stat_monitor");
|
||||
|
||||
@@ -313,6 +307,7 @@ _PG_init(void)
|
||||
ExecutorCheckPerms_hook = HOOK(pgss_ExecutorCheckPerms);
|
||||
|
||||
nested_queryids = (uint64 *) malloc(sizeof(uint64) * max_stack_depth);
|
||||
nested_query_txts = (char **) malloc(sizeof(char*) * max_stack_depth);
|
||||
|
||||
system_init = true;
|
||||
}
|
||||
@@ -335,6 +330,7 @@ _PG_fini(void)
|
||||
emit_log_hook = prev_emit_log_hook;
|
||||
|
||||
free(nested_queryids);
|
||||
free(nested_query_txts);
|
||||
regfree(&preg_query_comments);
|
||||
|
||||
hash_entry_reset();
|
||||
@@ -363,7 +359,7 @@ request_additional_shared_resources(void)
|
||||
* the postmaster process.) We'll allocate or attach to the shared
|
||||
* resources in pgss_shmem_startup().
|
||||
*/
|
||||
RequestAddinShmemSpace(hash_memsize() + HOOK_STATS_SIZE);
|
||||
RequestAddinShmemSpace(pgsm_ShmemSize() + HOOK_STATS_SIZE);
|
||||
RequestNamedLWLockTranche("pg_stat_monitor", 1);
|
||||
}
|
||||
/*
|
||||
@@ -554,7 +550,11 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
|
||||
bool execute_once)
|
||||
{
|
||||
if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth)
|
||||
{
|
||||
nested_queryids[exec_nested_level] = queryDesc->plannedstmt->queryId;
|
||||
nested_query_txts[exec_nested_level] = strdup(queryDesc->sourceText);
|
||||
}
|
||||
|
||||
exec_nested_level++;
|
||||
PG_TRY();
|
||||
{
|
||||
@@ -564,13 +564,23 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
|
||||
standard_ExecutorRun(queryDesc, direction, count, execute_once);
|
||||
exec_nested_level--;
|
||||
if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth)
|
||||
{
|
||||
nested_queryids[exec_nested_level] = UINT64CONST(0);
|
||||
if(nested_query_txts[exec_nested_level])
|
||||
free(nested_query_txts[exec_nested_level]);
|
||||
nested_query_txts[exec_nested_level] = NULL;
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
exec_nested_level--;
|
||||
if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth)
|
||||
{
|
||||
nested_queryids[exec_nested_level] = UINT64CONST(0);
|
||||
if(nested_query_txts[exec_nested_level])
|
||||
free(nested_query_txts[exec_nested_level]);
|
||||
nested_query_txts[exec_nested_level] = NULL;
|
||||
}
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
@@ -1260,11 +1270,29 @@ pgss_update_entry(pgssEntry *entry,
|
||||
if (exec_nested_level > 0)
|
||||
{
|
||||
if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth)
|
||||
{
|
||||
int parent_query_len = nested_query_txts[exec_nested_level - 1]?
|
||||
strlen(nested_query_txts[exec_nested_level - 1]): 0;
|
||||
e->counters.info.parentid = nested_queryids[exec_nested_level - 1];
|
||||
if (parent_query_len > 0)
|
||||
{
|
||||
char *qry_buff;
|
||||
dsa_area *query_dsa_area = get_dsa_area_for_query_text();
|
||||
dsa_pointer qry = dsa_allocate(query_dsa_area, parent_query_len+1);
|
||||
qry_buff = dsa_get_address(query_dsa_area, qry);
|
||||
memcpy(qry_buff, nested_query_txts[exec_nested_level - 1], parent_query_len);
|
||||
qry_buff[parent_query_len] = 0;
|
||||
e->counters.info.parent_query = qry;
|
||||
}
|
||||
else
|
||||
e->counters.info.parent_query = InvalidDsaPointer;
|
||||
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
e->counters.info.parentid = UINT64CONST(0);
|
||||
e->counters.info.parent_query = InvalidDsaPointer;
|
||||
}
|
||||
|
||||
if (error_info)
|
||||
@@ -1381,7 +1409,6 @@ pgss_store(uint64 queryid,
|
||||
JumbleState *jstate,
|
||||
pgssStoreKind kind)
|
||||
{
|
||||
HTAB *pgss_hash;
|
||||
pgssHashKey key;
|
||||
pgssEntry *entry;
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
@@ -1486,19 +1513,15 @@ pgss_store(uint64 queryid,
|
||||
#else
|
||||
key.toplevel = ((exec_nested_level + plan_nested_level) == 0);
|
||||
#endif
|
||||
pgss_hash = pgsm_get_hash();
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_SHARED);
|
||||
|
||||
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
|
||||
entry = (pgssEntry *) dshash_find(get_pgssHash(), &key, false);
|
||||
if (!entry)
|
||||
{
|
||||
pgssQueryEntry *query_entry;
|
||||
bool query_found = false;
|
||||
uint64 prev_qbuf_len = 0;
|
||||
HTAB *pgss_query_hash;
|
||||
dsa_pointer dsa_query_pointer;
|
||||
char* query_buff;
|
||||
|
||||
pgss_query_hash = pgsm_get_query_hash();
|
||||
|
||||
/*
|
||||
* Create a new, normalized query string if caller asked. We don't
|
||||
@@ -1509,74 +1532,41 @@ pgss_store(uint64 queryid,
|
||||
*/
|
||||
if (jstate && PGSM_NORMALIZED_QUERY)
|
||||
{
|
||||
LWLockRelease(pgss->lock);
|
||||
norm_query = generate_normalized_query(jstate, query,
|
||||
query_location,
|
||||
&query_len,
|
||||
GetDatabaseEncoding());
|
||||
LWLockAcquire(pgss->lock, LW_SHARED);
|
||||
}
|
||||
|
||||
query_entry = hash_search(pgss_query_hash, &queryid, HASH_ENTER_NULL, &query_found);
|
||||
if (query_entry == NULL)
|
||||
{
|
||||
LWLockRelease(pgss->lock);
|
||||
if (norm_query)
|
||||
pfree(norm_query);
|
||||
elog(DEBUG1, "pgss_store: out of memory (pgss_query_hash).");
|
||||
return;
|
||||
}
|
||||
else if (!query_found)
|
||||
{
|
||||
/* New query, truncate length if necessary. */
|
||||
if (query_len > PGSM_QUERY_MAX_LEN)
|
||||
query_len = PGSM_QUERY_MAX_LEN;
|
||||
}
|
||||
/* New query, truncate length if necessary. */
|
||||
if (query_len > PGSM_QUERY_MAX_LEN)
|
||||
query_len = PGSM_QUERY_MAX_LEN;
|
||||
|
||||
/* Need exclusive lock to make a new hashtable entry - promote */
|
||||
LWLockRelease(pgss->lock);
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
|
||||
if (!query_found)
|
||||
{
|
||||
if (!SaveQueryText(bucketid,
|
||||
queryid,
|
||||
pgss_qbuf,
|
||||
norm_query ? norm_query : query,
|
||||
query_len,
|
||||
&query_entry->query_pos))
|
||||
{
|
||||
LWLockRelease(pgss->lock);
|
||||
if (norm_query)
|
||||
pfree(norm_query);
|
||||
elog(DEBUG1, "pgss_store: insufficient shared space for query.");
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Save current query buffer length, if we fail to add a new new
|
||||
* entry to the hash table then we must restore the original
|
||||
* length.
|
||||
*/
|
||||
memcpy(&prev_qbuf_len, pgss_qbuf, sizeof(prev_qbuf_len));
|
||||
}
|
||||
/* Save the query text in raw dsa area */
|
||||
dsa_area* query_dsa_area = get_dsa_area_for_query_text();
|
||||
dsa_query_pointer = dsa_allocate(query_dsa_area, query_len+1);
|
||||
query_buff = dsa_get_address(query_dsa_area, dsa_query_pointer);
|
||||
memcpy(query_buff, norm_query ? norm_query : query, query_len);
|
||||
query_buff[query_len] = 0;
|
||||
|
||||
/* OK to create a new hashtable entry */
|
||||
entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
|
||||
if (entry == NULL)
|
||||
{
|
||||
if (!query_found)
|
||||
{
|
||||
/* Restore previous query buffer length. */
|
||||
memcpy(pgss_qbuf, &prev_qbuf_len, sizeof(prev_qbuf_len));
|
||||
}
|
||||
LWLockRelease(pgss->lock);
|
||||
if (norm_query)
|
||||
pfree(norm_query);
|
||||
return;
|
||||
}
|
||||
entry->query_pos = query_entry->query_pos;
|
||||
entry->query_pos = dsa_query_pointer;
|
||||
}
|
||||
else
|
||||
dshash_release_lock(get_pgssHash(), entry);
|
||||
|
||||
|
||||
if (jstate == NULL)
|
||||
pgss_update_entry(entry, /* entry */
|
||||
@@ -1619,9 +1609,6 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
hash_entry_dealloc(-1, -1, NULL);
|
||||
|
||||
/* Reset query buffer. */
|
||||
*(uint64 *) pgss_qbuf = 0;
|
||||
|
||||
LWLockRelease(pgss->lock);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
@@ -1678,13 +1665,12 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
Tuplestorestate *tupstore;
|
||||
MemoryContext per_query_ctx;
|
||||
MemoryContext oldcontext;
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
dshash_seq_status hstat;
|
||||
pgssEntry *entry;
|
||||
char parentid_txt[32];
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
HTAB *pgss_hash = pgsm_get_hash();
|
||||
char *query_txt = (char *) palloc0(PGSM_QUERY_MAX_LEN + 1);
|
||||
char *parent_query_txt = (char *) palloc0(PGSM_QUERY_MAX_LEN + 1);
|
||||
char *query_txt = NULL;
|
||||
char *parent_query_txt = NULL;
|
||||
int expected_columns = (api_version >= PGSM_V2_0)?PG_STAT_MONITOR_COLS_V2_0:PG_STAT_MONITOR_COLS_V1_0;
|
||||
|
||||
/* Safety check... */
|
||||
@@ -1722,10 +1708,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_SHARED);
|
||||
// LWLockAcquire(pgss->lock, LW_SHARED);
|
||||
|
||||
hash_seq_init(&hash_seq, pgss_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
dshash_seq_init(&hstat, get_pgssHash(), false);
|
||||
|
||||
while ((entry = dshash_seq_next(&hstat)) != NULL)
|
||||
{
|
||||
Datum values[PG_STAT_MONITOR_COLS] = {0};
|
||||
bool nulls[PG_STAT_MONITOR_COLS] = {0};
|
||||
@@ -1740,6 +1727,8 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
uint64 userid = entry->key.userid;
|
||||
int64 ip = entry->key.ip;
|
||||
uint64 planid = entry->key.planid;
|
||||
dsa_area *query_dsa_area;
|
||||
char *query_ptr;
|
||||
#if PG_VERSION_NUM < 140000
|
||||
bool toplevel = 1;
|
||||
bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
|
||||
@@ -1747,15 +1736,10 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS);
|
||||
bool toplevel = entry->key.toplevel;
|
||||
#endif
|
||||
|
||||
if (read_query(pgss_qbuf, queryid, query_txt, entry->query_pos) == 0)
|
||||
{
|
||||
int rc;
|
||||
|
||||
rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos);
|
||||
if (rc != 1)
|
||||
snprintf(query_txt, 32, "%s", "<insufficient disk/shared space>");
|
||||
}
|
||||
/* Load the query text from dsa area */
|
||||
query_dsa_area = get_dsa_area_for_query_text();
|
||||
query_ptr = dsa_get_address(query_dsa_area, entry->query_pos);
|
||||
query_txt = pstrdup(query_ptr);
|
||||
|
||||
/* copy counters to a local variable to keep locking time short */
|
||||
{
|
||||
@@ -1783,15 +1767,17 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
if (tmp.state == PGSS_PARSE || tmp.state == PGSS_PLAN)
|
||||
continue;
|
||||
|
||||
/* read the parent query text if any */
|
||||
if (tmp.info.parentid != UINT64CONST(0))
|
||||
{
|
||||
if (read_query(pgss_qbuf, tmp.info.parentid, parent_query_txt, 0) == 0)
|
||||
if (DsaPointerIsValid(tmp.info.parent_query))
|
||||
{
|
||||
int rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0);
|
||||
|
||||
if (rc != 1)
|
||||
snprintf(parent_query_txt, 32, "%s", "<insufficient disk/shared space>");
|
||||
query_dsa_area = get_dsa_area_for_query_text();
|
||||
query_ptr = dsa_get_address(query_dsa_area, tmp.info.parent_query);
|
||||
parent_query_txt = pstrdup(query_ptr);
|
||||
}
|
||||
else
|
||||
parent_query_txt = pstrdup("parent query text not available");
|
||||
}
|
||||
/* bucketid at column number 0 */
|
||||
values[i++] = Int64GetDatumFast(bucketid);
|
||||
@@ -2071,10 +2057,12 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
||||
}
|
||||
/* clean up and return the tuplestore */
|
||||
LWLockRelease(pgss->lock);
|
||||
dshash_seq_term(&hstat);
|
||||
|
||||
pfree(query_txt);
|
||||
pfree(parent_query_txt);
|
||||
if(query_txt)
|
||||
pfree(query_txt);
|
||||
if(parent_query_txt)
|
||||
pfree(parent_query_txt);
|
||||
|
||||
tuplestore_donestoring(tupstore);
|
||||
}
|
||||
@@ -2120,7 +2108,6 @@ get_next_wbucket(pgssSharedState *pgss)
|
||||
|
||||
if (update_bucket)
|
||||
{
|
||||
char file_name[1024];
|
||||
|
||||
new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
|
||||
|
||||
@@ -2128,24 +2115,7 @@ get_next_wbucket(pgssSharedState *pgss)
|
||||
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
|
||||
|
||||
if (pgss->overflow)
|
||||
{
|
||||
pgss->n_bucket_cycles += 1;
|
||||
if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS)
|
||||
{
|
||||
/*
|
||||
* A full rotation of PGSM_MAX_BUCKETS buckets happened since
|
||||
* we detected a query buffer overflow.
|
||||
* Reset overflow state and remove the dump file.
|
||||
*/
|
||||
pgss->overflow = false;
|
||||
pgss->n_bucket_cycles = 0;
|
||||
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
|
||||
unlink(file_name);
|
||||
}
|
||||
}
|
||||
hash_entry_dealloc(new_bucket_id, prev_bucket_id, NULL);
|
||||
|
||||
LWLockRelease(pgss->lock);
|
||||
|
||||
@@ -3144,165 +3114,6 @@ intarray_get_datum(int32 arr[], int len)
|
||||
|
||||
}
|
||||
|
||||
uint64
|
||||
read_query(unsigned char *buf, uint64 queryid, char *query, size_t pos)
|
||||
{
|
||||
bool found = false;
|
||||
uint64 query_id = 0;
|
||||
uint64 query_len = 0;
|
||||
uint64 rlen = 0;
|
||||
uint64 buf_len = 0;
|
||||
|
||||
memcpy(&buf_len, buf, sizeof(uint64));
|
||||
if (buf_len <= 0)
|
||||
goto exit;
|
||||
|
||||
/* If a position hint is given, try to locate the query directly. */
|
||||
if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len)
|
||||
{
|
||||
memcpy(&query_id, &buf[pos], sizeof(uint64));
|
||||
if (query_id != queryid)
|
||||
return 0;
|
||||
|
||||
pos += sizeof(uint64);
|
||||
|
||||
memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */
|
||||
pos += sizeof(uint64);
|
||||
|
||||
if (pos + query_len > buf_len) /* avoid reading past buffer's length. */
|
||||
return 0;
|
||||
|
||||
memcpy(query, &buf[pos], query_len); /* Actual query */
|
||||
query[query_len] = '\0';
|
||||
|
||||
return queryid;
|
||||
}
|
||||
|
||||
rlen = sizeof(uint64); /* Move forwad to skip length bytes */
|
||||
for (;;)
|
||||
{
|
||||
if (rlen >= buf_len)
|
||||
goto exit;
|
||||
|
||||
memcpy(&query_id, &buf[rlen], sizeof(uint64)); /* query id */
|
||||
if (query_id == queryid)
|
||||
found = true;
|
||||
|
||||
rlen += sizeof(uint64);
|
||||
if (buf_len <= rlen)
|
||||
continue;
|
||||
|
||||
memcpy(&query_len, &buf[rlen], sizeof(uint64)); /* query len */
|
||||
rlen += sizeof(uint64);
|
||||
if (buf_len < rlen + query_len)
|
||||
goto exit;
|
||||
if (found)
|
||||
{
|
||||
if (query != NULL)
|
||||
{
|
||||
memcpy(query, &buf[rlen], query_len); /* Actual query */
|
||||
query[query_len] = 0;
|
||||
}
|
||||
return query_id;
|
||||
}
|
||||
rlen += query_len;
|
||||
}
|
||||
exit:
|
||||
if (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE)
|
||||
{
|
||||
sprintf(query, "%s", "<insufficient shared space>");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool
|
||||
SaveQueryText(uint64 bucketid,
|
||||
uint64 queryid,
|
||||
unsigned char *buf,
|
||||
const char *query,
|
||||
uint64 query_len,
|
||||
size_t *query_pos)
|
||||
{
|
||||
uint64 buf_len = 0;
|
||||
|
||||
memcpy(&buf_len, buf, sizeof(uint64));
|
||||
if (buf_len == 0)
|
||||
buf_len += sizeof(uint64);
|
||||
|
||||
if (QUERY_BUFFER_OVERFLOW(buf_len, query_len))
|
||||
{
|
||||
switch (PGSM_OVERFLOW_TARGET)
|
||||
{
|
||||
case OVERFLOW_TARGET_NONE:
|
||||
return false;
|
||||
case OVERFLOW_TARGET_DISK:
|
||||
{
|
||||
bool dump_ok;
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
|
||||
if (pgss->overflow)
|
||||
{
|
||||
elog(DEBUG1, "query buffer overflowed twice");
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* If the query buffer is empty, there is nothing to dump,
|
||||
* this also means that the current query length exceeds
|
||||
* MAX_QUERY_BUF.
|
||||
*/
|
||||
if (buf_len <= sizeof(uint64))
|
||||
return false;
|
||||
|
||||
dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUF);
|
||||
buf_len = sizeof(uint64);
|
||||
|
||||
if (dump_ok)
|
||||
{
|
||||
pgss->overflow = true;
|
||||
pgss->n_bucket_cycles = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* We must check for overflow again, as the query length
|
||||
* may exceed the total size allocated to the buffer
|
||||
* (MAX_QUERY_BUF).
|
||||
*/
|
||||
if (QUERY_BUFFER_OVERFLOW(buf_len, query_len))
|
||||
{
|
||||
/*
|
||||
* If we successfully dumped the query buffer to disk,
|
||||
* then reset the buffer, otherwise we could end up
|
||||
* dumping the same buffer again.
|
||||
*/
|
||||
if (dump_ok)
|
||||
*(uint64 *) buf = 0;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
default:
|
||||
Assert(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*query_pos = buf_len;
|
||||
|
||||
memcpy(&buf[buf_len], &queryid, sizeof(uint64)); /* query id */
|
||||
buf_len += sizeof(uint64);
|
||||
|
||||
memcpy(&buf[buf_len], &query_len, sizeof(uint64)); /* query length */
|
||||
buf_len += sizeof(uint64);
|
||||
|
||||
memcpy(&buf[buf_len], query, query_len); /* query */
|
||||
buf_len += query_len;
|
||||
memcpy(buf, &buf_len, sizeof(uint64));
|
||||
return true;
|
||||
}
|
||||
|
||||
Datum
|
||||
pg_stat_monitor_settings(PG_FUNCTION_ARGS)
|
||||
@@ -3440,12 +3251,6 @@ pg_stat_monitor_hook_stats(PG_FUNCTION_ARGS)
|
||||
return (Datum) 0;
|
||||
}
|
||||
|
||||
void
|
||||
set_qbuf(unsigned char *buf)
|
||||
{
|
||||
pgss_qbuf = buf;
|
||||
*(uint64 *) pgss_qbuf = 0;
|
||||
}
|
||||
|
||||
void
|
||||
pgsm_emit_log_hook(ErrorData *edata)
|
||||
@@ -3482,145 +3287,6 @@ IsSystemInitialized(void)
|
||||
return (system_init && IsHashInitialize());
|
||||
}
|
||||
|
||||
static bool
|
||||
dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len)
|
||||
{
|
||||
int fd = 0;
|
||||
char file_name[1024];
|
||||
bool success = true;
|
||||
int off = 0;
|
||||
int tries = 0;
|
||||
|
||||
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
|
||||
fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY);
|
||||
if (fd < 0)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m",
|
||||
file_name)));
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Loop until write buf_len bytes to the file. */
|
||||
do
|
||||
{
|
||||
ssize_t nwrite = write(fd, buf + off, buf_len - off);
|
||||
|
||||
if (nwrite == -1)
|
||||
{
|
||||
if (errno == EINTR && tries++ < 3)
|
||||
continue;
|
||||
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
off += nwrite;
|
||||
} while (off < buf_len);
|
||||
|
||||
if (!success)
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m", file_name)));
|
||||
|
||||
if (fd > 0)
|
||||
CloseTransientFile(fd);
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to locate query text in a dumped file for bucket_id.
|
||||
*
|
||||
* Returns:
|
||||
* 1 Query sucessfully read, query_text will contain the query text.
|
||||
* 0 Query not found.
|
||||
* -1 I/O Error.
|
||||
*/
|
||||
int
|
||||
read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos)
|
||||
{
|
||||
int fd = 0;
|
||||
char file_name[1024];
|
||||
unsigned char *buf = NULL;
|
||||
ssize_t nread = 0;
|
||||
int off = 0;
|
||||
int tries = 0;
|
||||
bool done = false;
|
||||
bool found = false;
|
||||
|
||||
snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE);
|
||||
fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY);
|
||||
if (fd < 0)
|
||||
goto exit;
|
||||
|
||||
buf = (unsigned char *) palloc(MAX_QUERY_BUF);
|
||||
while (!done)
|
||||
{
|
||||
off = 0;
|
||||
/* read a chunck of MAX_QUERY_BUF size. */
|
||||
do
|
||||
{
|
||||
nread = read(fd, buf + off, MAX_QUERY_BUF - off);
|
||||
if (nread == -1)
|
||||
{
|
||||
if (errno == EINTR && tries++ < 3) /* read() was interrupted,
|
||||
* attempt to read again
|
||||
* (max attempts=3) */
|
||||
continue;
|
||||
|
||||
goto exit;
|
||||
}
|
||||
else if (nread == 0) /* EOF */
|
||||
{
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
|
||||
off += nread;
|
||||
} while (off < MAX_QUERY_BUF);
|
||||
|
||||
if (off == MAX_QUERY_BUF)
|
||||
{
|
||||
/* we have a chunck, scan it looking for queryid. */
|
||||
if (read_query(buf, queryid, query_txt, pos) != 0)
|
||||
{
|
||||
|
||||
found = true;
|
||||
/* query was found, don't need to read another chunck. */
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
/*
|
||||
* Either done=true or file has a size not multiple of
|
||||
* MAX_QUERY_BUF. It is safe to assume that the file was truncated
|
||||
* or corrupted.
|
||||
*/
|
||||
break;
|
||||
}
|
||||
|
||||
exit:
|
||||
if (fd < 0 || nread == -1)
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read file \"%s\": %m",
|
||||
file_name)));
|
||||
|
||||
if (fd >= 0)
|
||||
CloseTransientFile(fd);
|
||||
|
||||
if (buf)
|
||||
pfree(buf);
|
||||
|
||||
if (found)
|
||||
return 1;
|
||||
else if (fd == -1 || nread == -1)
|
||||
return -1; /* I/O error. */
|
||||
else
|
||||
return 0; /* Not found. */
|
||||
}
|
||||
|
||||
static double
|
||||
time_diff(struct timeval end, struct timeval start)
|
||||
|
||||
Reference in New Issue
Block a user