diff --git a/hash_query.c b/hash_query.c index 29d0cc8..9ce3477 100644 --- a/hash_query.c +++ b/hash_query.c @@ -22,19 +22,8 @@ static pgssSharedState *pgss; static HTAB *pgss_hash; +static HTAB *pgss_query_hash; -static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); -/* - * Copy query from src_buffer to dst_buff. - * Use query_id and query_pos to fast locate query in source buffer. - * Store updated query position in the destination buffer into param query_pos. - */ -static bool copy_query(uint64 bucket_id, - uint64 query_id, - uint64 query_pos, - unsigned char *dst_buf, - unsigned char *src_buf, - size_t *new_query_pos); static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) @@ -50,7 +39,6 @@ void pgss_startup(void) { bool found = false; - int32 i; /* reset in case this is a restart within the postmaster */ @@ -75,16 +63,10 @@ pgss_startup(void) init_hook_stats(); #endif - pgss->query_buf_size_bucket = MAX_QUERY_BUF / PGSM_MAX_BUCKETS; - - for (i = 0; i < PGSM_MAX_BUCKETS; i++) - { - unsigned char *buf = (unsigned char *)ShmemAlloc(pgss->query_buf_size_bucket); - set_qbuf(i, buf); - memset(buf, 0, sizeof (uint64)); - } + set_qbuf((unsigned char *)ShmemAlloc(MAX_QUERY_BUF)); pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES); + pgss_query_hash = hash_init("pg_stat_monitor: queryID hashtable", sizeof(uint64), sizeof(pgssQueryEntry), MAX_BUCKET_ENTRIES); LWLockRelease(AddinShmemInitLock); @@ -107,6 +89,12 @@ pgsm_get_hash(void) return pgss_hash; } +HTAB* +pgsm_get_query_hash(void) +{ + return pgss_query_hash; +} + /* * shmem_shutdown hook: Dump statistics into file. * @@ -182,22 +170,15 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) * Caller must hold an exclusive lock on pgss->lock. */ void -hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]) +hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer) { HASH_SEQ_STATUS hash_seq; pgssEntry *entry = NULL; - pgssSharedState *pgss = pgsm_get_ss(); /* Store pending query ids from the previous bucket. */ List *pending_entries = NIL; ListCell *pending_entry; - if (new_bucket_id != -1) - { - /* Clear all queries in the query buffer for the new bucket. */ - memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket); - } - /* Iterate over the hash table. */ hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) @@ -210,6 +191,11 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu (entry->key.bucket_id == new_bucket_id && (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) { + if (new_bucket_id == -1) { + /* pg_stat_monitor_reset(), remove entry from query hash table too. */ + hash_search(pgss_query_hash, &(entry->key.queryid), HASH_REMOVE, NULL); + } + entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } @@ -268,13 +254,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu new_entry->counters = old_entry->counters; SpinLockInit(&new_entry->mutex); new_entry->encoding = old_entry->encoding; - /* copy query's text from previous bucket to the new one. */ - copy_query(new_bucket_id, - new_entry->key.queryid, /* query id */ - old_entry->query_pos, /* query position in buffer */ - query_buffer[new_bucket_id], /* destination query buffer */ - query_buffer[old_bucket_id], /* source query buffer */ - &new_entry->query_pos); /* position in which query was inserted into destination buffer */ } free(old_entry); @@ -310,39 +289,3 @@ IsHashInitialize(void) return (pgss != NULL && pgss_hash != NULL); } - -static bool copy_query(uint64 bucket_id, - uint64 query_id, - uint64 query_pos, - unsigned char *dst_buf, - unsigned char *src_buf, - size_t *new_query_pos) -{ - uint64 query_len = 0; - uint64 buf_len = 0; - - memcpy(&buf_len, src_buf, sizeof (uint64)); - if (buf_len <= 0) - return false; - - /* Try to locate the query directly. */ - if (query_pos != 0 && (query_pos + sizeof(uint64) + sizeof(uint64)) < buf_len) - { - if (*(uint64 *)&src_buf[query_pos] != query_id) - return false; - - query_pos += sizeof(uint64); - - memcpy(&query_len, &src_buf[query_pos], sizeof(uint64)); /* query len */ - query_pos += sizeof(uint64); - - if (query_pos + query_len > buf_len) /* avoid reading past buffer's length. */ - return false; - - return SaveQueryText(bucket_id, query_id, dst_buf, - (const char *)&src_buf[query_pos], - query_len, new_query_pos); - } - - return false; -} diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index ec43578..78e4599 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -67,7 +67,8 @@ static int num_relations; /* Number of relation in the query */ static bool system_init = false; static struct rusage rusage_start; static struct rusage rusage_end; -static unsigned char *pgss_qbuf[MAX_BUCKETS]; +/* Query buffer, store queries' text. */ +static unsigned char *pgss_qbuf = NULL; static char *pgss_explain(QueryDesc *queryDesc); #ifdef BENCHMARK static struct pg_hook_stats_t *pg_hook_stats; @@ -1311,7 +1312,6 @@ pgss_update_entry(pgssEntry *entry, e->counters.blocks.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time); } e->counters.calls.usage += USAGE_EXEC(total_time); - e->counters.info.host = pg_get_client_addr(); if (sys_info) { e->counters.sysinfo.utime = sys_info->utime; @@ -1478,14 +1478,13 @@ pgss_store(uint64 queryid, uint64 planid; uint64 appid; char comments[512] = ""; - size_t query_len; /* Monitoring is disabled */ if (!PGSM_ENABLED) return; /* Safety check... */ - if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]) + if (!IsSystemInitialized()) return; Assert(query != NULL); @@ -1528,42 +1527,63 @@ pgss_store(uint64 queryid, entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); if (!entry) { - uint64 prev_qbuf_len; - /* position in which the query's text was inserted into the query buffer. */ - size_t qpos = 0; + pgssQueryEntry *query_entry; + size_t query_len = 0; + bool query_found = false; + uint64 prev_qbuf_len = 0; + HTAB *pgss_query_hash; - query_len = strlen(query); - if (query_len > PGSM_QUERY_MAX_LEN) - query_len = PGSM_QUERY_MAX_LEN; + pgss_query_hash = pgsm_get_query_hash(); + + query_entry = hash_search(pgss_query_hash, &queryid, HASH_ENTER_NULL, &query_found); + if (query_entry == NULL) + { + LWLockRelease(pgss->lock); + pgsm_log_error("pgss_store: out of memory (pgss_query_hash)."); + return; + } + else if (!query_found) + { + /* New query, must add it to the buffer, calculate its length. */ + query_len = strlen(query); + if (query_len > PGSM_QUERY_MAX_LEN) + query_len = PGSM_QUERY_MAX_LEN; + } /* Need exclusive lock to make a new hashtable entry - promote */ LWLockRelease(pgss->lock); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - /* - * Save current query buffer length, if we fail to add a new - * new entry to the hash table then we must restore the - * original length. - */ - memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len)); - if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len, &qpos)) + if (!query_found) { - LWLockRelease(pgss->lock); - elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query."); - return; + if (!SaveQueryText(bucketid, queryid, pgss_qbuf, query, query_len, &query_entry->query_pos)) + { + LWLockRelease(pgss->lock); + pgsm_log_error("pgss_store: insufficient shared space for query."); + return; + } + /* + * Save current query buffer length, if we fail to add a new + * new entry to the hash table then we must restore the + * original length. + */ + memcpy(&prev_qbuf_len, pgss_qbuf, sizeof(prev_qbuf_len)); } /* OK to create a new hashtable entry */ entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); if (entry == NULL) { - /* Restore previous query buffer length. */ - memcpy(pgss_qbuf[bucketid], &prev_qbuf_len, sizeof(prev_qbuf_len)); + if (!query_found) + { + /* Restore previous query buffer length. */ + memcpy(pgss_qbuf, &prev_qbuf_len, sizeof(prev_qbuf_len)); + } LWLockRelease(pgss->lock); elog(DEBUG1, "pg_stat_monitor: out of memory"); return; } - entry->query_pos = qpos; + entry->query_pos = query_entry->query_pos; } if (jstate == NULL) @@ -1599,11 +1619,10 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); hash_entry_dealloc(-1, -1, NULL); - /* Reset query buffers. */ - for (size_t i = 0; i < PGSM_MAX_BUCKETS; ++i) - { - *(uint64 *)pgss_qbuf[i] = 0; - } + + /* Reset query buffer. */ + *(uint64 *)pgss_qbuf = 0; + #ifdef BENCHMARK for (int i = STATS_START; i < STATS_END; ++i) { pg_hook_stats[i].min_time = 0; @@ -1713,7 +1732,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, uint64 userid = entry->key.userid; uint64 ip = entry->key.ip; uint64 planid = entry->key.planid; - unsigned char *buf = pgss_qbuf[bucketid]; #if PG_VERSION_NUM < 140000 bool toplevel = 1; bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); @@ -1722,7 +1740,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, bool toplevel = entry->key.toplevel; #endif - if (read_query(buf, queryid, query_txt, entry->query_pos) == 0) + if (read_query(pgss_qbuf, queryid, query_txt, entry->query_pos) == 0) { int rc; rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos); @@ -1749,8 +1767,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.info.parentid != UINT64CONST(0)) { - int rc = 0; - if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0) + if (read_query(pgss_qbuf, tmp.info.parentid, parent_query_txt, 0) == 0) { rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0); if (rc != 1) @@ -1802,7 +1819,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (enc != query_txt) pfree(enc); /* plan at column number 7 */ - if (planid && strlen(tmp.planinfo.plan_text) > 0) + if (planid && tmp.planinfo.plan_text[0]) values[i++] = CStringGetTextDatum(tmp.planinfo.plan_text); else nulls[i++] = true; @@ -3120,17 +3137,17 @@ SaveQueryText(uint64 bucketid, /* * If the query buffer is empty, there is nothing to dump, this also - * means that the current query length exceeds MAX_QUERY_BUFFER_BUCKET. + * means that the current query length exceeds MAX_QUERY_BUF. */ if (buf_len <= sizeof (uint64)) return false; - dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET); + dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUF); buf_len = sizeof (uint64); /* * We must check for overflow again, as the query length may - * exceed the size allocated to the buffer (MAX_QUERY_BUFFER_BUCKET). + * exceed the total size allocated to the buffer (MAX_QUERY_BUF). */ if (QUERY_BUFFER_OVERFLOW(buf_len, query_len)) { @@ -3293,9 +3310,10 @@ pg_stat_monitor_hook_stats(PG_FUNCTION_ARGS) } void -set_qbuf(int i, unsigned char *buf) +set_qbuf(unsigned char *buf) { - pgss_qbuf[i] = buf; + pgss_qbuf = buf; + *(uint64 *)pgss_qbuf = 0; } #ifdef BENCHMARK @@ -3413,13 +3431,13 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) if (fd < 0) goto exit; - buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET); + buf = (unsigned char*) palloc(MAX_QUERY_BUF); while (!done) { off = 0; - /* read a chunck of MAX_QUERY_BUFFER_BUCKET size. */ + /* read a chunck of MAX_QUERY_BUF size. */ do { - nread = read(fd, buf + off, MAX_QUERY_BUFFER_BUCKET - off); + nread = read(fd, buf + off, MAX_QUERY_BUF - off); if (nread == -1) { if (errno == EINTR && tries++ < 3) /* read() was interrupted, attempt to read again (max attempts=3) */ @@ -3434,9 +3452,9 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) } off += nread; - } while (off < MAX_QUERY_BUFFER_BUCKET); + } while (off < MAX_QUERY_BUF); - if (off == MAX_QUERY_BUFFER_BUCKET) + if (off == MAX_QUERY_BUF) { /* we have a chunck, scan it looking for queryid. */ if (read_query(buf, queryid, query_txt, pos) != 0) @@ -3449,7 +3467,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) } else /* - * Either done=true or file has a size not multiple of MAX_QUERY_BUFFER_BUCKET. + * Either done=true or file has a size not multiple of MAX_QUERY_BUF. * It is safe to assume that the file was truncated or corrupted. */ break; diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 84ed936..045da45 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -87,9 +87,8 @@ #define MAX_QUERY_BUF (PGSM_QUERY_SHARED_BUFFER * 1024 * 1024) #define MAX_BUCKETS_MEM (PGSM_MAX * 1024 * 1024) #define BUCKETS_MEM_OVERFLOW() ((hash_get_num_entries(pgss_hash) * sizeof(pgssEntry)) >= MAX_BUCKETS_MEM) -#define MAX_QUERY_BUFFER_BUCKET MAX_QUERY_BUF / PGSM_MAX_BUCKETS #define MAX_BUCKET_ENTRIES (MAX_BUCKETS_MEM / sizeof(pgssEntry)) -#define QUERY_BUFFER_OVERFLOW(x,y) ((x + y + sizeof(uint64) + sizeof(uint64)) > MAX_QUERY_BUFFER_BUCKET) +#define QUERY_BUFFER_OVERFLOW(x,y) ((x + y + sizeof(uint64) + sizeof(uint64)) > MAX_QUERY_BUF) #define QUERY_MARGIN 100 #define MIN_QUERY_LEN 10 #define SQLCODE_LEN 20 @@ -161,7 +160,7 @@ typedef enum AGG_KEY #define MAX_QUERY_LEN 1024 -/* shared nenory storage for the query */ +/* shared memory storage for the query */ typedef struct CallTime { double total_time; /* total execution time, in msec */ @@ -171,21 +170,19 @@ typedef struct CallTime double sum_var_time; /* sum of variances in execution time in msec */ } CallTime; -typedef struct pgssQueryHashKey -{ - uint64 bucket_id; /* bucket number */ - uint64 queryid; /* query identifier */ - uint64 userid; /* user OID */ - uint64 dbid; /* database OID */ - uint64 ip; /* client ip address */ - uint64 appid; /* hash of application name */ -} pgssQueryHashKey; - +/* + * Entry type for queries hash table (query ID). + * + * We use a hash table to keep track of query IDs that have their + * corresponding query text added to the query buffer (pgsm_query_shared_buffer). + * + * This allow us to avoid adding duplicated queries to the buffer, therefore + * leaving more space for other queries and saving some CPU. + */ typedef struct pgssQueryEntry { - pgssQueryHashKey key; /* hash key of entry - MUST BE FIRST */ - uint64 pos; /* bucket number */ - uint64 state; + uint64 queryid; /* query identifier, also the key. */ + size_t query_pos; /* query location within query buffer */ } pgssQueryEntry; typedef struct PlanInfo @@ -208,10 +205,6 @@ typedef struct pgssHashKey typedef struct QueryInfo { - uint64 queryid; /* query identifier */ - Oid userid; /* user OID */ - Oid dbid; /* database OID */ - uint host; /* client IP */ uint64 parentid; /* parent queryid of current query*/ int64 type; /* type of query, options are query, info, warning, error, fatal */ char application_name[APPLICATIONNAME_LEN]; @@ -311,7 +304,6 @@ typedef struct pgssSharedState pg_atomic_uint64 current_wbucket; pg_atomic_uint64 prev_bucket_usec; uint64 bucket_entry[MAX_BUCKETS]; - int64 query_buf_size_bucket; char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ } pgssSharedState; @@ -382,21 +374,20 @@ int pgsm_get_bucket_size(void); pgssSharedState* pgsm_get_ss(void); HTAB *pgsm_get_plan_hash(void); HTAB *pgsm_get_hash(void); +HTAB *pgsm_get_query_hash(void); HTAB *pgsm_get_plan_hash(void); void hash_entry_reset(void); void hash_query_entryies_reset(void); void hash_query_entries(); void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); -void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); +void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer); pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void); int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos); uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos); -pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); -pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid); void pgss_startup(void); -void set_qbuf(int i, unsigned char *); +void set_qbuf(unsigned char *); /* hash_query.c */ void pgss_startup(void);