Merge pull request #115 from darkfronza/PG-254_improve_performance_when_querying_pgsm_view
Pg 254: improve performance when querying pgsm viewpull/119/head
commit
e234ae203d
286
hash_query.c
286
hash_query.c
|
@ -22,18 +22,19 @@
|
|||
|
||||
static pgssSharedState *pgss;
|
||||
static HTAB *pgss_hash;
|
||||
static HTAB *pgss_query_hash;
|
||||
|
||||
static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
|
||||
/*
|
||||
* Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id]
|
||||
* whose query ids are found in the array 'query_ids', of length 'n_queries'.
|
||||
* Copy query from src_buffer to dst_buff.
|
||||
* Use query_id and query_pos to fast locate query in source buffer.
|
||||
* Store updated query position in the destination buffer into param query_pos.
|
||||
*/
|
||||
static void copy_queries(unsigned char *query_buffer[],
|
||||
uint64 new_bucket_id,
|
||||
uint64 old_bucket_id,
|
||||
uint64 *query_ids,
|
||||
size_t n_queries);
|
||||
static bool copy_query(uint64 bucket_id,
|
||||
uint64 query_id,
|
||||
uint64 query_pos,
|
||||
unsigned char *dst_buf,
|
||||
unsigned char *src_buf,
|
||||
size_t *new_query_pos);
|
||||
|
||||
static HTAB*
|
||||
hash_init(const char *hash_name, int key_size, int entry_size, int hash_size)
|
||||
|
@ -55,7 +56,6 @@ pgss_startup(void)
|
|||
|
||||
pgss = NULL;
|
||||
pgss_hash = NULL;
|
||||
pgss_query_hash = NULL;
|
||||
|
||||
/*
|
||||
* Create or attach to the shared memory state, including hash table
|
||||
|
@ -85,7 +85,6 @@ pgss_startup(void)
|
|||
}
|
||||
|
||||
pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES);
|
||||
pgss_query_hash = hash_init("pg_stat_monitor: query hashtable", sizeof(pgssQueryHashKey), sizeof(pgssQueryEntry),MAX_BUCKET_ENTRIES);
|
||||
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
|
@ -169,146 +168,44 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
|
|||
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
|
||||
return entry;
|
||||
}
|
||||
|
||||
/*
|
||||
* Reset all the entries.
|
||||
* Prepare resources for using the new bucket:
|
||||
* - Deallocate finished hash table entries in new_bucket_id (entries whose
|
||||
* state is PGSS_FINISHED or PGSS_FINISHED).
|
||||
* - Clear query buffer for new_bucket_id.
|
||||
* - If old_bucket_id != -1, move all pending hash table entries in
|
||||
* old_bucket_id to the new bucket id, also move pending queries from the
|
||||
* previous query buffer (query_buffer[old_bucket_id]) to the new one
|
||||
* (query_buffer[new_bucket_id]).
|
||||
*
|
||||
* Caller must hold an exclusive lock on pgss->lock.
|
||||
*/
|
||||
void
|
||||
hash_query_entryies_reset()
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgssQueryEntry *entry;
|
||||
|
||||
hash_seq_init(&hash_seq, pgss_query_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Deallocate finished entries in new_bucket_id.
|
||||
*
|
||||
* Move all pending queries in query_buffer[old_bucket_id] to
|
||||
* query_buffer[new_bucket_id].
|
||||
*
|
||||
* Caller must hold an exclusive lock on pgss->lock.
|
||||
*/
|
||||
void
|
||||
hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgssQueryEntry *entry;
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
/*
|
||||
* Store pending query ids from the previous bucket.
|
||||
* If there are more pending queries than MAX_PENDING_QUERIES then
|
||||
* we try to dynamically allocate memory for them.
|
||||
*/
|
||||
#define MAX_PENDING_QUERIES 128
|
||||
uint64 pending_query_ids[MAX_PENDING_QUERIES];
|
||||
uint64 *pending_query_ids_buf = NULL;
|
||||
size_t n_pending_queries = 0;
|
||||
bool out_of_memory = false;
|
||||
|
||||
/* Clear all queries in the query buffer for the new bucket. */
|
||||
memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
|
||||
|
||||
hash_seq_init(&hash_seq, pgss_query_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
/* Remove previous finished query entries matching new bucket id. */
|
||||
if (entry->key.bucket_id == new_bucket_id)
|
||||
{
|
||||
if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR)
|
||||
{
|
||||
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
}
|
||||
/* Set up a list of pending query ids from the previous bucket. */
|
||||
else if (entry->key.bucket_id == old_bucket_id &&
|
||||
(entry->state == PGSS_PARSE ||
|
||||
entry->state == PGSS_PLAN ||
|
||||
entry->state == PGSS_EXEC))
|
||||
{
|
||||
if (n_pending_queries < MAX_PENDING_QUERIES)
|
||||
{
|
||||
pending_query_ids[n_pending_queries] = entry->key.queryid;
|
||||
++n_pending_queries;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* No. of pending queries exceeds MAX_PENDING_QUERIES.
|
||||
* Try to allocate memory from heap to keep track of pending query ids.
|
||||
* If allocation fails we manually copy pending query to the next query buffer.
|
||||
*/
|
||||
if (!out_of_memory && !pending_query_ids_buf)
|
||||
{
|
||||
/* Allocate enough room for query ids. */
|
||||
pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash));
|
||||
if (pending_query_ids_buf != NULL)
|
||||
memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
|
||||
else
|
||||
out_of_memory = true;
|
||||
}
|
||||
|
||||
if (!out_of_memory)
|
||||
{
|
||||
/* Store pending query id in the dynamic buffer. */
|
||||
pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
|
||||
++n_pending_queries;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* No memory, manually copy query from previous buffer. */
|
||||
char query_txt[1024];
|
||||
|
||||
if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
|
||||
|| read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
|
||||
{
|
||||
SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
|
||||
}
|
||||
else
|
||||
/* There was no space available to store the pending query text. */
|
||||
elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s",
|
||||
entry->key.queryid,
|
||||
(PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
|
||||
"insufficient shared space for query" :
|
||||
"I/O error reading query from disk");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Copy all detected pending queries from previous bucket id to the new one. */
|
||||
if (n_pending_queries > 0) {
|
||||
if (n_pending_queries < MAX_PENDING_QUERIES)
|
||||
pending_query_ids_buf = pending_query_ids;
|
||||
|
||||
copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Deallocate least-used entries.
|
||||
*
|
||||
* If old_bucket_id != -1, move all pending queries in old_bucket_id
|
||||
* to the new bucket id.
|
||||
*
|
||||
* Caller must hold an exclusive lock on pgss->lock.
|
||||
*/
|
||||
bool
|
||||
hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
|
||||
hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgssEntry *entry = NULL;
|
||||
List *pending_entries = NIL;
|
||||
ListCell *pending_entry;
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
|
||||
/* Store pending query ids from the previous bucket. */
|
||||
List *pending_entries = NIL;
|
||||
ListCell *pending_entry;
|
||||
|
||||
if (new_bucket_id != -1)
|
||||
{
|
||||
/* Clear all queries in the query buffer for the new bucket. */
|
||||
memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
|
||||
}
|
||||
|
||||
/* Iterate over the hash table. */
|
||||
hash_seq_init(&hash_seq, pgss_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
/*
|
||||
* Remove all entries if new_bucket_id == -1.
|
||||
* Otherwise remove entry in new_bucket_id if it has finished already.
|
||||
*/
|
||||
if (new_bucket_id < 0 ||
|
||||
(entry->key.bucket_id == new_bucket_id &&
|
||||
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
|
||||
|
@ -333,6 +230,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
|
|||
if (!bkp_entry)
|
||||
{
|
||||
/* No memory, remove pending query entry from the previous bucket. */
|
||||
elog(ERROR, "hash_entry_dealloc: out of memory");
|
||||
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
continue;
|
||||
}
|
||||
|
@ -370,14 +268,19 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
|
|||
new_entry->counters = old_entry->counters;
|
||||
SpinLockInit(&new_entry->mutex);
|
||||
new_entry->encoding = old_entry->encoding;
|
||||
/* copy query's text from previous bucket to the new one. */
|
||||
copy_query(new_bucket_id,
|
||||
new_entry->key.queryid, /* query id */
|
||||
old_entry->query_pos, /* query position in buffer */
|
||||
query_buffer[new_bucket_id], /* destination query buffer */
|
||||
query_buffer[old_bucket_id], /* source query buffer */
|
||||
&new_entry->query_pos); /* position in which query was inserted into destination buffer */
|
||||
}
|
||||
|
||||
free(old_entry);
|
||||
}
|
||||
|
||||
list_free(pending_entries);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -401,45 +304,6 @@ hash_entry_reset()
|
|||
LWLockRelease(pgss->lock);
|
||||
}
|
||||
|
||||
/* Caller must acquire a lock */
|
||||
pgssQueryEntry*
|
||||
hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid)
|
||||
{
|
||||
pgssQueryHashKey key;
|
||||
pgssQueryEntry *entry;
|
||||
bool found;
|
||||
|
||||
key.queryid = queryid;
|
||||
key.bucket_id = bucket_id;
|
||||
key.dbid = dbid;
|
||||
key.userid = userid;
|
||||
key.ip = ip;
|
||||
key.appid = appid;
|
||||
|
||||
entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_ENTER_NULL, &found);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/* Caller must acquire a lock */
|
||||
pgssQueryEntry*
|
||||
hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid)
|
||||
{
|
||||
pgssQueryHashKey key;
|
||||
pgssQueryEntry *entry;
|
||||
bool found;
|
||||
|
||||
key.queryid = queryid;
|
||||
key.bucket_id = bucket_id;
|
||||
key.dbid = dbid;
|
||||
key.userid = userid;
|
||||
key.ip = ip;
|
||||
key.appid = appid;
|
||||
|
||||
/* Lookup the hash table entry with shared lock. */
|
||||
entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_FIND, &found);
|
||||
return entry;
|
||||
}
|
||||
|
||||
bool
|
||||
IsHashInitialize(void)
|
||||
{
|
||||
|
@ -447,52 +311,38 @@ IsHashInitialize(void)
|
|||
pgss_hash != NULL);
|
||||
}
|
||||
|
||||
static void copy_queries(unsigned char *query_buffer[],
|
||||
uint64 new_bucket_id,
|
||||
uint64 old_bucket_id,
|
||||
uint64 *query_ids,
|
||||
size_t n_queries)
|
||||
static bool copy_query(uint64 bucket_id,
|
||||
uint64 query_id,
|
||||
uint64 query_pos,
|
||||
unsigned char *dst_buf,
|
||||
unsigned char *src_buf,
|
||||
size_t *new_query_pos)
|
||||
{
|
||||
bool found;
|
||||
uint64 query_id = 0;
|
||||
uint64 query_len = 0;
|
||||
uint64 rlen = 0;
|
||||
uint64 buf_len = 0;
|
||||
unsigned char *src_buffer = query_buffer[old_bucket_id];
|
||||
size_t i;
|
||||
uint64 query_len = 0;
|
||||
uint64 buf_len = 0;
|
||||
|
||||
memcpy(&buf_len, src_buffer, sizeof (uint64));
|
||||
memcpy(&buf_len, src_buf, sizeof (uint64));
|
||||
if (buf_len <= 0)
|
||||
return;
|
||||
return false;
|
||||
|
||||
rlen = sizeof (uint64); /* Move forwad to skip length bytes */
|
||||
while (rlen < buf_len)
|
||||
/* Try to locate the query directly. */
|
||||
if (query_pos != 0 && (query_pos + sizeof(uint64) + sizeof(uint64)) < buf_len)
|
||||
{
|
||||
found = false;
|
||||
memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */
|
||||
for (i = 0; i < n_queries; ++i)
|
||||
{
|
||||
if (query_id == query_ids[i])
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (*(uint64 *)&src_buf[query_pos] != query_id)
|
||||
return false;
|
||||
|
||||
rlen += sizeof (uint64);
|
||||
if (buf_len <= rlen)
|
||||
break;
|
||||
query_pos += sizeof(uint64);
|
||||
|
||||
memcpy(&query_len, &src_buffer[rlen], sizeof (uint64)); /* query len */
|
||||
rlen += sizeof (uint64);
|
||||
if (buf_len < rlen + query_len)
|
||||
break;
|
||||
memcpy(&query_len, &src_buf[query_pos], sizeof(uint64)); /* query len */
|
||||
query_pos += sizeof(uint64);
|
||||
|
||||
if (found) {
|
||||
SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id],
|
||||
(const char *)&src_buffer[rlen], query_len);
|
||||
}
|
||||
if (query_pos + query_len > buf_len) /* avoid reading past buffer's length. */
|
||||
return false;
|
||||
|
||||
rlen += query_len;
|
||||
return SaveQueryText(bucket_id, query_id, dst_buf,
|
||||
(const char *)&src_buf[query_pos],
|
||||
query_len, new_query_pos);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -147,15 +147,6 @@ static uint64 pgss_hash_string(const char *str, int len);
|
|||
char *unpack_sql_state(int sql_state);
|
||||
|
||||
static void pgss_store_error(uint64 queryid, const char * query, ErrorData *edata);
|
||||
static pgssQueryEntry *pgss_store_query_info(uint64 bucketid,
|
||||
uint64 queryid,
|
||||
uint64 dbid,
|
||||
uint64 userid,
|
||||
uint64 ip,
|
||||
uint64 appid,
|
||||
const char *query,
|
||||
uint64 query_len,
|
||||
pgssStoreKind kind);
|
||||
|
||||
static void pgss_store_utility(const char *query,
|
||||
double total_time,
|
||||
|
@ -1314,40 +1305,6 @@ pgss_update_entry(pgssEntry *entry,
|
|||
}
|
||||
}
|
||||
|
||||
static pgssEntry*
|
||||
pgss_get_entry(uint64 bucket_id,
|
||||
uint64 userid,
|
||||
uint64 dbid,
|
||||
uint64 queryid,
|
||||
uint64 ip,
|
||||
uint64 planid,
|
||||
uint64 appid)
|
||||
{
|
||||
pgssEntry *entry;
|
||||
pgssHashKey key;
|
||||
HTAB *pgss_hash = pgsm_get_hash();
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
|
||||
key.bucket_id = bucket_id;
|
||||
key.userid = userid;
|
||||
key.dbid = MyDatabaseId;
|
||||
key.queryid = queryid;
|
||||
key.ip = pg_get_client_addr();
|
||||
key.planid = planid;
|
||||
key.appid = appid;
|
||||
|
||||
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
|
||||
if(!entry)
|
||||
{
|
||||
/* OK to create a new hashtable entry */
|
||||
entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
|
||||
if (entry == NULL)
|
||||
return NULL;
|
||||
}
|
||||
Assert(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
static void
|
||||
pgss_store_query(uint64 queryid,
|
||||
const char * query,
|
||||
|
@ -1486,100 +1443,119 @@ pgss_store(uint64 queryid,
|
|||
JumbleState *jstate,
|
||||
pgssStoreKind kind)
|
||||
{
|
||||
pgssEntry *entry;
|
||||
HTAB *pgss_hash;
|
||||
pgssHashKey key;
|
||||
pgssEntry *entry;
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
char application_name[APPLICATIONNAME_LEN];
|
||||
int application_name_len = pg_get_application_name(application_name);
|
||||
bool reset = false;
|
||||
uint64 bucketid;
|
||||
uint64 prev_bucket_id;
|
||||
char application_name[APPLICATIONNAME_LEN];
|
||||
int application_name_len;
|
||||
bool reset = false;
|
||||
uint64 bucketid;
|
||||
uint64 prev_bucket_id;
|
||||
uint64 userid;
|
||||
int con;
|
||||
uint64 dbid = MyDatabaseId;
|
||||
uint64 ip = pg_get_client_addr();
|
||||
uint64 planid = plan_info ? plan_info->planid: 0;
|
||||
uint64 appid = djb2_hash((unsigned char *)application_name, application_name_len);
|
||||
uint64 planid;
|
||||
uint64 appid;
|
||||
char comments[512] = "";
|
||||
bool out_of_memory = false;
|
||||
size_t query_len;
|
||||
|
||||
/* Monitoring is disabled */
|
||||
if (!PGSM_ENABLED)
|
||||
return;
|
||||
|
||||
Assert(query != NULL);
|
||||
if (kind == PGSS_ERROR)
|
||||
GetUserIdAndSecContext((unsigned int *)&userid, &con);
|
||||
else
|
||||
userid = GetUserId();
|
||||
|
||||
extract_query_comments(query, comments, sizeof(comments));
|
||||
|
||||
/* Safety check... */
|
||||
if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)])
|
||||
return;
|
||||
|
||||
Assert(query != NULL);
|
||||
if (kind == PGSS_ERROR)
|
||||
GetUserIdAndSecContext((unsigned int *)&userid, &con);
|
||||
else
|
||||
userid = GetUserId();
|
||||
|
||||
application_name_len = pg_get_application_name(application_name);
|
||||
planid = plan_info ? plan_info->planid: 0;
|
||||
appid = djb2_hash((unsigned char *)application_name, application_name_len);
|
||||
|
||||
extract_query_comments(query, comments, sizeof(comments));
|
||||
|
||||
prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket);
|
||||
bucketid = get_next_wbucket(pgss);
|
||||
|
||||
if (bucketid != prev_bucket_id)
|
||||
reset = true;
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
key.bucket_id = bucketid;
|
||||
key.userid = userid;
|
||||
key.dbid = MyDatabaseId;
|
||||
key.queryid = queryid;
|
||||
key.ip = pg_get_client_addr();
|
||||
key.planid = planid;
|
||||
key.appid = appid;
|
||||
|
||||
switch (kind)
|
||||
pgss_hash = pgsm_get_hash();
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_SHARED);
|
||||
|
||||
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
|
||||
if (!entry)
|
||||
{
|
||||
case PGSS_PARSE:
|
||||
case PGSS_PLAN:
|
||||
{
|
||||
pgssQueryEntry *query_entry;
|
||||
query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind);
|
||||
if (query_entry == NULL)
|
||||
out_of_memory = true;
|
||||
break;
|
||||
}
|
||||
case PGSS_ERROR:
|
||||
case PGSS_EXEC:
|
||||
case PGSS_FINISHED:
|
||||
{
|
||||
pgssQueryEntry *query_entry;
|
||||
query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind);
|
||||
if (query_entry == NULL)
|
||||
{
|
||||
out_of_memory = true;
|
||||
break;
|
||||
}
|
||||
query_entry->state = kind;
|
||||
entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid);
|
||||
if (entry == NULL)
|
||||
{
|
||||
out_of_memory = true;
|
||||
break;
|
||||
}
|
||||
uint64 prev_qbuf_len;
|
||||
/* position in which the query's text was inserted into the query buffer. */
|
||||
size_t qpos = 0;
|
||||
|
||||
if (jstate == NULL)
|
||||
pgss_update_entry(entry, /* entry */
|
||||
bucketid, /* bucketid */
|
||||
queryid, /* queryid */
|
||||
query, /* query */
|
||||
comments, /* comments */
|
||||
plan_info, /* PlanInfo */
|
||||
cmd_type, /* CmdType */
|
||||
sys_info, /* SysInfo */
|
||||
error_info, /* ErrorInfo */
|
||||
total_time, /* total_time */
|
||||
rows, /* rows */
|
||||
bufusage, /* bufusage */
|
||||
walusage, /* walusage */
|
||||
reset, /* reset */
|
||||
kind); /* kind */
|
||||
query_len = strlen(query);
|
||||
if (query_len > PGSM_QUERY_MAX_LEN)
|
||||
query_len = PGSM_QUERY_MAX_LEN;
|
||||
|
||||
/* Need exclusive lock to make a new hashtable entry - promote */
|
||||
LWLockRelease(pgss->lock);
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
|
||||
/*
|
||||
* Save current query buffer length, if we fail to add a new
|
||||
* new entry to the hash table then we must restore the
|
||||
* original length.
|
||||
*/
|
||||
memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len));
|
||||
if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len, &qpos))
|
||||
{
|
||||
LWLockRelease(pgss->lock);
|
||||
elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query.");
|
||||
return;
|
||||
}
|
||||
break;
|
||||
case PGSS_NUMKIND:
|
||||
case PGSS_INVALID:
|
||||
break;
|
||||
|
||||
/* OK to create a new hashtable entry */
|
||||
entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
|
||||
if (entry == NULL)
|
||||
{
|
||||
/* Restore previous query buffer length. */
|
||||
memcpy(pgss_qbuf[bucketid], &prev_qbuf_len, sizeof(prev_qbuf_len));
|
||||
LWLockRelease(pgss->lock);
|
||||
elog(DEBUG1, "pg_stat_monitor: out of memory");
|
||||
return;
|
||||
}
|
||||
entry->query_pos = qpos;
|
||||
}
|
||||
|
||||
if (jstate == NULL)
|
||||
pgss_update_entry(entry, /* entry */
|
||||
bucketid, /* bucketid */
|
||||
queryid, /* queryid */
|
||||
query, /* query */
|
||||
comments, /* comments */
|
||||
plan_info, /* PlanInfo */
|
||||
cmd_type, /* CmdType */
|
||||
sys_info, /* SysInfo */
|
||||
error_info, /* ErrorInfo */
|
||||
total_time, /* total_time */
|
||||
rows, /* rows */
|
||||
bufusage, /* bufusage */
|
||||
walusage, /* walusage */
|
||||
reset, /* reset */
|
||||
kind); /* kind */
|
||||
|
||||
LWLockRelease(pgss->lock);
|
||||
if (out_of_memory)
|
||||
elog(DEBUG1, "pg_stat_monitor: out of memory");
|
||||
}
|
||||
/*
|
||||
* Reset all statement statistics.
|
||||
|
@ -1594,8 +1570,12 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
|||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
hash_entry_dealloc(-1, -1);
|
||||
hash_query_entryies_reset();
|
||||
hash_entry_dealloc(-1, -1, NULL);
|
||||
/* Reset query buffers. */
|
||||
for (size_t i = 0; i < MAX_BUCKETS; ++i)
|
||||
{
|
||||
*(uint64 *)pgss_qbuf[i] = 0;
|
||||
}
|
||||
#ifdef BENCHMARK
|
||||
for (int i = STATS_START; i < STATS_END; ++i) {
|
||||
pg_hook_stats[i].min_time = 0;
|
||||
|
@ -1646,7 +1626,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
|||
MemoryContext oldcontext;
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgssEntry *entry;
|
||||
pgssQueryEntry *query_entry;
|
||||
char parentid_txt[32];
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
HTAB *pgss_hash = pgsm_get_hash();
|
||||
|
@ -1706,19 +1685,14 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
|||
uint64 userid = entry->key.userid;
|
||||
uint64 ip = entry->key.ip;
|
||||
uint64 planid = entry->key.planid;
|
||||
uint64 appid = entry->key.appid;
|
||||
unsigned char *buf = pgss_qbuf[bucketid];
|
||||
#if PG_VERSION_NUM < 140000
|
||||
bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
|
||||
#else
|
||||
bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS);
|
||||
#endif
|
||||
query_entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid);
|
||||
if (query_entry == NULL)
|
||||
continue;
|
||||
|
||||
|
||||
if (read_query(buf, bucketid, queryid, query_txt) == 0)
|
||||
if (read_query(buf, queryid, query_txt, entry->query_pos) == 0)
|
||||
{
|
||||
int len;
|
||||
len = read_query_buffer(bucketid, queryid, query_txt);
|
||||
|
@ -1738,16 +1712,16 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
|||
if (tmp.state == PGSS_FINISHED)
|
||||
continue;
|
||||
}
|
||||
if (tmp.info.parentid != UINT64CONST(0))
|
||||
{
|
||||
int len = 0;
|
||||
if (read_query(buf, bucketid, tmp.info.parentid, parent_query_txt) == 0)
|
||||
{
|
||||
len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt);
|
||||
if (len != MAX_QUERY_BUFFER_BUCKET)
|
||||
snprintf(parent_query_txt, 32, "%s", "<insufficient disk/shared space>");
|
||||
}
|
||||
}
|
||||
if (tmp.info.parentid != UINT64CONST(0))
|
||||
{
|
||||
int len = 0;
|
||||
if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0)
|
||||
{
|
||||
len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt);
|
||||
if (len != MAX_QUERY_BUFFER_BUCKET)
|
||||
snprintf(parent_query_txt, 32, "%s", "<insufficient disk/shared space>");
|
||||
}
|
||||
}
|
||||
/* bucketid at column number 0 */
|
||||
values[i++] = Int64GetDatumFast(bucketid);
|
||||
|
||||
|
@ -2057,8 +2031,7 @@ get_next_wbucket(pgssSharedState *pgss)
|
|||
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
hash_entry_dealloc(new_bucket_id, prev_bucket_id);
|
||||
hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
|
||||
hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
|
||||
|
||||
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id);
|
||||
unlink(file_name);
|
||||
|
@ -3014,7 +2987,7 @@ intarray_get_datum(int32 arr[], int len)
|
|||
}
|
||||
|
||||
uint64
|
||||
read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
|
||||
read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos)
|
||||
{
|
||||
bool found = false;
|
||||
uint64 query_id = 0;
|
||||
|
@ -3026,6 +2999,27 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
|
|||
if (buf_len <= 0)
|
||||
goto exit;
|
||||
|
||||
/* If a position hint is given, try to locate the query directly. */
|
||||
if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len)
|
||||
{
|
||||
memcpy(&query_id, &buf[pos], sizeof(uint64));
|
||||
if (query_id != queryid)
|
||||
return 0;
|
||||
|
||||
pos += sizeof(uint64);
|
||||
|
||||
memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */
|
||||
pos += sizeof(uint64);
|
||||
|
||||
if (pos + query_len > buf_len) /* avoid reading past buffer's length. */
|
||||
return 0;
|
||||
|
||||
memcpy(query, &buf[pos], query_len); /* Actual query */
|
||||
query[query_len] = '\0';
|
||||
|
||||
return queryid;
|
||||
}
|
||||
|
||||
rlen = sizeof (uint64); /* Move forwad to skip length bytes */
|
||||
for(;;)
|
||||
{
|
||||
|
@ -3035,6 +3029,7 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
|
|||
memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */
|
||||
if (query_id == queryid)
|
||||
found = true;
|
||||
|
||||
rlen += sizeof (uint64);
|
||||
if (buf_len <= rlen)
|
||||
continue;
|
||||
|
@ -3063,44 +3058,13 @@ exit:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static pgssQueryEntry*
|
||||
pgss_store_query_info(uint64 bucketid,
|
||||
uint64 queryid,
|
||||
uint64 dbid,
|
||||
uint64 userid,
|
||||
uint64 ip,
|
||||
uint64 appid,
|
||||
const char *query,
|
||||
uint64 query_len,
|
||||
pgssStoreKind kind)
|
||||
{
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)];
|
||||
pgssQueryEntry *entry;
|
||||
|
||||
if (query_len > PGSM_QUERY_MAX_LEN)
|
||||
query_len = PGSM_QUERY_MAX_LEN;
|
||||
|
||||
/* Already have query in the shared buffer, there
|
||||
* is no need to add that again.
|
||||
*/
|
||||
entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid);
|
||||
if (entry)
|
||||
return entry;
|
||||
|
||||
entry = hash_create_query_entry(bucketid, queryid, dbid, userid, ip, appid);
|
||||
if (!entry)
|
||||
return NULL;
|
||||
entry->state = kind;
|
||||
|
||||
if(!SaveQueryText(bucketid, queryid, buf, query, query_len))
|
||||
return NULL;
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
bool
|
||||
SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len)
|
||||
SaveQueryText(uint64 bucketid,
|
||||
uint64 queryid,
|
||||
unsigned char *buf,
|
||||
const char *query,
|
||||
uint64 query_len,
|
||||
size_t *query_pos)
|
||||
{
|
||||
uint64 buf_len = 0;
|
||||
|
||||
|
@ -3125,6 +3089,8 @@ SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *q
|
|||
}
|
||||
}
|
||||
|
||||
*query_pos = buf_len;
|
||||
|
||||
memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */
|
||||
buf_len += sizeof (uint64);
|
||||
|
||||
|
@ -3359,7 +3325,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
|
|||
break;
|
||||
}
|
||||
off += buf_len;
|
||||
if (read_query(buf, bucket_id, queryid, query_txt))
|
||||
if (read_query(buf, queryid, query_txt, 0))
|
||||
break;
|
||||
}
|
||||
if (fd > 0)
|
||||
|
|
|
@ -294,6 +294,7 @@ typedef struct pgssEntry
|
|||
Counters counters; /* the statistics for this query */
|
||||
int encoding; /* query text encoding */
|
||||
slock_t mutex; /* protects the counters only */
|
||||
size_t query_pos; /* query location within query buffer */
|
||||
} pgssEntry;
|
||||
|
||||
/*
|
||||
|
@ -361,7 +362,12 @@ typedef struct JumbleState
|
|||
|
||||
/* Links to shared memory state */
|
||||
|
||||
bool SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len);
|
||||
bool SaveQueryText(uint64 bucketid,
|
||||
uint64 queryid,
|
||||
unsigned char *buf,
|
||||
const char *query,
|
||||
uint64 query_len,
|
||||
size_t *query_pos);
|
||||
|
||||
/* guc.c */
|
||||
void init_guc(void);
|
||||
|
@ -380,12 +386,12 @@ void hash_entry_reset(void);
|
|||
void hash_query_entryies_reset(void);
|
||||
void hash_query_entries();
|
||||
void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
|
||||
bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id);
|
||||
void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
|
||||
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
||||
Size hash_memsize(void);
|
||||
|
||||
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
|
||||
uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query);
|
||||
uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos);
|
||||
pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
|
||||
pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
|
||||
void pgss_startup(void);
|
||||
|
|
Loading…
Reference in New Issue