diff --git a/hash_query.c b/hash_query.c index a4ab4a5..795d7ad 100644 --- a/hash_query.c +++ b/hash_query.c @@ -160,24 +160,65 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding) elog(DEBUG1, "%s", "pg_stat_monitor: out of memory"); return entry; } - /* - * Deallocate least-used entries. + * Reset all the entries. * * Caller must hold an exclusive lock on pgss->lock. */ void -hash_query_entry_dealloc(int bucket) +hash_query_entryies_reset() { HASH_SEQ_STATUS hash_seq; pgssQueryEntry *entry; + hash_seq_init(&hash_seq, pgss_query_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); +} + + +/* + * Deallocate finished entries. + * + * Caller must hold an exclusive lock on pgss->lock. + */ +void +hash_query_entry_dealloc(int bucket, unsigned char *buf) +{ + HASH_SEQ_STATUS hash_seq; + pgssQueryEntry *entry; + unsigned char *old_buf; + pgssSharedState *pgss = pgsm_get_ss(); + + old_buf = palloc0(pgss->query_buf_size_bucket); + memcpy(old_buf, buf, pgss->query_buf_size_bucket); + + memset(buf, 0, pgss->query_buf_size_bucket); + hash_seq_init(&hash_seq, pgss_query_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { - if (entry->key.bucket_id == bucket || bucket < 0) - entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); + if (entry->key.bucket_id == bucket) + { + if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR) + { + entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); + } + else + { + int len; + char query_txt[1024]; + if (read_query(old_buf, entry->key.bucket_id, entry->key.queryid, query_txt) == 0) + { + len = read_query_buffer(entry->key.bucket_id, entry->key.queryid, query_txt); + if (len != MAX_QUERY_BUFFER_BUCKET) + snprintf(query_txt, 32, "%s", ""); + } + SaveQueryText(entry->key.bucket_id, entry->key.queryid, buf, query_txt, strlen(query_txt)); + } + } } + pfree(old_buf); } /* @@ -185,20 +226,23 @@ hash_query_entry_dealloc(int bucket) * * Caller must hold an exclusive lock on pgss->lock. */ -void +bool hash_entry_dealloc(int bucket) { HASH_SEQ_STATUS hash_seq; - pgssEntry *entry; + pgssEntry *entry = NULL; hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { - if (entry->key.bucket_id == bucket || bucket < 0) + if (bucket < 0 || + (entry->key.bucket_id == bucket && + (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) { entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } } + return true; } /* diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 381d0ce..9d9ca6b 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -188,8 +188,6 @@ pgss_store_query(uint64 queryid, int query_len, pgssJumbleState *jstate, pgssStoreKind kind); -static uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query); -int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); static uint64 get_query_id(pgssJumbleState *jstate, Query *query); @@ -1292,7 +1290,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); hash_entry_dealloc(-1); - hash_query_entry_dealloc(-1); + hash_query_entryies_reset(); LWLockRelease(pgss->lock); PG_RETURN_VOID(); } @@ -1398,8 +1396,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, char *query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN); bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); - if (!IsBucketValid(bucketid)) - continue; query_entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip); if (query_entry == NULL) @@ -1420,6 +1416,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, tmp = e->counters; SpinLockRelease(&e->mutex); } + if (!IsBucketValid(bucketid)) + { + if (tmp.state == PGSS_FINISHED) + continue; + } /* bucketid at column number 0 */ values[i++] = Int64GetDatumFast(bucketid); @@ -1689,12 +1690,11 @@ get_next_wbucket(pgssSharedState *pgss) LWLockAcquire(pgss->lock, LW_EXCLUSIVE); buf = pgss_qbuf[bucket_id]; hash_entry_dealloc(bucket_id); - hash_query_entry_dealloc(bucket_id); + hash_query_entry_dealloc(bucket_id, buf); + snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id); unlink(file_name); - /* reset the query buffer */ - memset(buf, 0, sizeof (uint64)); LWLockRelease(pgss->lock); pgss->prev_bucket_usec = current_usec; lt = localtime(&tv.tv_sec); @@ -2642,7 +2642,7 @@ intarray_get_datum(int32 arr[], int len) } -static uint64 +uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) { bool found = false; @@ -2673,7 +2673,6 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query) rlen += sizeof (uint64); if (buf_len < rlen + query_len) goto exit; - if (found) { if (query != NULL) @@ -2704,7 +2703,6 @@ pgss_store_query_info(uint64 bucketid, uint64 query_len, pgssStoreKind kind) { - uint64 buf_len = 0; pgssSharedState *pgss = pgsm_get_ss(); unsigned char *buf = pgss_qbuf[pgss->current_wbucket]; pgssQueryEntry *entry; @@ -2722,6 +2720,18 @@ pgss_store_query_info(uint64 bucketid, entry = hash_create_query_entry(bucketid, queryid, dbid, userid, ip); if (!entry) return NULL; + entry->state = kind; + + if(!SaveQueryText(bucketid, queryid, buf, query, query_len)) + return NULL; + + return entry; +} + +bool +SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len) +{ + uint64 buf_len = 0; memcpy(&buf_len, buf, sizeof (uint64)); if (buf_len == 0) @@ -2732,7 +2742,7 @@ pgss_store_query_info(uint64 bucketid, switch(PGSM_OVERFLOW_TARGET) { case OVERFLOW_TARGET_NONE: - return NULL; + return false; case OVERFLOW_TARGET_DISK: { dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET); @@ -2753,7 +2763,7 @@ pgss_store_query_info(uint64 bucketid, memcpy(&buf[buf_len], query, query_len); /* query */ buf_len += query_len; memcpy(buf, &buf_len, sizeof (uint64)); - return entry; + return true; } static uint64 diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index ef2ad9d..448232e 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -183,6 +183,7 @@ typedef struct pgssQueryEntry { pgssQueryHashKey key; /* hash key of entry - MUST BE FIRST */ uint64 pos; /* bucket number */ + uint64 state; } pgssQueryEntry; typedef struct PlanInfo @@ -356,6 +357,8 @@ typedef struct pgssJumbleState /* Links to shared memory state */ +bool SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len); + /* guc.c */ void init_guc(void); GucVariable *get_conf(int i); @@ -372,11 +375,15 @@ HTAB *pgsm_get_hash(void); HTAB *pgsm_get_plan_hash(void); HTAB* pgsm_get_query_hash(void); void hash_entry_reset(void); -void hash_query_entry_dealloc(int bucket); -void hash_entry_dealloc(int bucket); +void hash_query_entryies_reset(void); +void hash_query_entries(); +void hash_query_entry_dealloc(int bucket, unsigned char *buf); +bool hash_entry_dealloc(int bucket); pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void); +int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); +uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query); pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip); pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip); void pgss_startup(void);