diff --git a/hash_query.c b/hash_query.c
index df39b9b..078fa47 100644
--- a/hash_query.c
+++ b/hash_query.c
@@ -25,6 +25,15 @@ static HTAB *pgss_hash;
 static HTAB *pgss_query_hash;
 
 static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
+/*
+ * Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id]
+ * whose query ids are found in the array 'query_ids', of length 'n_queries'.
+ */
+static void copy_queries(unsigned char *query_buffer[],
+                         uint64 new_bucket_id,
+                         uint64 old_bucket_id,
+                         uint64 *query_ids,
+                         size_t n_queries);
 
 static HTAB*
 hash_init(const char *hash_name, int key_size, int entry_size, int hash_size)
@@ -178,47 +187,107 @@ hash_query_entryies_reset()
 
 /*
- * Deallocate finished entries.
+ * Deallocate finished entries in new_bucket_id.
+ *
+ * Move all pending queries in query_buffer[old_bucket_id] to
+ * query_buffer[new_bucket_id].
  *
  * Caller must hold an exclusive lock on pgss->lock.
  */
 void
-hash_query_entry_dealloc(int bucket, unsigned char *buf)
+hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
 {
     HASH_SEQ_STATUS hash_seq;
     pgssQueryEntry *entry;
-    unsigned char *old_buf;
     pgssSharedState *pgss = pgsm_get_ss();
 
+    /*
+     * Store pending query ids from the previous bucket.
+     * If there are more pending queries than MAX_PENDING_QUERIES then
+     * we try to dynamically allocate memory for them.
+     */
+#define MAX_PENDING_QUERIES 128
+    uint64 pending_query_ids[MAX_PENDING_QUERIES];
+    uint64 *pending_query_ids_buf = NULL;
+    size_t n_pending_queries = 0;
+    bool out_of_memory = false;
 
-    old_buf = palloc0(pgss->query_buf_size_bucket);
-    memcpy(old_buf, buf, pgss->query_buf_size_bucket);
-
-    memset(buf, 0, pgss->query_buf_size_bucket);
+    /* Clear all queries in the query buffer for the new bucket. */
+    memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
 
     hash_seq_init(&hash_seq, pgss_query_hash);
     while ((entry = hash_seq_search(&hash_seq)) != NULL)
     {
-        if (entry->key.bucket_id == bucket)
+        /* Remove previous finished query entries matching new bucket id. */
+        if (entry->key.bucket_id == new_bucket_id)
         {
             if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR)
             {
                 entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
             }
+        }
+        /* Set up a list of pending query ids from the previous bucket. */
+        else if (entry->key.bucket_id == old_bucket_id &&
+                 (entry->state == PGSS_PARSE ||
+                  entry->state == PGSS_PLAN ||
+                  entry->state == PGSS_EXEC))
+        {
+            if (n_pending_queries < MAX_PENDING_QUERIES)
+            {
+                pending_query_ids[n_pending_queries] = entry->key.queryid;
+                ++n_pending_queries;
+            }
             else
             {
-                int len;
-                char query_txt[1024];
-                if (read_query(old_buf, entry->key.bucket_id, entry->key.queryid, query_txt) == 0)
+                /*
+                 * The number of pending queries exceeds MAX_PENDING_QUERIES.
+                 * Try to allocate memory from the heap to keep track of pending query ids.
+                 * If allocation fails we manually copy the pending query to the next query buffer.
+                 */
+                if (!out_of_memory && !pending_query_ids_buf)
                 {
-                    len = read_query_buffer(entry->key.bucket_id, entry->key.queryid, query_txt);
-                    if (len != MAX_QUERY_BUFFER_BUCKET)
-                        snprintf(query_txt, 32, "%s", "");
+                    /* Allocate enough room for query ids. */
+                    pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash));
+                    if (pending_query_ids_buf != NULL)
+                        memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
+                    else
+                        out_of_memory = true;
+                }
+
+                if (!out_of_memory)
+                {
+                    /* Store pending query id in the dynamic buffer. */
+                    pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
+                    ++n_pending_queries;
+                }
+                else
+                {
+                    /* No memory, manually copy query from previous buffer. */
+                    char query_txt[1024];
+
+                    if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
+                        || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
+                    {
+                        SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
+                    }
+                    else
+                        /* There was no space available to store the pending query text. */
+                        elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s",
+                             entry->key.queryid,
+                             (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
+                                 "insufficient shared space for query" :
+                                 "I/O error reading query from disk");
                 }
-                SaveQueryText(entry->key.bucket_id, entry->key.queryid, buf, query_txt, strlen(query_txt));
             }
         }
     }
-    pfree(old_buf);
+
+    /* Copy all detected pending queries from previous bucket id to the new one. */
+    if (n_pending_queries > 0) {
+        if (n_pending_queries < MAX_PENDING_QUERIES)
+            pending_query_ids_buf = pending_query_ids;
+
+        copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
+    }
 }
 
 /*
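[Note] The pending-id bookkeeping in the hunk above is a stack-first, heap-fallback pattern: ids accumulate in a fixed 128-slot array, and a heap buffer sized from hash_get_num_entries() takes over only on overflow. A minimal standalone sketch of the same pattern, with illustrative names (IdList, collect_id, and expected_total are not part of the patch):

#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define MAX_PENDING 128

typedef struct IdList
{
    uint64_t  stack_ids[MAX_PENDING];   /* covers the common case without malloc */
    uint64_t *heap_ids;                 /* allocated only if the array overflows */
    size_t    count;
    size_t    capacity;                 /* heap capacity once spilled */
} IdList;

/* Append one id, spilling from the stack array to the heap on overflow.
 * Returns false if allocation fails or expected_total was an underestimate;
 * the patch handles that case by copying the query text directly instead. */
bool
collect_id(IdList *list, uint64_t id, size_t expected_total)
{
    if (list->heap_ids == NULL && list->count < MAX_PENDING)
    {
        list->stack_ids[list->count++] = id;
        return true;
    }

    if (list->heap_ids == NULL)
    {
        /* First overflow: size the heap buffer once, for the expected total
         * (the patch uses hash_get_num_entries() as this upper bound). */
        list->capacity = expected_total;
        list->heap_ids = malloc(list->capacity * sizeof(uint64_t));
        if (list->heap_ids == NULL)
            return false;
        memcpy(list->heap_ids, list->stack_ids, list->count * sizeof(uint64_t));
    }

    if (list->count == list->capacity)
        return false;
    list->heap_ids[list->count++] = id;
    return true;
}

int
main(void)
{
    IdList list = {{0}};
    uint64_t id;

    for (id = 0; id < 200; id++)
        if (!collect_id(&list, id, 200))
            return 1;
    /* list.count == 200; ids 0..127 were copied out of the stack array. */
    free(list.heap_ids);
    return 0;
}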
@@ -274,7 +343,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
             /* Update key to use the new bucket id. */
             bkp_entry->key.bucket_id = new_bucket_id;
 
-            /* Add the entry to a list of noded to be processed later. */
+            /* Add the entry to a list of nodes to be processed later. */
             pending_entries = lappend(pending_entries, bkp_entry);
 
             /* Finally remove the pending query from the expired bucket id. */
@@ -378,3 +447,52 @@ IsHashInitialize(void)
             pgss_hash != NULL);
 }
 
+static void copy_queries(unsigned char *query_buffer[],
+                         uint64 new_bucket_id,
+                         uint64 old_bucket_id,
+                         uint64 *query_ids,
+                         size_t n_queries)
+{
+    bool found;
+    uint64 query_id = 0;
+    uint64 query_len = 0;
+    uint64 rlen = 0;
+    uint64 buf_len = 0;
+    unsigned char *src_buffer = query_buffer[old_bucket_id];
+    size_t i;
+
+    memcpy(&buf_len, src_buffer, sizeof (uint64));
+    if (buf_len <= 0)
+        return;
+
+    rlen = sizeof (uint64);    /* Move forward to skip the length header */
+    while (rlen < buf_len)
+    {
+        found = false;
+        memcpy(&query_id, &src_buffer[rlen], sizeof (uint64));    /* query id */
+        for (i = 0; i < n_queries; ++i)
+        {
+            if (query_id == query_ids[i])
+            {
+                found = true;
+                break;
+            }
+        }
+
+        rlen += sizeof (uint64);
+        if (buf_len <= rlen)
+            break;
+
+        memcpy(&query_len, &src_buffer[rlen], sizeof (uint64));    /* query len */
+        rlen += sizeof (uint64);
+        if (buf_len < rlen + query_len)
+            break;
+
+        if (found) {
+            SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id],
+                          (const char *)&src_buffer[rlen], query_len);
+        }
+
+        rlen += query_len;
+    }
+}
\ No newline at end of file
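[Note] For reference, copy_queries above assumes a length-prefixed layout inside each bucket's query buffer: a uint64 header holding the total bytes used, followed by (query id, text length, text bytes) records. The reader below mirrors copy_queries' traversal; the writer is an assumption about what SaveQueryText produces, included only to make the round trip self-contained:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Append one record to a buffer laid out as copy_queries expects:
 *   bytes 0..7        uint64 buf_len -- total bytes used, header included
 *   then per record:  uint64 query_id, uint64 query_len,
 *                     followed by query_len bytes of query text
 */
int
append_query(unsigned char *buf, size_t buf_size, uint64_t query_id, const char *text)
{
    uint64_t buf_len;
    uint64_t query_len = strlen(text);

    memcpy(&buf_len, buf, sizeof(uint64_t));
    if (buf_len == 0)
        buf_len = sizeof(uint64_t);         /* leave room for the header */

    if (buf_len + 2 * sizeof(uint64_t) + query_len > buf_size)
        return 0;                           /* buffer full */

    memcpy(&buf[buf_len], &query_id, sizeof(uint64_t));
    buf_len += sizeof(uint64_t);
    memcpy(&buf[buf_len], &query_len, sizeof(uint64_t));
    buf_len += sizeof(uint64_t);
    memcpy(&buf[buf_len], text, query_len);
    buf_len += query_len;

    memcpy(buf, &buf_len, sizeof(uint64_t));
    return 1;
}

int
main(void)
{
    unsigned char buf[1024] = {0};
    uint64_t rlen, buf_len, query_id, query_len;

    append_query(buf, sizeof(buf), 0xABCD, "SELECT 1");
    append_query(buf, sizeof(buf), 0xEF01, "SELECT 2");

    /* Walk the records the same way copy_queries does. */
    memcpy(&buf_len, buf, sizeof(uint64_t));
    rlen = sizeof(uint64_t);
    while (rlen < buf_len)
    {
        memcpy(&query_id, &buf[rlen], sizeof(uint64_t));
        rlen += sizeof(uint64_t);
        memcpy(&query_len, &buf[rlen], sizeof(uint64_t));
        rlen += sizeof(uint64_t);
        printf("%llx: %.*s\n", (unsigned long long) query_id,
               (int) query_len, (const char *) &buf[rlen]);
        rlen += query_len;
    }
    return 0;
}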
diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c
index 52b9760..f3c8eff 100644
--- a/pg_stat_monitor.c
+++ b/pg_stat_monitor.c
@@ -1545,6 +1545,7 @@ pgss_store(uint64 queryid,
                 out_of_memory = true;
                 break;
             }
+            query_entry->state = kind;
             entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid);
             if (entry == NULL)
             {
@@ -2030,7 +2031,6 @@ get_next_wbucket(pgssSharedState *pgss)
 
     if (update_bucket)
     {
-        unsigned char *buf;
         char file_name[1024];
         int sec = 0;
 
@@ -2040,9 +2040,8 @@
         prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
 
         LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
-        buf = pgss_qbuf[new_bucket_id];
         hash_entry_dealloc(new_bucket_id, prev_bucket_id);
-        hash_query_entry_dealloc(new_bucket_id, buf);
+        hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
 
         snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id);
         unlink(file_name);
diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h
index 85d5261..94a0f83 100644
--- a/pg_stat_monitor.h
+++ b/pg_stat_monitor.h
@@ -379,7 +379,7 @@ HTAB *pgsm_get_plan_hash(void);
 void hash_entry_reset(void);
 void hash_query_entryies_reset(void);
 void hash_query_entries();
-void hash_query_entry_dealloc(int bucket, unsigned char *buf);
+void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
 bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id);
 pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
 Size hash_memsize(void);
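[Note] The pgss_store hunk above records the store kind on the query hash entry so that hash_query_entry_dealloc can classify entries during bucket rotation: PGSS_PARSE, PGSS_PLAN, and PGSS_EXEC mark a query as pending (its text is carried into the new bucket), while PGSS_FINISHED and PGSS_ERROR mark it as complete (the entry is removed). A sketch of that classification; the enum shape below is an assumption, the real definition lives in pg_stat_monitor.h:

#include <stdbool.h>

/* Assumed shape of the store-kind enum; only the constant names below
 * actually appear in the patch. */
typedef enum pgssStoreKind
{
    PGSS_PARSE,                 /* captured at parse time */
    PGSS_PLAN,                  /* captured at plan time */
    PGSS_EXEC,                  /* captured during execution */
    PGSS_FINISHED,              /* execution completed */
    PGSS_ERROR                  /* execution failed */
} pgssStoreKind;

/* Pending queries may still produce statistics, so their text must
 * survive the move from the expiring bucket to the new one. */
bool
is_pending(pgssStoreKind state)
{
    return state == PGSS_PARSE || state == PGSS_PLAN || state == PGSS_EXEC;
}

/* Finished and errored entries are complete; hash_query_entry_dealloc
 * simply removes them when their bucket is recycled. */
bool
is_finished(pgssStoreKind state)
{
    return state == PGSS_FINISHED || state == PGSS_ERROR;
}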