diff --git a/hash_query.c b/hash_query.c index 078fa47..29d0cc8 100644 --- a/hash_query.c +++ b/hash_query.c @@ -22,18 +22,19 @@ static pgssSharedState *pgss; static HTAB *pgss_hash; -static HTAB *pgss_query_hash; static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); /* - * Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id] - * whose query ids are found in the array 'query_ids', of length 'n_queries'. + * Copy query from src_buf to dst_buf. + * Use query_id and query_pos to quickly locate the query in the source buffer. + * Store the query's position in the destination buffer into param new_query_pos. */ -static void copy_queries(unsigned char *query_buffer[], - uint64 new_bucket_id, - uint64 old_bucket_id, - uint64 *query_ids, - size_t n_queries); +static bool copy_query(uint64 bucket_id, + uint64 query_id, + uint64 query_pos, + unsigned char *dst_buf, + unsigned char *src_buf, + size_t *new_query_pos); static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) @@ -55,7 +56,6 @@ pgss_startup(void) pgss = NULL; pgss_hash = NULL; - pgss_query_hash = NULL; /* * Create or attach to the shared memory state, including hash table @@ -85,7 +85,6 @@ pgss_startup(void) } pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES); - pgss_query_hash = hash_init("pg_stat_monitor: query hashtable", sizeof(pgssQueryHashKey), sizeof(pgssQueryEntry),MAX_BUCKET_ENTRIES); LWLockRelease(AddinShmemInitLock); @@ -169,146 +168,44 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) elog(DEBUG1, "%s", "pg_stat_monitor: out of memory"); return entry; } + /* - * Reset all the entries. + * Prepare resources for using the new bucket: + * - Deallocate finished hash table entries in new_bucket_id (entries whose + * state is PGSS_FINISHED or PGSS_ERROR). + * - Clear query buffer for new_bucket_id. 
+ * - If old_bucket_id != -1, move all pending hash table entries in + * old_bucket_id to the new bucket id, also move pending queries from the + * previous query buffer (query_buffer[old_bucket_id]) to the new one + * (query_buffer[new_bucket_id]). * * Caller must hold an exclusive lock on pgss->lock. */ void -hash_query_entryies_reset() -{ - HASH_SEQ_STATUS hash_seq; - pgssQueryEntry *entry; - - hash_seq_init(&hash_seq, pgss_query_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); -} - - -/* - * Deallocate finished entries in new_bucket_id. - * - * Move all pending queries in query_buffer[old_bucket_id] to - * query_buffer[new_bucket_id]. - * - * Caller must hold an exclusive lock on pgss->lock. - */ -void -hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]) -{ - HASH_SEQ_STATUS hash_seq; - pgssQueryEntry *entry; - pgssSharedState *pgss = pgsm_get_ss(); - /* - * Store pending query ids from the previous bucket. - * If there are more pending queries than MAX_PENDING_QUERIES then - * we try to dynamically allocate memory for them. - */ -#define MAX_PENDING_QUERIES 128 - uint64 pending_query_ids[MAX_PENDING_QUERIES]; - uint64 *pending_query_ids_buf = NULL; - size_t n_pending_queries = 0; - bool out_of_memory = false; - - /* Clear all queries in the query buffer for the new bucket. */ - memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket); - - hash_seq_init(&hash_seq, pgss_query_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - /* Remove previous finished query entries matching new bucket id. */ - if (entry->key.bucket_id == new_bucket_id) - { - if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR) - { - entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); - } - } - /* Set up a list of pending query ids from the previous bucket. 
*/ - else if (entry->key.bucket_id == old_bucket_id && - (entry->state == PGSS_PARSE || - entry->state == PGSS_PLAN || - entry->state == PGSS_EXEC)) - { - if (n_pending_queries < MAX_PENDING_QUERIES) - { - pending_query_ids[n_pending_queries] = entry->key.queryid; - ++n_pending_queries; - } - else - { - /* - * No. of pending queries exceeds MAX_PENDING_QUERIES. - * Try to allocate memory from heap to keep track of pending query ids. - * If allocation fails we manually copy pending query to the next query buffer. - */ - if (!out_of_memory && !pending_query_ids_buf) - { - /* Allocate enough room for query ids. */ - pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash)); - if (pending_query_ids_buf != NULL) - memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64)); - else - out_of_memory = true; - } - - if (!out_of_memory) - { - /* Store pending query id in the dynamic buffer. */ - pending_query_ids_buf[n_pending_queries] = entry->key.queryid; - ++n_pending_queries; - } - else - { - /* No memory, manually copy query from previous buffer. */ - char query_txt[1024]; - - if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0 - || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET) - { - SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt)); - } - else - /* There was no space available to store the pending query text. */ - elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s", - entry->key.queryid, - (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ? - "insufficient shared space for query" : - "I/O error reading query from disk"); - } - } - } - } - - /* Copy all detected pending queries from previous bucket id to the new one. 
*/ - if (n_pending_queries > 0) { - if (n_pending_queries < MAX_PENDING_QUERIES) - pending_query_ids_buf = pending_query_ids; - - copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries); - } -} - -/* - * Deallocate least-used entries. - * - * If old_bucket_id != -1, move all pending queries in old_bucket_id - * to the new bucket id. - * - * Caller must hold an exclusive lock on pgss->lock. - */ -bool -hash_entry_dealloc(int new_bucket_id, int old_bucket_id) +hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]) { HASH_SEQ_STATUS hash_seq; pgssEntry *entry = NULL; - List *pending_entries = NIL; - ListCell *pending_entry; + pgssSharedState *pgss = pgsm_get_ss(); + /* Store pending query ids from the previous bucket. */ + List *pending_entries = NIL; + ListCell *pending_entry; + + if (new_bucket_id != -1) + { + /* Clear all queries in the query buffer for the new bucket. */ + memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket); + } + + /* Iterate over the hash table. */ hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { + /* + * Remove all entries if new_bucket_id == -1. + * Otherwise remove entry in new_bucket_id if it has finished already. + */ if (new_bucket_id < 0 || (entry->key.bucket_id == new_bucket_id && (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) @@ -333,6 +230,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id) if (!bkp_entry) { /* No memory, remove pending query entry from the previous bucket. */ + elog(ERROR, "hash_entry_dealloc: out of memory"); entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); continue; } @@ -370,14 +268,19 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id) new_entry->counters = old_entry->counters; SpinLockInit(&new_entry->mutex); new_entry->encoding = old_entry->encoding; + /* copy query's text from previous bucket to the new one. 
*/ + copy_query(new_bucket_id, + new_entry->key.queryid, /* query id */ + old_entry->query_pos, /* query position in buffer */ + query_buffer[new_bucket_id], /* destination query buffer */ + query_buffer[old_bucket_id], /* source query buffer */ + &new_entry->query_pos); /* position in which query was inserted into destination buffer */ } free(old_entry); } list_free(pending_entries); - - return true; } /* @@ -401,45 +304,6 @@ hash_entry_reset() LWLockRelease(pgss->lock); } -/* Caller must acquire a lock */ -pgssQueryEntry* -hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid) -{ - pgssQueryHashKey key; - pgssQueryEntry *entry; - bool found; - - key.queryid = queryid; - key.bucket_id = bucket_id; - key.dbid = dbid; - key.userid = userid; - key.ip = ip; - key.appid = appid; - - entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_ENTER_NULL, &found); - return entry; -} - -/* Caller must acquire a lock */ -pgssQueryEntry* -hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid) -{ - pgssQueryHashKey key; - pgssQueryEntry *entry; - bool found; - - key.queryid = queryid; - key.bucket_id = bucket_id; - key.dbid = dbid; - key.userid = userid; - key.ip = ip; - key.appid = appid; - - /* Lookup the hash table entry with shared lock. 
*/ - entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_FIND, &found); - return entry; -} - bool IsHashInitialize(void) { @@ -447,52 +311,38 @@ IsHashInitialize(void) pgss_hash != NULL); } -static void copy_queries(unsigned char *query_buffer[], - uint64 new_bucket_id, - uint64 old_bucket_id, - uint64 *query_ids, - size_t n_queries) +static bool copy_query(uint64 bucket_id, + uint64 query_id, + uint64 query_pos, + unsigned char *dst_buf, + unsigned char *src_buf, + size_t *new_query_pos) { - bool found; - uint64 query_id = 0; - uint64 query_len = 0; - uint64 rlen = 0; - uint64 buf_len = 0; - unsigned char *src_buffer = query_buffer[old_bucket_id]; - size_t i; + uint64 query_len = 0; + uint64 buf_len = 0; - memcpy(&buf_len, src_buffer, sizeof (uint64)); + memcpy(&buf_len, src_buf, sizeof (uint64)); if (buf_len <= 0) - return; + return false; - rlen = sizeof (uint64); /* Move forwad to skip length bytes */ - while (rlen < buf_len) + /* Try to locate the query directly. */ + if (query_pos != 0 && (query_pos + sizeof(uint64) + sizeof(uint64)) < buf_len) { - found = false; - memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */ - for (i = 0; i < n_queries; ++i) - { - if (query_id == query_ids[i]) - { - found = true; - break; - } - } + if (*(uint64 *)&src_buf[query_pos] != query_id) + return false; - rlen += sizeof (uint64); - if (buf_len <= rlen) - break; + query_pos += sizeof(uint64); - memcpy(&query_len, &src_buffer[rlen], sizeof (uint64)); /* query len */ - rlen += sizeof (uint64); - if (buf_len < rlen + query_len) - break; + memcpy(&query_len, &src_buf[query_pos], sizeof(uint64)); /* query len */ + query_pos += sizeof(uint64); - if (found) { - SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id], - (const char *)&src_buffer[rlen], query_len); - } + if (query_pos + query_len > buf_len) /* avoid reading past buffer's length. 
*/ + return false; - rlen += query_len; + return SaveQueryText(bucket_id, query_id, dst_buf, + (const char *)&src_buf[query_pos], + query_len, new_query_pos); } -} \ No newline at end of file + + return false; +} diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index d6b6ddc..ee63ba7 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -147,15 +147,6 @@ static uint64 pgss_hash_string(const char *str, int len); char *unpack_sql_state(int sql_state); static void pgss_store_error(uint64 queryid, const char * query, ErrorData *edata); -static pgssQueryEntry *pgss_store_query_info(uint64 bucketid, - uint64 queryid, - uint64 dbid, - uint64 userid, - uint64 ip, - uint64 appid, - const char *query, - uint64 query_len, - pgssStoreKind kind); static void pgss_store_utility(const char *query, double total_time, @@ -1314,40 +1305,6 @@ pgss_update_entry(pgssEntry *entry, } } -static pgssEntry* -pgss_get_entry(uint64 bucket_id, - uint64 userid, - uint64 dbid, - uint64 queryid, - uint64 ip, - uint64 planid, - uint64 appid) -{ - pgssEntry *entry; - pgssHashKey key; - HTAB *pgss_hash = pgsm_get_hash(); - pgssSharedState *pgss = pgsm_get_ss(); - - key.bucket_id = bucket_id; - key.userid = userid; - key.dbid = MyDatabaseId; - key.queryid = queryid; - key.ip = pg_get_client_addr(); - key.planid = planid; - key.appid = appid; - - entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); - if(!entry) - { - /* OK to create a new hashtable entry */ - entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); - if (entry == NULL) - return NULL; - } - Assert(entry); - return entry; -} - static void pgss_store_query(uint64 queryid, const char * query, @@ -1486,100 +1443,119 @@ pgss_store(uint64 queryid, JumbleState *jstate, pgssStoreKind kind) { - pgssEntry *entry; + HTAB *pgss_hash; + pgssHashKey key; + pgssEntry *entry; pgssSharedState *pgss = pgsm_get_ss(); - char application_name[APPLICATIONNAME_LEN]; - int application_name_len = 
pg_get_application_name(application_name); - bool reset = false; - uint64 bucketid; - uint64 prev_bucket_id; + char application_name[APPLICATIONNAME_LEN]; + int application_name_len; + bool reset = false; + uint64 bucketid; + uint64 prev_bucket_id; uint64 userid; int con; - uint64 dbid = MyDatabaseId; - uint64 ip = pg_get_client_addr(); - uint64 planid = plan_info ? plan_info->planid: 0; - uint64 appid = djb2_hash((unsigned char *)application_name, application_name_len); + uint64 planid; + uint64 appid; char comments[512] = ""; - bool out_of_memory = false; + size_t query_len; + /* Monitoring is disabled */ if (!PGSM_ENABLED) return; - Assert(query != NULL); - if (kind == PGSS_ERROR) - GetUserIdAndSecContext((unsigned int *)&userid, &con); - else - userid = GetUserId(); - - extract_query_comments(query, comments, sizeof(comments)); - /* Safety check... */ if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]) return; + Assert(query != NULL); + if (kind == PGSS_ERROR) + GetUserIdAndSecContext((unsigned int *)&userid, &con); + else + userid = GetUserId(); + + application_name_len = pg_get_application_name(application_name); + planid = plan_info ? 
plan_info->planid: 0; + appid = djb2_hash((unsigned char *)application_name, application_name_len); + + extract_query_comments(query, comments, sizeof(comments)); + prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket); bucketid = get_next_wbucket(pgss); if (bucketid != prev_bucket_id) reset = true; - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + key.bucket_id = bucketid; + key.userid = userid; + key.dbid = MyDatabaseId; + key.queryid = queryid; + key.ip = pg_get_client_addr(); + key.planid = planid; + key.appid = appid; - switch (kind) + pgss_hash = pgsm_get_hash(); + + LWLockAcquire(pgss->lock, LW_SHARED); + + entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); + if (!entry) { - case PGSS_PARSE: - case PGSS_PLAN: - { - pgssQueryEntry *query_entry; - query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind); - if (query_entry == NULL) - out_of_memory = true; - break; - } - case PGSS_ERROR: - case PGSS_EXEC: - case PGSS_FINISHED: - { - pgssQueryEntry *query_entry; - query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind); - if (query_entry == NULL) - { - out_of_memory = true; - break; - } - query_entry->state = kind; - entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid); - if (entry == NULL) - { - out_of_memory = true; - break; - } + uint64 prev_qbuf_len; + /* position in which the query's text was inserted into the query buffer. 
*/ + size_t qpos = 0; - if (jstate == NULL) - pgss_update_entry(entry, /* entry */ - bucketid, /* bucketid */ - queryid, /* queryid */ - query, /* query */ - comments, /* comments */ - plan_info, /* PlanInfo */ - cmd_type, /* CmdType */ - sys_info, /* SysInfo */ - error_info, /* ErrorInfo */ - total_time, /* total_time */ - rows, /* rows */ - bufusage, /* bufusage */ - walusage, /* walusage */ - reset, /* reset */ - kind); /* kind */ + query_len = strlen(query); + if (query_len > PGSM_QUERY_MAX_LEN) + query_len = PGSM_QUERY_MAX_LEN; + + /* Need exclusive lock to make a new hashtable entry - promote */ + LWLockRelease(pgss->lock); + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + /* + * Save the current query buffer length; if we fail to add a + * new entry to the hash table then we must restore the + * original length. + */ + memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len)); + if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len, &qpos)) + { + LWLockRelease(pgss->lock); + elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query."); + return; } - break; - case PGSS_NUMKIND: - case PGSS_INVALID: - break; + + /* OK to create a new hashtable entry */ + entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); + if (entry == NULL) + { + /* Restore previous query buffer length. 
*/ + memcpy(pgss_qbuf[bucketid], &prev_qbuf_len, sizeof(prev_qbuf_len)); + LWLockRelease(pgss->lock); + elog(DEBUG1, "pg_stat_monitor: out of memory"); + return; + } + entry->query_pos = qpos; } + + if (jstate == NULL) + pgss_update_entry(entry, /* entry */ + bucketid, /* bucketid */ + queryid, /* queryid */ + query, /* query */ + comments, /* comments */ + plan_info, /* PlanInfo */ + cmd_type, /* CmdType */ + sys_info, /* SysInfo */ + error_info, /* ErrorInfo */ + total_time, /* total_time */ + rows, /* rows */ + bufusage, /* bufusage */ + walusage, /* walusage */ + reset, /* reset */ + kind); /* kind */ + LWLockRelease(pgss->lock); - if (out_of_memory) - elog(DEBUG1, "pg_stat_monitor: out of memory"); } /* * Reset all statement statistics. @@ -1594,8 +1570,12 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - hash_entry_dealloc(-1, -1); - hash_query_entryies_reset(); + hash_entry_dealloc(-1, -1, NULL); + /* Reset query buffers. 
*/ + for (size_t i = 0; i < MAX_BUCKETS; ++i) + { + *(uint64 *)pgss_qbuf[i] = 0; + } #ifdef BENCHMARK for (int i = STATS_START; i < STATS_END; ++i) { pg_hook_stats[i].min_time = 0; @@ -1646,7 +1626,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, MemoryContext oldcontext; HASH_SEQ_STATUS hash_seq; pgssEntry *entry; - pgssQueryEntry *query_entry; char parentid_txt[32]; pgssSharedState *pgss = pgsm_get_ss(); HTAB *pgss_hash = pgsm_get_hash(); @@ -1706,19 +1685,14 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, uint64 userid = entry->key.userid; uint64 ip = entry->key.ip; uint64 planid = entry->key.planid; - uint64 appid = entry->key.appid; unsigned char *buf = pgss_qbuf[bucketid]; #if PG_VERSION_NUM < 140000 bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); #else bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); #endif - query_entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid); - if (query_entry == NULL) - continue; - - if (read_query(buf, bucketid, queryid, query_txt) == 0) + if (read_query(buf, queryid, query_txt, entry->query_pos) == 0) { int len; len = read_query_buffer(bucketid, queryid, query_txt); @@ -1738,16 +1712,16 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.state == PGSS_FINISHED) continue; } - if (tmp.info.parentid != UINT64CONST(0)) - { - int len = 0; - if (read_query(buf, bucketid, tmp.info.parentid, parent_query_txt) == 0) - { - len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) - snprintf(parent_query_txt, 32, "%s", ""); - } - } + if (tmp.info.parentid != UINT64CONST(0)) + { + int len = 0; + if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0) + { + len = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt); + if (len != MAX_QUERY_BUFFER_BUCKET) + snprintf(parent_query_txt, 32, "%s", ""); + } + } /* bucketid at column number 0 */ values[i++] = 
Int64GetDatumFast(bucketid); @@ -2057,8 +2031,7 @@ get_next_wbucket(pgssSharedState *pgss) prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - hash_entry_dealloc(new_bucket_id, prev_bucket_id); - hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); + hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id); unlink(file_name); @@ -3014,7 +2987,7 @@ intarray_get_datum(int32 arr[], int len) } uint64 -read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) +read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos) { bool found = false; uint64 query_id = 0; @@ -3026,6 +2999,27 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) if (buf_len <= 0) goto exit; + /* If a position hint is given, try to locate the query directly. */ + if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len) + { + memcpy(&query_id, &buf[pos], sizeof(uint64)); + if (query_id != queryid) + return 0; + + pos += sizeof(uint64); + + memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */ + pos += sizeof(uint64); + + if (pos + query_len > buf_len) /* avoid reading past buffer's length. 
*/ + return 0; + + memcpy(query, &buf[pos], query_len); /* Actual query */ + query[query_len] = '\0'; + + return queryid; + } + rlen = sizeof (uint64); /* Move forwad to skip length bytes */ for(;;) { @@ -3035,6 +3029,7 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */ if (query_id == queryid) found = true; + rlen += sizeof (uint64); if (buf_len <= rlen) continue; @@ -3063,44 +3058,13 @@ exit: return 0; } -static pgssQueryEntry* -pgss_store_query_info(uint64 bucketid, - uint64 queryid, - uint64 dbid, - uint64 userid, - uint64 ip, - uint64 appid, - const char *query, - uint64 query_len, - pgssStoreKind kind) -{ - pgssSharedState *pgss = pgsm_get_ss(); - unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]; - pgssQueryEntry *entry; - - if (query_len > PGSM_QUERY_MAX_LEN) - query_len = PGSM_QUERY_MAX_LEN; - - /* Already have query in the shared buffer, there - * is no need to add that again. 
- */ - entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid); - if (entry) - return entry; - - entry = hash_create_query_entry(bucketid, queryid, dbid, userid, ip, appid); - if (!entry) - return NULL; - entry->state = kind; - - if(!SaveQueryText(bucketid, queryid, buf, query, query_len)) - return NULL; - - return entry; -} - bool -SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len) +SaveQueryText(uint64 bucketid, + uint64 queryid, + unsigned char *buf, + const char *query, + uint64 query_len, + size_t *query_pos) { uint64 buf_len = 0; @@ -3125,6 +3089,8 @@ SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *q } } + *query_pos = buf_len; + memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */ buf_len += sizeof (uint64); @@ -3359,7 +3325,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) break; } off += buf_len; - if (read_query(buf, bucket_id, queryid, query_txt)) + if (read_query(buf, queryid, query_txt, 0)) break; } if (fd > 0) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 94a0f83..2ddfe01 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -294,6 +294,7 @@ typedef struct pgssEntry Counters counters; /* the statistics for this query */ int encoding; /* query text encoding */ slock_t mutex; /* protects the counters only */ + size_t query_pos; /* query location within query buffer */ } pgssEntry; /* @@ -361,7 +362,12 @@ typedef struct JumbleState /* Links to shared memory state */ -bool SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len); +bool SaveQueryText(uint64 bucketid, + uint64 queryid, + unsigned char *buf, + const char *query, + uint64 query_len, + size_t *query_pos); /* guc.c */ void init_guc(void); @@ -380,12 +386,12 @@ void hash_entry_reset(void); void hash_query_entryies_reset(void); void hash_query_entries(); void hash_query_entry_dealloc(int new_bucket_id, 
int old_bucket_id, unsigned char *query_buffer[]); -bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id); +void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void); int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); -uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query); +uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos); pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); void pgss_startup(void);