From fc9e63049740042f9a8473f3caf22caeb9c08aed Mon Sep 17 00:00:00 2001
From: Diego Fronza
Date: Wed, 6 Oct 2021 14:44:57 -0300
Subject: [PATCH 1/3] PG-254: Postpone variable initialization.

Many variables were being initialized in pgss_store() before checking
whether the module was actually active; this wasted CPU cycles whenever
the module was disabled.

To fix that, declare variables and initialize them only after checking
that pg_stat_monitor is active.
---
 pg_stat_monitor.c | 46 ++++++++++++++++++++++++++--------------------
 1 file changed, 26 insertions(+), 20 deletions(-)

diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c
index d6b6ddc..f739ff3 100644
--- a/pg_stat_monitor.c
+++ b/pg_stat_monitor.c
@@ -1486,37 +1486,44 @@ pgss_store(uint64 queryid,
     JumbleState *jstate,
     pgssStoreKind kind)
 {
-    pgssEntry *entry;
+    pgssEntry *entry;
     pgssSharedState *pgss = pgsm_get_ss();
-    char application_name[APPLICATIONNAME_LEN];
-    int application_name_len = pg_get_application_name(application_name);
-    bool reset = false;
-    uint64 bucketid;
-    uint64 prev_bucket_id;
+    char application_name[APPLICATIONNAME_LEN];
+    int application_name_len;
+    bool reset = false;
+    uint64 bucketid;
+    uint64 prev_bucket_id;
     uint64 userid;
     int con;
-    uint64 dbid = MyDatabaseId;
-    uint64 ip = pg_get_client_addr();
-    uint64 planid = plan_info ? plan_info->planid: 0;
-    uint64 appid = djb2_hash((unsigned char *)application_name, application_name_len);
+    uint64 dbid;
+    uint64 ip;
+    uint64 planid;
+    uint64 appid;
     char comments[512] = "";
-    bool out_of_memory = false;
+    bool out_of_memory = false;
+
+    /* Monitoring is disabled */
     if (!PGSM_ENABLED)
         return;
-    Assert(query != NULL);
-    if (kind == PGSS_ERROR)
-        GetUserIdAndSecContext((unsigned int *)&userid, &con);
-    else
-        userid = GetUserId();
-
-    extract_query_comments(query, comments, sizeof(comments));
-
     /* Safety check... */
     if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)])
         return;
+    Assert(query != NULL);
+    if (kind == PGSS_ERROR)
+        GetUserIdAndSecContext((unsigned int *)&userid, &con);
+    else
+        userid = GetUserId();
+
+    dbid = MyDatabaseId;
+    application_name_len = pg_get_application_name(application_name);
+    ip = pg_get_client_addr();
+    planid = plan_info ? plan_info->planid: 0;
+    appid = djb2_hash((unsigned char *)application_name, application_name_len);
+
+    extract_query_comments(query, comments, sizeof(comments));
+
     prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket);
     bucketid = get_next_wbucket(pgss);

@@ -1717,7 +1724,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
         if (query_entry == NULL)
             continue;
-
         if (read_query(buf, bucketid, queryid, query_txt) == 0)
         {
             int len;

From fcb70ffed1dea0921f1eb64c123afe6a1b5d61db Mon Sep 17 00:00:00 2001
From: Diego Fronza
Date: Wed, 6 Oct 2021 14:57:15 -0300
Subject: [PATCH 2/3] PG-254: Removal of pgss_query_hash hash table.

There was no need for a separate hash table to keep track of queries, as
all the necessary information is already available in the pgss_hash table.

The code that moves pending queries' text in hash_query_entry_dealloc was
merged into hash_entry_dealloc.

A couple of functions were no longer necessary and were removed:
- hash_create_query_entry
- pgss_store_query_info
- pgss_get_entry (this logic was added directly into pgss_store).

We simplified the logic in pgss_store(); it basically works as follows:
1. Create a key (bucketid, queryid, etc...) for the query event.
2. Look up the key in pgss_hash; if no entry exists for the key, create a
   new one and save the query text into the current query buffer.
3. If jstate == NULL, update the stats counters for the entry.
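The find-or-create in step 2 follows the classic "look up under a shared
lock; on a miss, drop it and retake it exclusively" pattern. Below is a
minimal, self-contained sketch of that pattern using POSIX rwlocks -- an
editor's illustration only: pg_stat_monitor itself uses PostgreSQL LWLocks
and dynahash, and every name here (find_or_create, alloc_entry, TABLE_SIZE)
is invented for the sketch. Note the re-check after reacquiring the lock;
between the two lock operations another thread may already have created
the entry.

#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

#define TABLE_SIZE 64

struct entry
{
    uint64_t key;
    int      used;
};

static struct entry     table[TABLE_SIZE];
static pthread_rwlock_t table_lock = PTHREAD_RWLOCK_INITIALIZER;

/* Linear search; caller must hold table_lock (shared or exclusive). */
static struct entry *
find(uint64_t key)
{
    for (size_t i = 0; i < TABLE_SIZE; i++)
        if (table[i].used && table[i].key == key)
            return &table[i];
    return NULL;
}

/* Claim a free slot; caller must hold table_lock exclusively. */
static struct entry *
alloc_entry(uint64_t key)
{
    for (size_t i = 0; i < TABLE_SIZE; i++)
        if (!table[i].used)
        {
            table[i].used = 1;
            table[i].key = key;
            return &table[i];
        }
    return NULL;    /* table full, like hash_entry_alloc() returning NULL */
}

struct entry *
find_or_create(uint64_t key)
{
    struct entry *e;

    pthread_rwlock_rdlock(&table_lock);
    e = find(key);
    if (e == NULL)
    {
        /* Promote: rwlocks cannot upgrade in place, so drop and retake. */
        pthread_rwlock_unlock(&table_lock);
        pthread_rwlock_wrlock(&table_lock);
        e = find(key);          /* re-check: another thread may have won */
        if (e == NULL)
            e = alloc_entry(key);
    }
    pthread_rwlock_unlock(&table_lock);
    return e;
}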
---
 hash_query.c      | 278 ++++++++++++++++++----------------------------
 pg_stat_monitor.c | 216 ++++++++++++-----------------------
 pg_stat_monitor.h |   2 +-
 3 files changed, 179 insertions(+), 317 deletions(-)

diff --git a/hash_query.c b/hash_query.c
index 078fa47..bb684d8 100644
--- a/hash_query.c
+++ b/hash_query.c
@@ -22,7 +22,6 @@
 static pgssSharedState *pgss;
 static HTAB *pgss_hash;
-static HTAB *pgss_query_hash;

 static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
 /*
@@ -55,7 +54,6 @@ pgss_startup(void)
     pgss = NULL;
     pgss_hash = NULL;
-    pgss_query_hash = NULL;

     /*
      * Create or attach to the shared memory state, including hash table
@@ -85,7 +83,6 @@ pgss_startup(void)
     }

     pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES);
-    pgss_query_hash = hash_init("pg_stat_monitor: query hashtable", sizeof(pgssQueryHashKey), sizeof(pgssQueryEntry),MAX_BUCKET_ENTRIES);

     LWLockRelease(AddinShmemInitLock);

@@ -169,146 +166,64 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
         elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
     return entry;
 }
+
 /*
- * Reset all the entries.
+ * Prepare resources for using the new bucket:
+ * - Deallocate finished hash table entries in new_bucket_id (entries whose
+ *   state is PGSS_FINISHED or PGSS_ERROR).
+ * - Clear query buffer for new_bucket_id.
+ * - If old_bucket_id != -1, move all pending hash table entries in
+ *   old_bucket_id to the new bucket id; also move pending queries from the
+ *   previous query buffer (query_buffer[old_bucket_id]) to the new one
+ *   (query_buffer[new_bucket_id]).
 *
 * Caller must hold an exclusive lock on pgss->lock.
 */
 void
-hash_query_entryies_reset()
-{
-    HASH_SEQ_STATUS hash_seq;
-    pgssQueryEntry *entry;
-
-    hash_seq_init(&hash_seq, pgss_query_hash);
-    while ((entry = hash_seq_search(&hash_seq)) != NULL)
-        entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
-}
-
-
-/*
- * Deallocate finished entries in new_bucket_id.
- *
- * Move all pending queries in query_buffer[old_bucket_id] to
- * query_buffer[new_bucket_id].
- *
- * Caller must hold an exclusive lock on pgss->lock.
- */
-void
-hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
-{
-    HASH_SEQ_STATUS hash_seq;
-    pgssQueryEntry *entry;
-    pgssSharedState *pgss = pgsm_get_ss();
-    /*
-     * Store pending query ids from the previous bucket.
-     * If there are more pending queries than MAX_PENDING_QUERIES then
-     * we try to dynamically allocate memory for them.
-     */
-#define MAX_PENDING_QUERIES 128
-    uint64 pending_query_ids[MAX_PENDING_QUERIES];
-    uint64 *pending_query_ids_buf = NULL;
-    size_t n_pending_queries = 0;
-    bool out_of_memory = false;
-
-    /* Clear all queries in the query buffer for the new bucket. */
-    memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
-
-    hash_seq_init(&hash_seq, pgss_query_hash);
-    while ((entry = hash_seq_search(&hash_seq)) != NULL)
-    {
-        /* Remove previous finished query entries matching new bucket id. */
-        if (entry->key.bucket_id == new_bucket_id)
-        {
-            if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR)
-            {
-                entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
-            }
-        }
-        /* Set up a list of pending query ids from the previous bucket. */
-        else if (entry->key.bucket_id == old_bucket_id &&
-                 (entry->state == PGSS_PARSE ||
-                  entry->state == PGSS_PLAN ||
-                  entry->state == PGSS_EXEC))
-        {
-            if (n_pending_queries < MAX_PENDING_QUERIES)
-            {
-                pending_query_ids[n_pending_queries] = entry->key.queryid;
-                ++n_pending_queries;
-            }
-            else
-            {
-                /*
-                 * No. of pending queries exceeds MAX_PENDING_QUERIES.
-                 * Try to allocate memory from heap to keep track of pending query ids.
-                 * If allocation fails we manually copy pending query to the next query buffer.
-                 */
-                if (!out_of_memory && !pending_query_ids_buf)
-                {
-                    /* Allocate enough room for query ids. */
-                    pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash));
-                    if (pending_query_ids_buf != NULL)
-                        memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
-                    else
-                        out_of_memory = true;
-                }
-
-                if (!out_of_memory)
-                {
-                    /* Store pending query id in the dynamic buffer. */
-                    pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
-                    ++n_pending_queries;
-                }
-                else
-                {
-                    /* No memory, manually copy query from previous buffer. */
-                    char query_txt[1024];
-
-                    if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
-                        || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
-                    {
-                        SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
-                    }
-                    else
-                        /* There was no space available to store the pending query text. */
-                        elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s",
-                             entry->key.queryid,
-                             (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
-                                 "insufficient shared space for query" :
-                                 "I/O error reading query from disk");
-                }
-            }
-        }
-    }
-
-    /* Copy all detected pending queries from previous bucket id to the new one. */
-    if (n_pending_queries > 0) {
-        if (n_pending_queries < MAX_PENDING_QUERIES)
-            pending_query_ids_buf = pending_query_ids;
-
-        copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
-    }
-}
-
-/*
- * Deallocate least-used entries.
- *
- * If old_bucket_id != -1, move all pending queries in old_bucket_id
- * to the new bucket id.
- *
- * Caller must hold an exclusive lock on pgss->lock.
- */
-bool
-hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
+hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
 {
     HASH_SEQ_STATUS hash_seq;
     pgssEntry *entry = NULL;
-    List *pending_entries = NIL;
-    ListCell *pending_entry;
+    pgssSharedState *pgss = pgsm_get_ss();
+#define MAX_PENDING_QUERIES 128
+    /*
+     * Variables used to store pending queries from the previous bucket.
+     *
+     * We use a linked list to keep a full copy of entries from the hash table
+     * that must be moved to the new bucket.
+     *
+     * We use an array to keep a list of pending query IDs only; the array will
+     * be used in copy_queries() as a filter of which queries to copy.
+     * The reason we use a separate array to keep pending query IDs is that it
+     * is faster to iterate than the linked list, as following pointers in a list
+     * almost always makes poor use of the CPU cache, while a small array of uint64 is
+     * a good candidate to be stored in L1 cache.
+     *
+     * If there are more pending queries than MAX_PENDING_QUERIES then
+     * we try to dynamically allocate memory for them.
+     */
+    List *pending_entries = NIL;
+    ListCell *pending_entry;
+    uint64 pending_query_ids[MAX_PENDING_QUERIES];
+    uint64 *pending_query_ids_buf = NULL;
+    size_t n_pending_queries = 0;
+    bool out_of_memory = false;
+
+    if (new_bucket_id != -1)
+    {
+        /* Clear all queries in the query buffer for the new bucket. */
+        memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
+    }
+
+    /* Iterate over the hash table. */
     hash_seq_init(&hash_seq, pgss_hash);
     while ((entry = hash_seq_search(&hash_seq)) != NULL)
     {
+        /*
+         * Remove all entries if new_bucket_id == -1.
+         * Otherwise remove entry in new_bucket_id if it's finished already.
+         */
         if (new_bucket_id < 0 ||
             (entry->key.bucket_id == new_bucket_id &&
             (entry->counters.state == PGSS_FINISHED ||
              entry->counters.state == PGSS_ERROR)))
@@ -333,6 +248,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
             if (!bkp_entry)
             {
                 /* No memory, remove pending query entry from the previous bucket. */
+                elog(WARNING, "hash_entry_dealloc: out of memory");
                 entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
                 continue;
             }
@@ -346,12 +262,63 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
             /* Add the entry to a list of nodes to be processed later. */
             pending_entries = lappend(pending_entries, bkp_entry);

+            /* Add pending query ID to the array. */
+            if (n_pending_queries < MAX_PENDING_QUERIES)
+            {
+                pending_query_ids[n_pending_queries] = entry->key.queryid;
+                ++n_pending_queries;
+            }
+            else
+            {
+                /*
+                 * No. of pending queries exceeds MAX_PENDING_QUERIES.
+                 * Try to dynamically allocate memory to keep track of pending query ids.
+                 * If allocation fails we manually copy pending query to the next query buffer.
+                 */
+                if (!out_of_memory && !pending_query_ids_buf)
+                {
+                    /* Allocate enough room for query ids. */
+                    pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_hash));
+                    if (pending_query_ids_buf != NULL)
+                        memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
+                    else
+                        out_of_memory = true;
+                }
+
+                if (!out_of_memory)
+                {
+                    /* Store pending query id in the dynamic buffer. */
+                    pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
+                    ++n_pending_queries;
+                }
+                else
+                {
+                    /* No memory, manually copy query from previous buffer. */
+                    char query_txt[1024];
+
+                    if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
+                        || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
+                    {
+                        SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
+                    }
+                    else
+                        /* There was no space available to store the pending query text. */
+                        elog(ERROR, "hash_entry_dealloc: Failed to move pending query %lX, %s",
+                             entry->key.queryid,
+                             (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
+                                 "insufficient shared space for query" :
+                                 "I/O error reading query from disk");
+                }
+            }
+
             /* Finally remove the pending query from the expired bucket id. */
             entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
         }
     }
 }
+
+    Assert(list_length(pending_entries) == n_pending_queries);
+
 /*
  * Iterate over the list of pending queries in order
  * to add them back to the hash table with the updated bucket id.
@@ -375,9 +342,15 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
         free(old_entry);
     }

-    list_free(pending_entries);
+    /* Copy all detected pending queries from previous bucket id to the new one. */
+    if (n_pending_queries > 0) {
+        if (n_pending_queries < MAX_PENDING_QUERIES)
+            pending_query_ids_buf = pending_query_ids;

-    return true;
+        copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
+    }
+
+    list_free(pending_entries);
 }

 /*
@@ -401,45 +374,6 @@ hash_entry_reset()
     LWLockRelease(pgss->lock);
 }

-/* Caller must acquire a lock */
-pgssQueryEntry*
-hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid)
-{
-    pgssQueryHashKey key;
-    pgssQueryEntry *entry;
-    bool found;
-
-    key.queryid = queryid;
-    key.bucket_id = bucket_id;
-    key.dbid = dbid;
-    key.userid = userid;
-    key.ip = ip;
-    key.appid = appid;
-
-    entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_ENTER_NULL, &found);
-    return entry;
-}
-
-/* Caller must acquire a lock */
-pgssQueryEntry*
-hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid)
-{
-    pgssQueryHashKey key;
-    pgssQueryEntry *entry;
-    bool found;
-
-    key.queryid = queryid;
-    key.bucket_id = bucket_id;
-    key.dbid = dbid;
-    key.userid = userid;
-    key.ip = ip;
-    key.appid = appid;
-
-    /* Lookup the hash table entry with shared lock. */
-    entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_FIND, &found);
-    return entry;
-}
-
 bool
 IsHashInitialize(void)
 {
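The array of pending query IDs built above is handed to copy_queries()
purely as a membership filter. Concretely, that test boils down to a linear
scan of a contiguous uint64 array, which is what makes it cache-friendly:
every probe touches adjacent memory the prefetcher keeps resident in L1,
whereas walking the pending-entries linked list would chase pointers across
the heap. A standalone sketch of that filter (editor's illustration; the
function name is invented):

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Membership test over a packed uint64 array, the shape of the filter
 * copy_queries() applies while scanning the old query buffer.
 */
static bool
query_id_is_pending(const uint64_t *pending_ids, size_t n_pending,
                    uint64_t query_id)
{
    for (size_t i = 0; i < n_pending; i++)
        if (pending_ids[i] == query_id)
            return true;
    return false;
}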
diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c
index f739ff3..32334b8 100644
--- a/pg_stat_monitor.c
+++ b/pg_stat_monitor.c
@@ -147,15 +147,6 @@ static uint64 pgss_hash_string(const char *str, int len);
 char *unpack_sql_state(int sql_state);

 static void pgss_store_error(uint64 queryid, const char * query, ErrorData *edata);
-static pgssQueryEntry *pgss_store_query_info(uint64 bucketid,
-                                             uint64 queryid,
-                                             uint64 dbid,
-                                             uint64 userid,
-                                             uint64 ip,
-                                             uint64 appid,
-                                             const char *query,
-                                             uint64 query_len,
-                                             pgssStoreKind kind);

 static void pgss_store_utility(const char *query,
                                double total_time,
@@ -1314,40 +1305,6 @@ pgss_update_entry(pgssEntry *entry,
     }
 }

-static pgssEntry*
-pgss_get_entry(uint64 bucket_id,
-               uint64 userid,
-               uint64 dbid,
-               uint64 queryid,
-               uint64 ip,
-               uint64 planid,
-               uint64 appid)
-{
-    pgssEntry *entry;
-    pgssHashKey key;
-    HTAB *pgss_hash = pgsm_get_hash();
-    pgssSharedState *pgss = pgsm_get_ss();
-
-    key.bucket_id = bucket_id;
-    key.userid = userid;
-    key.dbid = MyDatabaseId;
-    key.queryid = queryid;
-    key.ip = pg_get_client_addr();
-    key.planid = planid;
-    key.appid = appid;
-
-    entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-    if(!entry)
-    {
-        /* OK to create a new hashtable entry */
-        entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
-        if (entry == NULL)
-            return NULL;
-    }
-    Assert(entry);
-    return entry;
-}
-
 static void
 pgss_store_query(uint64 queryid,
                  const char * query,
@@ -1486,6 +1443,8 @@ pgss_store(uint64 queryid,
     JumbleState *jstate,
     pgssStoreKind kind)
 {
+    HTAB *pgss_hash;
+    pgssHashKey key;
     pgssEntry *entry;
     pgssSharedState *pgss = pgsm_get_ss();
     char application_name[APPLICATIONNAME_LEN];
@@ -1495,12 +1454,10 @@ pgss_store(uint64 queryid,
     uint64 prev_bucket_id;
     uint64 userid;
     int con;
-    uint64 dbid;
-    uint64 ip;
     uint64 planid;
     uint64 appid;
     char comments[512] = "";
-    bool out_of_memory = false;
+    size_t query_len;

     /* Monitoring is disabled */
     if (!PGSM_ENABLED)
         return;
@@ -1516,9 +1473,7 @@ pgss_store(uint64 queryid,
     else
         userid = GetUserId();

-    dbid = MyDatabaseId;
     application_name_len = pg_get_application_name(application_name);
-    ip = pg_get_client_addr();
     planid = plan_info ? plan_info->planid: 0;
     appid = djb2_hash((unsigned char *)application_name, application_name_len);

@@ -1530,63 +1485,74 @@ pgss_store(uint64 queryid,
     if (bucketid != prev_bucket_id)
         reset = true;

-    LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+    key.bucket_id = bucketid;
+    key.userid = userid;
+    key.dbid = MyDatabaseId;
+    key.queryid = queryid;
+    key.ip = pg_get_client_addr();
+    key.planid = planid;
+    key.appid = appid;

-    switch (kind)
+    pgss_hash = pgsm_get_hash();
+
+    LWLockAcquire(pgss->lock, LW_SHARED);
+
+    entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
+    if (!entry)
     {
-        case PGSS_PARSE:
-        case PGSS_PLAN:
-        {
-            pgssQueryEntry *query_entry;
-            query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind);
-            if (query_entry == NULL)
-                out_of_memory = true;
-            break;
-        }
-        case PGSS_ERROR:
-        case PGSS_EXEC:
-        case PGSS_FINISHED:
-        {
-            pgssQueryEntry *query_entry;
-            query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind);
-            if (query_entry == NULL)
-            {
-                out_of_memory = true;
-                break;
-            }
-            query_entry->state = kind;
-            entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid);
-            if (entry == NULL)
-            {
-                out_of_memory = true;
-                break;
-            }
+        uint64 prev_qbuf_len;

-            if (jstate == NULL)
-                pgss_update_entry(entry,       /* entry */
-                                  bucketid,    /* bucketid */
-                                  queryid,     /* queryid */
-                                  query,       /* query */
-                                  comments,    /* comments */
-                                  plan_info,   /* PlanInfo */
-                                  cmd_type,    /* CmdType */
-                                  sys_info,    /* SysInfo */
-                                  error_info,  /* ErrorInfo */
-                                  total_time,  /* total_time */
-                                  rows,        /* rows */
-                                  bufusage,    /* bufusage */
-                                  walusage,    /* walusage */
-                                  reset,       /* reset */
-                                  kind);       /* kind */
+        query_len = strlen(query);
+        if (query_len > PGSM_QUERY_MAX_LEN)
+            query_len = PGSM_QUERY_MAX_LEN;
+
+        /* Need exclusive lock to make a new hashtable entry - promote */
+        LWLockRelease(pgss->lock);
+        LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+
+        /*
+         * Save the current query buffer length; if we fail to add a
+         * new entry to the hash table then we must restore the
+         * original length.
+         */
+        memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len));
+        if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len))
+        {
+            LWLockRelease(pgss->lock);
+            elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query.");
+            return;
+        }
+
+        /* OK to create a new hashtable entry */
+        entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
+        if (entry == NULL)
+        {
+            /* Restore previous query buffer length. */
+            memcpy(pgss_qbuf[bucketid], &prev_qbuf_len, sizeof(prev_qbuf_len));
+            LWLockRelease(pgss->lock);
+            elog(DEBUG1, "pg_stat_monitor: out of memory");
+            return;
         }
-            break;
-        case PGSS_NUMKIND:
-        case PGSS_INVALID:
-            break;
     }
+
+    if (jstate == NULL)
+        pgss_update_entry(entry,       /* entry */
+                          bucketid,    /* bucketid */
+                          queryid,     /* queryid */
+                          query,       /* query */
+                          comments,    /* comments */
+                          plan_info,   /* PlanInfo */
+                          cmd_type,    /* CmdType */
+                          sys_info,    /* SysInfo */
+                          error_info,  /* ErrorInfo */
+                          total_time,  /* total_time */
+                          rows,        /* rows */
+                          bufusage,    /* bufusage */
+                          walusage,    /* walusage */
+                          reset,       /* reset */
+                          kind);       /* kind */
+
     LWLockRelease(pgss->lock);
-    if (out_of_memory)
-        elog(DEBUG1, "pg_stat_monitor: out of memory");
 }

 /*
  * Reset all statement statistics.
@@ -1601,8 +1567,12 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
         errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
     LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
-    hash_entry_dealloc(-1, -1);
-    hash_query_entryies_reset();
+    hash_entry_dealloc(-1, -1, NULL);
+    /* Reset query buffers. */
+    for (size_t i = 0; i < MAX_BUCKETS; ++i)
+    {
+        *(uint64 *)pgss_qbuf[i] = 0;
+    }
 #ifdef BENCHMARK
     for (int i = STATS_START; i < STATS_END; ++i)
     {
         pg_hook_stats[i].min_time = 0;
@@ -1653,7 +1623,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
     MemoryContext oldcontext;
     HASH_SEQ_STATUS hash_seq;
     pgssEntry *entry;
-    pgssQueryEntry *query_entry;
     char parentid_txt[32];
     pgssSharedState *pgss = pgsm_get_ss();
     HTAB *pgss_hash = pgsm_get_hash();
@@ -1713,16 +1682,12 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
         uint64 userid = entry->key.userid;
         uint64 ip = entry->key.ip;
         uint64 planid = entry->key.planid;
-        uint64 appid = entry->key.appid;
         unsigned char *buf = pgss_qbuf[bucketid];
 #if PG_VERSION_NUM < 140000
         bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
 #else
         bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS);
 #endif
-        query_entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid);
-        if (query_entry == NULL)
-            continue;

         if (read_query(buf, bucketid, queryid, query_txt) == 0)
         {
@@ -2063,8 +2028,7 @@ get_next_wbucket(pgssSharedState *pgss)
     prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
     LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
-    hash_entry_dealloc(new_bucket_id, prev_bucket_id);
-    hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
+    hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
     snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id);
     unlink(file_name);
@@ -3069,42 +3033,6 @@ exit:
     return 0;
 }

-static pgssQueryEntry*
-pgss_store_query_info(uint64 bucketid,
-                      uint64 queryid,
-                      uint64 dbid,
-                      uint64 userid,
-                      uint64 ip,
-                      uint64 appid,
-                      const char *query,
-                      uint64 query_len,
-                      pgssStoreKind kind)
-{
-    pgssSharedState *pgss = pgsm_get_ss();
-    unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)];
-    pgssQueryEntry *entry;
-
-    if (query_len > PGSM_QUERY_MAX_LEN)
-        query_len = PGSM_QUERY_MAX_LEN;
-
-    /* Already have query in the shared buffer, there
-     * is no need to add that again.
-     */
-    entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid);
-    if (entry)
-        return entry;
-
-    entry = hash_create_query_entry(bucketid, queryid, dbid, userid, ip, appid);
-    if (!entry)
-        return NULL;
-    entry->state = kind;
-
-    if(!SaveQueryText(bucketid, queryid, buf, query, query_len))
-        return NULL;
-
-    return entry;
-}
-
 bool
 SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len)
 {

diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h
index 94a0f83..2eab42b 100644
--- a/pg_stat_monitor.h
+++ b/pg_stat_monitor.h
@@ -380,7 +380,7 @@
 void hash_entry_reset(void);
 void hash_query_entryies_reset(void);
 void hash_query_entries();
 void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
-bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id);
+void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
 pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
 Size hash_memsize(void);

From 8fdf0946fefce265878b05a0c931f98cf2e10d94 Mon Sep 17 00:00:00 2001
From: Diego Fronza
Date: Thu, 7 Oct 2021 10:06:20 -0300
Subject: [PATCH 3/3] PG-254: Add query location to hash table entries.

Whenever a new query is added to a query buffer, record the position at
which the query was inserted; we can then use this information to locate
the query faster later on when required.

This made it possible to simplify the logic in hash_entry_dealloc(): after
creating the list of pending queries, as the list is scanned we can copy
each query from the previous query buffer to the new one using the query
position (query_pos), which avoids scanning the whole query buffer when
looking up the queryid.

Also, when moving a query to a new buffer (copy_query), we update the
query_pos in the hash table entry so it points to the right place in the
new query buffer.

The read_query() function was updated to accept the query position as an
argument: if pos != 0, it is used to locate the query directly; otherwise
the function falls back to the previous mode of scanning the whole buffer.

SaveQueryText() was updated to take a reference to the query position as
an argument; when the function finishes, this value holds the position at
which the query was stored in the buffer.
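For reference, the buffer format the position hint indexes into is the one
SaveQueryText() maintains: the first eight bytes of a bucket's query buffer
hold the number of bytes in use, followed by packed records of
[8-byte queryid][8-byte query length][query text]. The sketch below models
the fast path read_query() gains in this patch -- an editor's illustration
written against that layout, not the extension's code; read_query_at and
its exact bounds checks are the sketch's own:

#include <stdint.h>
#include <string.h>

/*
 * Read one record via its position hint.  Returns queryid on success, or
 * 0 when the hint is missing or stale, in which case the caller falls
 * back to scanning the whole buffer.
 */
static uint64_t
read_query_at(const unsigned char *buf, uint64_t queryid,
              char *out, size_t pos)
{
    uint64_t buf_len, found_id, query_len;

    memcpy(&buf_len, buf, sizeof(uint64_t));    /* bytes in use */
    if (pos == 0 || pos + 2 * sizeof(uint64_t) >= buf_len)
        return 0;                               /* no usable hint */

    memcpy(&found_id, &buf[pos], sizeof(uint64_t));
    if (found_id != queryid)
        return 0;                               /* stale hint */
    pos += sizeof(uint64_t);

    memcpy(&query_len, &buf[pos], sizeof(uint64_t));
    pos += sizeof(uint64_t);
    if (pos + query_len > buf_len)              /* never read past the end */
        return 0;

    memcpy(out, &buf[pos], query_len);
    out[query_len] = '\0';
    return queryid;
}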
---
 hash_query.c      | 170 ++++++++++++----------------------
 pg_stat_monitor.c |  62 +++++++++++++----
 pg_stat_monitor.h |  10 ++-
 3 files changed, 98 insertions(+), 144 deletions(-)

diff --git a/hash_query.c b/hash_query.c
index bb684d8..29d0cc8 100644
--- a/hash_query.c
+++ b/hash_query.c
@@ -25,14 +25,16 @@ static HTAB *pgss_hash;
 static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);

 /*
- * Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id]
- * whose query ids are found in the array 'query_ids', of length 'n_queries'.
+ * Copy query from src_buf to dst_buf.
+ * Use query_id and query_pos to quickly locate the query in the source buffer.
+ * Store the query's updated position in the destination buffer into param new_query_pos.
 */
-static void copy_queries(unsigned char *query_buffer[],
-                         uint64 new_bucket_id,
-                         uint64 old_bucket_id,
-                         uint64 *query_ids,
-                         size_t n_queries);
+static bool copy_query(uint64 bucket_id,
+                       uint64 query_id,
+                       uint64 query_pos,
+                       unsigned char *dst_buf,
+                       unsigned char *src_buf,
+                       size_t *new_query_pos);

 static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size)
@@ -186,29 +188,9 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
     pgssEntry *entry = NULL;
     pgssSharedState *pgss = pgsm_get_ss();

-#define MAX_PENDING_QUERIES 128
-    /*
-     * Variables used to store pending queries from the previous bucket.
-     *
-     * We use a linked list to keep a full copy of entries from the hash table
-     * that must be moved to the new bucket.
-     *
-     * We use an array to keep a list of pending query IDs only; the array will
-     * be used in copy_queries() as a filter of which queries to copy.
-     * The reason we use a separate array to keep pending query IDs is that it
-     * is faster to iterate than the linked list, as following pointers in a list
-     * almost always makes poor use of the CPU cache, while a small array of uint64 is
-     * a good candidate to be stored in L1 cache.
-     *
-     * If there are more pending queries than MAX_PENDING_QUERIES then
-     * we try to dynamically allocate memory for them.
-     */
+    /* Store pending query ids from the previous bucket. */
     List *pending_entries = NIL;
     ListCell *pending_entry;
-    uint64 pending_query_ids[MAX_PENDING_QUERIES];
-    uint64 *pending_query_ids_buf = NULL;
-    size_t n_pending_queries = 0;
-    bool out_of_memory = false;

     if (new_bucket_id != -1)
     {
@@ -222,7 +204,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
     {
         /*
          * Remove all entries if new_bucket_id == -1.
-         * Otherwise remove entry in new_bucket_id if it's finished already.
+         * Otherwise remove entry in new_bucket_id if it has finished already.
          */
         if (new_bucket_id < 0 ||
             (entry->key.bucket_id == new_bucket_id &&
             (entry->counters.state == PGSS_FINISHED ||
              entry->counters.state == PGSS_ERROR)))
@@ -262,63 +244,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
             /* Add the entry to a list of nodes to be processed later. */
             pending_entries = lappend(pending_entries, bkp_entry);

-            /* Add pending query ID to the array. */
-            if (n_pending_queries < MAX_PENDING_QUERIES)
-            {
-                pending_query_ids[n_pending_queries] = entry->key.queryid;
-                ++n_pending_queries;
-            }
-            else
-            {
-                /*
-                 * No. of pending queries exceeds MAX_PENDING_QUERIES.
-                 * Try to dynamically allocate memory to keep track of pending query ids.
-                 * If allocation fails we manually copy pending query to the next query buffer.
-                 */
-                if (!out_of_memory && !pending_query_ids_buf)
-                {
-                    /* Allocate enough room for query ids. */
-                    pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_hash));
-                    if (pending_query_ids_buf != NULL)
-                        memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
-                    else
-                        out_of_memory = true;
-                }
-
-                if (!out_of_memory)
-                {
-                    /* Store pending query id in the dynamic buffer. */
-                    pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
-                    ++n_pending_queries;
-                }
-                else
-                {
-                    /* No memory, manually copy query from previous buffer. */
-                    char query_txt[1024];
-
-                    if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
-                        || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
-                    {
-                        SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
-                    }
-                    else
-                        /* There was no space available to store the pending query text. */
-                        elog(ERROR, "hash_entry_dealloc: Failed to move pending query %lX, %s",
-                             entry->key.queryid,
-                             (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
-                                 "insufficient shared space for query" :
-                                 "I/O error reading query from disk");
-                }
-            }
-
             /* Finally remove the pending query from the expired bucket id. */
             entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
         }
     }
 }

-    Assert(list_length(pending_entries) == n_pending_queries);
-
 /*
  * Iterate over the list of pending queries in order
  * to add them back to the hash table with the updated bucket id.
@@ -337,19 +268,18 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
         new_entry->counters = old_entry->counters;
         SpinLockInit(&new_entry->mutex);
         new_entry->encoding = old_entry->encoding;
+        /* copy query's text from previous bucket to the new one. */
+        copy_query(new_bucket_id,
+                   new_entry->key.queryid,      /* query id */
+                   old_entry->query_pos,        /* query position in buffer */
+                   query_buffer[new_bucket_id], /* destination query buffer */
+                   query_buffer[old_bucket_id], /* source query buffer */
+                   &new_entry->query_pos);      /* position in which query was inserted into destination buffer */
     }
     free(old_entry);
 }

-    /* Copy all detected pending queries from previous bucket id to the new one. */
-    if (n_pending_queries > 0) {
-        if (n_pending_queries < MAX_PENDING_QUERIES)
-            pending_query_ids_buf = pending_query_ids;
-
-        copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
-    }
-
     list_free(pending_entries);
 }

 /*
@@ -381,52 +311,38 @@ IsHashInitialize(void)
     pgss_hash != NULL);
 }

-static void copy_queries(unsigned char *query_buffer[],
-                         uint64 new_bucket_id,
-                         uint64 old_bucket_id,
-                         uint64 *query_ids,
-                         size_t n_queries)
+static bool copy_query(uint64 bucket_id,
+                       uint64 query_id,
+                       uint64 query_pos,
+                       unsigned char *dst_buf,
+                       unsigned char *src_buf,
+                       size_t *new_query_pos)
 {
-    bool found;
-    uint64 query_id = 0;
-    uint64 query_len = 0;
-    uint64 rlen = 0;
-    uint64 buf_len = 0;
-    unsigned char *src_buffer = query_buffer[old_bucket_id];
-    size_t i;
+    uint64 query_len = 0;
+    uint64 buf_len = 0;

-    memcpy(&buf_len, src_buffer, sizeof (uint64));
+    memcpy(&buf_len, src_buf, sizeof (uint64));
     if (buf_len <= 0)
-        return;
+        return false;

-    rlen = sizeof (uint64); /* Move forwad to skip length bytes */
-    while (rlen < buf_len)
+    /* Try to locate the query directly. */
+    if (query_pos != 0 && (query_pos + sizeof(uint64) + sizeof(uint64)) < buf_len)
     {
-        found = false;
-        memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */
-        for (i = 0; i < n_queries; ++i)
-        {
-            if (query_id == query_ids[i])
-            {
-                found = true;
-                break;
-            }
-        }
+        if (*(uint64 *)&src_buf[query_pos] != query_id)
+            return false;

-        rlen += sizeof (uint64);
-        if (buf_len <= rlen)
-            break;
+        query_pos += sizeof(uint64);

-        memcpy(&query_len, &src_buffer[rlen], sizeof (uint64)); /* query len */
-        rlen += sizeof (uint64);
-        if (buf_len < rlen + query_len)
-            break;
+        memcpy(&query_len, &src_buf[query_pos], sizeof(uint64)); /* query len */
+        query_pos += sizeof(uint64);

-        if (found) {
-            SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id],
-                          (const char *)&src_buffer[rlen], query_len);
-        }
+        if (query_pos + query_len > buf_len) /* avoid reading past buffer's length. */
+            return false;

-        rlen += query_len;
+        return SaveQueryText(bucket_id, query_id, dst_buf,
+                             (const char *)&src_buf[query_pos],
+                             query_len, new_query_pos);
     }
-}
\ No newline at end of file
+
+    return false;
+}

diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c
index 32334b8..ee63ba7 100644
--- a/pg_stat_monitor.c
+++ b/pg_stat_monitor.c
@@ -1501,6 +1501,8 @@ pgss_store(uint64 queryid,
     if (!entry)
     {
         uint64 prev_qbuf_len;
+        /* position in which the query's text was inserted into the query buffer. */
+        size_t qpos = 0;

         query_len = strlen(query);
         if (query_len > PGSM_QUERY_MAX_LEN)
             query_len = PGSM_QUERY_MAX_LEN;
@@ -1516,7 +1518,7 @@ pgss_store(uint64 queryid,
          * original length.
          */
         memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len));
-        if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len))
+        if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len, &qpos))
         {
             LWLockRelease(pgss->lock);
             elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query.");
@@ -1533,6 +1535,7 @@ pgss_store(uint64 queryid,
             elog(DEBUG1, "pg_stat_monitor: out of memory");
             return;
         }
+        entry->query_pos = qpos;
     }

     if (jstate == NULL)
@@ -1689,7 +1692,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
         bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS);
 #endif

-        if (read_query(buf, bucketid, queryid, query_txt) == 0)
+        if (read_query(buf, queryid, query_txt, entry->query_pos) == 0)
         {
             int len;
             len = read_query_buffer(bucketid, queryid, query_txt);
@@ -1709,16 +1712,16 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
             if (tmp.state == PGSS_FINISHED)
                 continue;
         }
-        if (tmp.info.parentid != UINT64CONST(0))
-        {
-            int len = 0;
-            if (read_query(buf, bucketid, tmp.info.parentid, parent_query_txt) == 0)
-            {
-                len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt);
-                if (len != MAX_QUERY_BUFFER_BUCKET)
-                    snprintf(parent_query_txt, 32, "%s", "");
-            }
-        }
+        if (tmp.info.parentid != UINT64CONST(0))
+        {
+            int len = 0;
+            if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0)
+            {
+                len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt);
+                if (len != MAX_QUERY_BUFFER_BUCKET)
+                    snprintf(parent_query_txt, 32, "%s", "");
+            }
+        }

         /* bucketid at column number 0 */
         values[i++] = Int64GetDatumFast(bucketid);
@@ -2984,7 +2987,7 @@ intarray_get_datum(int32 arr[], int len)
 }

 uint64
-read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
+read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos)
 {
     bool found = false;
     uint64 query_id = 0;
@@ -2996,6 +2999,27 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
     if (buf_len <= 0)
         goto exit;

+    /* If a position hint is given, try to locate the query directly. */
+    if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len)
+    {
+        memcpy(&query_id, &buf[pos], sizeof(uint64));
+        if (query_id != queryid)
+            return 0;
+
+        pos += sizeof(uint64);
+
+        memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */
+        pos += sizeof(uint64);
+
+        if (pos + query_len > buf_len) /* avoid reading past buffer's length. */
+            return 0;
+
+        memcpy(query, &buf[pos], query_len); /* Actual query */
+        query[query_len] = '\0';
+
+        return queryid;
+    }
+
     rlen = sizeof (uint64); /* Move forwad to skip length bytes */
     for(;;)
     {
@@ -3005,6 +3029,7 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
         memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */
         if (query_id == queryid)
             found = true;
+
         rlen += sizeof (uint64);
         if (buf_len <= rlen)
             continue;
@@ -3034,7 +3059,12 @@ exit:
 }

 bool
-SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len)
+SaveQueryText(uint64 bucketid,
+              uint64 queryid,
+              unsigned char *buf,
+              const char *query,
+              uint64 query_len,
+              size_t *query_pos)
 {
     uint64 buf_len = 0;

@@ -3059,6 +3089,8 @@ SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *q
         }
     }

+    *query_pos = buf_len;
+
     memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */
     buf_len += sizeof (uint64);

@@ -3293,7 +3325,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
             break;
         }
         off += buf_len;
-        if (read_query(buf, bucket_id, queryid, query_txt))
+        if (read_query(buf, queryid, query_txt, 0))
             break;
     }
     if (fd > 0)

diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h
index 2eab42b..2ddfe01 100644
--- a/pg_stat_monitor.h
+++ b/pg_stat_monitor.h
@@ -294,6 +294,7 @@ typedef struct pgssEntry
     Counters counters;       /* the statistics for this query */
     int encoding;            /* query text encoding */
     slock_t mutex;           /* protects the counters only */
+    size_t query_pos;        /* query location within query buffer */
 } pgssEntry;

 /*
@@ -361,7 +362,12 @@ typedef struct JumbleState

 /* Links to shared memory state */

-bool SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len);
+bool SaveQueryText(uint64 bucketid,
+                   uint64 queryid,
+                   unsigned char *buf,
+                   const char *query,
+                   uint64 query_len,
+                   size_t *query_pos);

 /* guc.c */
 void init_guc(void);
@@ -385,7 +391,7 @@ pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encodin
 Size hash_memsize(void);

 int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
-uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query);
+uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos);
 pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
 pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
 void pgss_startup(void);
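Editor's postscript: a standalone toy model of the SaveQueryText() contract
after this series -- append a record and report the offset it landed at,
which is the value pgss_store() saves in entry->query_pos. The buffer size,
helper names, and demo query are illustrative, not the extension's code.
Feeding the reported offset into a hint-based reader like the read_query()
sketch shown after the patch 3 description completes the round trip.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define BUF_SIZE 1024

/*
 * Append one [queryid][length][text] record to a buffer whose first eight
 * bytes track the number of bytes in use.  Returns 1 on success and stores
 * the record's offset (the future position hint) in *query_pos.
 */
static int
save_query_text(unsigned char *buf, uint64_t queryid,
                const char *query, size_t *query_pos)
{
    uint64_t buf_len;
    uint64_t query_len = strlen(query);

    memcpy(&buf_len, buf, sizeof(uint64_t));
    if (buf_len == 0)
        buf_len = sizeof(uint64_t);            /* skip the length header */
    if (buf_len + 2 * sizeof(uint64_t) + query_len > BUF_SIZE)
        return 0;                              /* buffer full */

    *query_pos = buf_len;                      /* record the hint */
    memcpy(&buf[buf_len], &queryid, sizeof(uint64_t));
    buf_len += sizeof(uint64_t);
    memcpy(&buf[buf_len], &query_len, sizeof(uint64_t));
    buf_len += sizeof(uint64_t);
    memcpy(&buf[buf_len], query, query_len);
    buf_len += query_len;
    memcpy(buf, &buf_len, sizeof(uint64_t));   /* update the header */
    return 1;
}

int
main(void)
{
    unsigned char buf[BUF_SIZE] = {0};
    size_t        pos = 0;

    if (save_query_text(buf, UINT64_C(0x5EED), "SELECT 1", &pos))
        printf("stored query 0x5EED at offset %zu\n", pos);
    return 0;
}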