PG-254: Add query location to hash table entries.

Whenever a new query is added to a query buffer, record the position
at which the query was inserted. We can then use this information to
locate the query faster later on, when required.

This allowed us to simplify the logic in hash_entry_dealloc(): after
creating the list of pending queries, as the list is scanned we can copy
each query from the previous query buffer to the new one by using the
query position (query_pos). This avoids scanning the whole query buffer
when looking up the queryid.

Also, when moving a query to a new buffer (copy_query), we update
the query_pos for the hash table entry, so it points to the right place
in the new query buffer.

The read_query() function was updated to allow the query position to be
passed as an argument: if pos != 0, use it to locate the query directly;
otherwise fall back to the previous mode of scanning the whole buffer.

SaveQueryText() was updated to accept a reference to the query position
as an argument; by the time the function finishes, this value has been
updated with the position at which the query was stored in the buffer.
pull/115/head
Diego Fronza 2021-10-07 10:06:20 -03:00
parent fcb70ffed1
commit 8fdf0946fe
3 changed files with 98 additions and 144 deletions

View File

@ -25,14 +25,16 @@ static HTAB *pgss_hash;
static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
/* /*
* Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id] * Copy query from src_buffer to dst_buff.
* whose query ids are found in the array 'query_ids', of length 'n_queries'. * Use query_id and query_pos to fast locate query in source buffer.
* Store updated query position in the destination buffer into param query_pos.
*/ */
static void copy_queries(unsigned char *query_buffer[], static bool copy_query(uint64 bucket_id,
uint64 new_bucket_id, uint64 query_id,
uint64 old_bucket_id, uint64 query_pos,
uint64 *query_ids, unsigned char *dst_buf,
size_t n_queries); unsigned char *src_buf,
size_t *new_query_pos);
static HTAB* static HTAB*
hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) hash_init(const char *hash_name, int key_size, int entry_size, int hash_size)
@ -186,29 +188,9 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
pgssEntry *entry = NULL; pgssEntry *entry = NULL;
pgssSharedState *pgss = pgsm_get_ss(); pgssSharedState *pgss = pgsm_get_ss();
#define MAX_PENDING_QUERIES 128 /* Store pending query ids from the previous bucket. */
/*
* Variables used to store pending queries from the previous bucket.
*
* We use a linked list to keep a full copy of entries from the hash table
* that must be moved to the new bucket.
*
* We use an array to keep a list of pending query IDs only, the array will
* be used in copy_queries() as a filter of which queries to copy.
* The reason we use a separate array to keep pending queries IDs is that it
* is faster to iterate than the linked list, as following pointers in a list
* almost always make bad use of cpu cache, while a small array of uint64 is
* a good candidate to be stored in L1 cache.
*
* If there are more pending queries than MAX_PENDING_QUERIES then
* we try to dynamically allocate memory for them.
*/
List *pending_entries = NIL; List *pending_entries = NIL;
ListCell *pending_entry; ListCell *pending_entry;
uint64 pending_query_ids[MAX_PENDING_QUERIES];
uint64 *pending_query_ids_buf = NULL;
size_t n_pending_queries = 0;
bool out_of_memory = false;
if (new_bucket_id != -1) if (new_bucket_id != -1)
{ {
@ -222,7 +204,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
{ {
/* /*
* Remove all entries if new_bucket_id == -1. * Remove all entries if new_bucket_id == -1.
* Otherwise remove entry in new_bucket_id if it's finished already. * Otherwise remove entry in new_bucket_id if it has finished already.
*/ */
if (new_bucket_id < 0 || if (new_bucket_id < 0 ||
(entry->key.bucket_id == new_bucket_id && (entry->key.bucket_id == new_bucket_id &&
@ -262,63 +244,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
/* Add the entry to a list of nodes to be processed later. */ /* Add the entry to a list of nodes to be processed later. */
pending_entries = lappend(pending_entries, bkp_entry); pending_entries = lappend(pending_entries, bkp_entry);
/* Add pending query ID to the array. */
if (n_pending_queries < MAX_PENDING_QUERIES)
{
pending_query_ids[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
/*
* No. of pending queries exceeds MAX_PENDING_QUERIES.
* Try to dynamically allocate memory to keep track of pending query ids.
* If allocation fails we manually copy pending query to the next query buffer.
*/
if (!out_of_memory && !pending_query_ids_buf)
{
/* Allocate enough room for query ids. */
pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_hash));
if (pending_query_ids_buf != NULL)
memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
else
out_of_memory = true;
}
if (!out_of_memory)
{
/* Store pending query id in the dynamic buffer. */
pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
/* No memory, manually copy query from previous buffer. */
char query_txt[1024];
if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
|| read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
{
SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
}
else
/* There was no space available to store the pending query text. */
elog(ERROR, "hash_entry_dealloc: Failed to move pending query %lX, %s",
entry->key.queryid,
(PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
"insufficient shared space for query" :
"I/O error reading query from disk");
}
}
/* Finally remove the pending query from the expired bucket id. */ /* Finally remove the pending query from the expired bucket id. */
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
} }
} }
} }
Assert(list_length(pending_entries) == n_pending_queries);
/* /*
* Iterate over the list of pending queries in order * Iterate over the list of pending queries in order
* to add them back to the hash table with the updated bucket id. * to add them back to the hash table with the updated bucket id.
@ -337,19 +268,18 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
new_entry->counters = old_entry->counters; new_entry->counters = old_entry->counters;
SpinLockInit(&new_entry->mutex); SpinLockInit(&new_entry->mutex);
new_entry->encoding = old_entry->encoding; new_entry->encoding = old_entry->encoding;
/* copy query's text from previous bucket to the new one. */
copy_query(new_bucket_id,
new_entry->key.queryid, /* query id */
old_entry->query_pos, /* query position in buffer */
query_buffer[new_bucket_id], /* destination query buffer */
query_buffer[old_bucket_id], /* source query buffer */
&new_entry->query_pos); /* position in which query was inserted into destination buffer */
} }
free(old_entry); free(old_entry);
} }
/* Copy all detected pending queries from previous bucket id to the new one. */
if (n_pending_queries > 0) {
if (n_pending_queries < MAX_PENDING_QUERIES)
pending_query_ids_buf = pending_query_ids;
copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
}
list_free(pending_entries); list_free(pending_entries);
} }
@ -381,52 +311,38 @@ IsHashInitialize(void)
pgss_hash != NULL); pgss_hash != NULL);
} }
static void copy_queries(unsigned char *query_buffer[], static bool copy_query(uint64 bucket_id,
uint64 new_bucket_id, uint64 query_id,
uint64 old_bucket_id, uint64 query_pos,
uint64 *query_ids, unsigned char *dst_buf,
size_t n_queries) unsigned char *src_buf,
size_t *new_query_pos)
{ {
bool found; uint64 query_len = 0;
uint64 query_id = 0; uint64 buf_len = 0;
uint64 query_len = 0;
uint64 rlen = 0;
uint64 buf_len = 0;
unsigned char *src_buffer = query_buffer[old_bucket_id];
size_t i;
memcpy(&buf_len, src_buffer, sizeof (uint64)); memcpy(&buf_len, src_buf, sizeof (uint64));
if (buf_len <= 0) if (buf_len <= 0)
return; return false;
rlen = sizeof (uint64); /* Move forwad to skip length bytes */ /* Try to locate the query directly. */
while (rlen < buf_len) if (query_pos != 0 && (query_pos + sizeof(uint64) + sizeof(uint64)) < buf_len)
{ {
found = false; if (*(uint64 *)&src_buf[query_pos] != query_id)
memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */ return false;
for (i = 0; i < n_queries; ++i)
{
if (query_id == query_ids[i])
{
found = true;
break;
}
}
rlen += sizeof (uint64); query_pos += sizeof(uint64);
if (buf_len <= rlen)
break;
memcpy(&query_len, &src_buffer[rlen], sizeof (uint64)); /* query len */ memcpy(&query_len, &src_buf[query_pos], sizeof(uint64)); /* query len */
rlen += sizeof (uint64); query_pos += sizeof(uint64);
if (buf_len < rlen + query_len)
break;
if (found) { if (query_pos + query_len > buf_len) /* avoid reading past buffer's length. */
SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id], return false;
(const char *)&src_buffer[rlen], query_len);
}
rlen += query_len; return SaveQueryText(bucket_id, query_id, dst_buf,
(const char *)&src_buf[query_pos],
query_len, new_query_pos);
} }
}
return false;
}

View File

@ -1501,6 +1501,8 @@ pgss_store(uint64 queryid,
if (!entry) if (!entry)
{ {
uint64 prev_qbuf_len; uint64 prev_qbuf_len;
/* position in which the query's text was inserted into the query buffer. */
size_t qpos = 0;
query_len = strlen(query); query_len = strlen(query);
if (query_len > PGSM_QUERY_MAX_LEN) if (query_len > PGSM_QUERY_MAX_LEN)
@ -1516,7 +1518,7 @@ pgss_store(uint64 queryid,
* original length. * original length.
*/ */
memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len)); memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len));
if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len)) if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len, &qpos))
{ {
LWLockRelease(pgss->lock); LWLockRelease(pgss->lock);
elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query."); elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query.");
@ -1533,6 +1535,7 @@ pgss_store(uint64 queryid,
elog(DEBUG1, "pg_stat_monitor: out of memory"); elog(DEBUG1, "pg_stat_monitor: out of memory");
return; return;
} }
entry->query_pos = qpos;
} }
if (jstate == NULL) if (jstate == NULL)
@ -1689,7 +1692,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS);
#endif #endif
if (read_query(buf, bucketid, queryid, query_txt) == 0) if (read_query(buf, queryid, query_txt, entry->query_pos) == 0)
{ {
int len; int len;
len = read_query_buffer(bucketid, queryid, query_txt); len = read_query_buffer(bucketid, queryid, query_txt);
@ -1709,16 +1712,16 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
if (tmp.state == PGSS_FINISHED) if (tmp.state == PGSS_FINISHED)
continue; continue;
} }
if (tmp.info.parentid != UINT64CONST(0)) if (tmp.info.parentid != UINT64CONST(0))
{ {
int len = 0; int len = 0;
if (read_query(buf, bucketid, tmp.info.parentid, parent_query_txt) == 0) if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0)
{ {
len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt);
if (len != MAX_QUERY_BUFFER_BUCKET) if (len != MAX_QUERY_BUFFER_BUCKET)
snprintf(parent_query_txt, 32, "%s", "<insufficient disk/shared space>"); snprintf(parent_query_txt, 32, "%s", "<insufficient disk/shared space>");
} }
} }
/* bucketid at column number 0 */ /* bucketid at column number 0 */
values[i++] = Int64GetDatumFast(bucketid); values[i++] = Int64GetDatumFast(bucketid);
@ -2984,7 +2987,7 @@ intarray_get_datum(int32 arr[], int len)
} }
uint64 uint64
read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos)
{ {
bool found = false; bool found = false;
uint64 query_id = 0; uint64 query_id = 0;
@ -2996,6 +2999,27 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
if (buf_len <= 0) if (buf_len <= 0)
goto exit; goto exit;
/* If a position hint is given, try to locate the query directly. */
if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len)
{
memcpy(&query_id, &buf[pos], sizeof(uint64));
if (query_id != queryid)
return 0;
pos += sizeof(uint64);
memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */
pos += sizeof(uint64);
if (pos + query_len > buf_len) /* avoid reading past buffer's length. */
return 0;
memcpy(query, &buf[pos], query_len); /* Actual query */
query[query_len] = '\0';
return queryid;
}
rlen = sizeof (uint64); /* Move forwad to skip length bytes */ rlen = sizeof (uint64); /* Move forwad to skip length bytes */
for(;;) for(;;)
{ {
@ -3005,6 +3029,7 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */ memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */
if (query_id == queryid) if (query_id == queryid)
found = true; found = true;
rlen += sizeof (uint64); rlen += sizeof (uint64);
if (buf_len <= rlen) if (buf_len <= rlen)
continue; continue;
@ -3034,7 +3059,12 @@ exit:
} }
bool bool
SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len) SaveQueryText(uint64 bucketid,
uint64 queryid,
unsigned char *buf,
const char *query,
uint64 query_len,
size_t *query_pos)
{ {
uint64 buf_len = 0; uint64 buf_len = 0;
@ -3059,6 +3089,8 @@ SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *q
} }
} }
*query_pos = buf_len;
memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */ memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */
buf_len += sizeof (uint64); buf_len += sizeof (uint64);
@ -3293,7 +3325,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
break; break;
} }
off += buf_len; off += buf_len;
if (read_query(buf, bucket_id, queryid, query_txt)) if (read_query(buf, queryid, query_txt, 0))
break; break;
} }
if (fd > 0) if (fd > 0)

View File

@ -294,6 +294,7 @@ typedef struct pgssEntry
Counters counters; /* the statistics for this query */ Counters counters; /* the statistics for this query */
int encoding; /* query text encoding */ int encoding; /* query text encoding */
slock_t mutex; /* protects the counters only */ slock_t mutex; /* protects the counters only */
size_t query_pos; /* query location within query buffer */
} pgssEntry; } pgssEntry;
/* /*
@ -361,7 +362,12 @@ typedef struct JumbleState
/* Links to shared memory state */ /* Links to shared memory state */
bool SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len); bool SaveQueryText(uint64 bucketid,
uint64 queryid,
unsigned char *buf,
const char *query,
uint64 query_len,
size_t *query_pos);
/* guc.c */ /* guc.c */
void init_guc(void); void init_guc(void);
@ -385,7 +391,7 @@ pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encodin
Size hash_memsize(void); Size hash_memsize(void);
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query); uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos);
pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
void pgss_startup(void); void pgss_startup(void);