diff --git a/hash_query.c b/hash_query.c index bb684d8..29d0cc8 100644 --- a/hash_query.c +++ b/hash_query.c @@ -25,14 +25,16 @@ static HTAB *pgss_hash; static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); /* - * Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id] - * whose query ids are found in the array 'query_ids', of length 'n_queries'. + * Copy query from src_buffer to dst_buff. + * Use query_id and query_pos to fast locate query in source buffer. + * Store updated query position in the destination buffer into param query_pos. */ -static void copy_queries(unsigned char *query_buffer[], - uint64 new_bucket_id, - uint64 old_bucket_id, - uint64 *query_ids, - size_t n_queries); +static bool copy_query(uint64 bucket_id, + uint64 query_id, + uint64 query_pos, + unsigned char *dst_buf, + unsigned char *src_buf, + size_t *new_query_pos); static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) @@ -186,29 +188,9 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu pgssEntry *entry = NULL; pgssSharedState *pgss = pgsm_get_ss(); -#define MAX_PENDING_QUERIES 128 - /* - * Variables used to store pending queries from the previous bucket. - * - * We use a linked list to keep a full copy of entries from the hash table - * that must be moved to the new bucket. - * - * We use an array to keep a list of pending query IDs only, the array will - * be used in copy_queries() as a filter of which queries to copy. - * The reason we use a separate array to keep pending queries IDs is that it - * is faster to iterate than the linked list, as following pointers in a list - * almost always make bad use of cpu cache, while a small array of uint64 is - * a good candidate to be stored in L1 cache. - * - * If there are more pending queries than MAX_PENDING_QUERIES then - * we try to dynamically allocate memory for them. - */ + /* Store pending query ids from the previous bucket. */ List *pending_entries = NIL; ListCell *pending_entry; - uint64 pending_query_ids[MAX_PENDING_QUERIES]; - uint64 *pending_query_ids_buf = NULL; - size_t n_pending_queries = 0; - bool out_of_memory = false; if (new_bucket_id != -1) { @@ -222,7 +204,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu { /* * Remove all entries if new_bucket_id == -1. - * Otherwise remove entry in new_bucket_id if it's finished already. + * Otherwise remove entry in new_bucket_id if it has finished already. */ if (new_bucket_id < 0 || (entry->key.bucket_id == new_bucket_id && @@ -262,63 +244,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu /* Add the entry to a list of nodes to be processed later. */ pending_entries = lappend(pending_entries, bkp_entry); - /* Add pending query ID to the array. */ - if (n_pending_queries < MAX_PENDING_QUERIES) - { - pending_query_ids[n_pending_queries] = entry->key.queryid; - ++n_pending_queries; - } - else - { - /* - * No. of pending queries exceeds MAX_PENDING_QUERIES. - * Try to dynamically allocate memory to keep track of pending query ids. - * If allocation fails we manually copy pending query to the next query buffer. - */ - if (!out_of_memory && !pending_query_ids_buf) - { - /* Allocate enough room for query ids. */ - pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_hash)); - if (pending_query_ids_buf != NULL) - memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64)); - else - out_of_memory = true; - } - - if (!out_of_memory) - { - /* Store pending query id in the dynamic buffer. */ - pending_query_ids_buf[n_pending_queries] = entry->key.queryid; - ++n_pending_queries; - } - else - { - /* No memory, manually copy query from previous buffer. */ - char query_txt[1024]; - - if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0 - || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET) - { - SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt)); - } - else - /* There was no space available to store the pending query text. */ - elog(ERROR, "hash_entry_dealloc: Failed to move pending query %lX, %s", - entry->key.queryid, - (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ? - "insufficient shared space for query" : - "I/O error reading query from disk"); - } - } - /* Finally remove the pending query from the expired bucket id. */ entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } } } - Assert(list_length(pending_entries) == n_pending_queries); - /* * Iterate over the list of pending queries in order * to add them back to the hash table with the updated bucket id. @@ -337,19 +268,18 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu new_entry->counters = old_entry->counters; SpinLockInit(&new_entry->mutex); new_entry->encoding = old_entry->encoding; + /* copy query's text from previous bucket to the new one. */ + copy_query(new_bucket_id, + new_entry->key.queryid, /* query id */ + old_entry->query_pos, /* query position in buffer */ + query_buffer[new_bucket_id], /* destination query buffer */ + query_buffer[old_bucket_id], /* source query buffer */ + &new_entry->query_pos); /* position in which query was inserted into destination buffer */ } free(old_entry); } - /* Copy all detected pending queries from previous bucket id to the new one. */ - if (n_pending_queries > 0) { - if (n_pending_queries < MAX_PENDING_QUERIES) - pending_query_ids_buf = pending_query_ids; - - copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries); - } - list_free(pending_entries); } @@ -381,52 +311,38 @@ IsHashInitialize(void) pgss_hash != NULL); } -static void copy_queries(unsigned char *query_buffer[], - uint64 new_bucket_id, - uint64 old_bucket_id, - uint64 *query_ids, - size_t n_queries) +static bool copy_query(uint64 bucket_id, + uint64 query_id, + uint64 query_pos, + unsigned char *dst_buf, + unsigned char *src_buf, + size_t *new_query_pos) { - bool found; - uint64 query_id = 0; - uint64 query_len = 0; - uint64 rlen = 0; - uint64 buf_len = 0; - unsigned char *src_buffer = query_buffer[old_bucket_id]; - size_t i; + uint64 query_len = 0; + uint64 buf_len = 0; - memcpy(&buf_len, src_buffer, sizeof (uint64)); + memcpy(&buf_len, src_buf, sizeof (uint64)); if (buf_len <= 0) - return; + return false; - rlen = sizeof (uint64); /* Move forwad to skip length bytes */ - while (rlen < buf_len) + /* Try to locate the query directly. */ + if (query_pos != 0 && (query_pos + sizeof(uint64) + sizeof(uint64)) < buf_len) { - found = false; - memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */ - for (i = 0; i < n_queries; ++i) - { - if (query_id == query_ids[i]) - { - found = true; - break; - } - } + if (*(uint64 *)&src_buf[query_pos] != query_id) + return false; - rlen += sizeof (uint64); - if (buf_len <= rlen) - break; + query_pos += sizeof(uint64); - memcpy(&query_len, &src_buffer[rlen], sizeof (uint64)); /* query len */ - rlen += sizeof (uint64); - if (buf_len < rlen + query_len) - break; + memcpy(&query_len, &src_buf[query_pos], sizeof(uint64)); /* query len */ + query_pos += sizeof(uint64); - if (found) { - SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id], - (const char *)&src_buffer[rlen], query_len); - } + if (query_pos + query_len > buf_len) /* avoid reading past buffer's length. */ + return false; - rlen += query_len; + return SaveQueryText(bucket_id, query_id, dst_buf, + (const char *)&src_buf[query_pos], + query_len, new_query_pos); } -} \ No newline at end of file + + return false; +} diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 32334b8..ee63ba7 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1501,6 +1501,8 @@ pgss_store(uint64 queryid, if (!entry) { uint64 prev_qbuf_len; + /* position in which the query's text was inserted into the query buffer. */ + size_t qpos = 0; query_len = strlen(query); if (query_len > PGSM_QUERY_MAX_LEN) @@ -1516,7 +1518,7 @@ pgss_store(uint64 queryid, * original length. */ memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len)); - if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len)) + if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len, &qpos)) { LWLockRelease(pgss->lock); elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query."); @@ -1533,6 +1535,7 @@ pgss_store(uint64 queryid, elog(DEBUG1, "pg_stat_monitor: out of memory"); return; } + entry->query_pos = qpos; } if (jstate == NULL) @@ -1689,7 +1692,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); #endif - if (read_query(buf, bucketid, queryid, query_txt) == 0) + if (read_query(buf, queryid, query_txt, entry->query_pos) == 0) { int len; len = read_query_buffer(bucketid, queryid, query_txt); @@ -1709,16 +1712,16 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.state == PGSS_FINISHED) continue; } - if (tmp.info.parentid != UINT64CONST(0)) - { - int len = 0; - if (read_query(buf, bucketid, tmp.info.parentid, parent_query_txt) == 0) - { - len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) - snprintf(parent_query_txt, 32, "%s", ""); - } - } + if (tmp.info.parentid != UINT64CONST(0)) + { + int len = 0; + if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0) + { + len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); + if (len != MAX_QUERY_BUFFER_BUCKET) + snprintf(parent_query_txt, 32, "%s", ""); + } + } /* bucketid at column number 0 */ values[i++] = Int64GetDatumFast(bucketid); @@ -2984,7 +2987,7 @@ intarray_get_datum(int32 arr[], int len) } uint64 -read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) +read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos) { bool found = false; uint64 query_id = 0; @@ -2996,6 +2999,27 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) if (buf_len <= 0) goto exit; + /* If a position hint is given, try to locate the query directly. */ + if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len) + { + memcpy(&query_id, &buf[pos], sizeof(uint64)); + if (query_id != queryid) + return 0; + + pos += sizeof(uint64); + + memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */ + pos += sizeof(uint64); + + if (pos + query_len > buf_len) /* avoid reading past buffer's length. */ + return 0; + + memcpy(query, &buf[pos], query_len); /* Actual query */ + query[query_len] = '\0'; + + return queryid; + } + rlen = sizeof (uint64); /* Move forwad to skip length bytes */ for(;;) { @@ -3005,6 +3029,7 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */ if (query_id == queryid) found = true; + rlen += sizeof (uint64); if (buf_len <= rlen) continue; @@ -3034,7 +3059,12 @@ exit: } bool -SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len) +SaveQueryText(uint64 bucketid, + uint64 queryid, + unsigned char *buf, + const char *query, + uint64 query_len, + size_t *query_pos) { uint64 buf_len = 0; @@ -3059,6 +3089,8 @@ SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *q } } + *query_pos = buf_len; + memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */ buf_len += sizeof (uint64); @@ -3293,7 +3325,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) break; } off += buf_len; - if (read_query(buf, bucket_id, queryid, query_txt)) + if (read_query(buf, queryid, query_txt, 0)) break; } if (fd > 0) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 2eab42b..2ddfe01 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -294,6 +294,7 @@ typedef struct pgssEntry Counters counters; /* the statistics for this query */ int encoding; /* query text encoding */ slock_t mutex; /* protects the counters only */ + size_t query_pos; /* query location within query buffer */ } pgssEntry; /* @@ -361,7 +362,12 @@ typedef struct JumbleState /* Links to shared memory state */ -bool SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len); +bool SaveQueryText(uint64 bucketid, + uint64 queryid, + unsigned char *buf, + const char *query, + uint64 query_len, + size_t *query_pos); /* guc.c */ void init_guc(void); @@ -385,7 +391,7 @@ pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encodin Size hash_memsize(void); int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); -uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query); +uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos); pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); void pgss_startup(void);