PG-230: Fix duplicated query entries.
A problem during bucket management was allowing some queries to be duplicated, old entries would sit around without having their statistics updated. The problem would arise during the following chain of events: 1. A query goes through pgss_post_parse_analyze, in this stage (PGSS_PARSE) we only save the query into the query buffer and create an entry in the query hash table. 2. The query then goes through pgss_ExecutorStart (PGSS_EXEC), in this stage we create an entry for query statistic counters with default values, all time stats equal zero, etc. 3. The query then goes through pgss_ExecutorEnd (PGSS_FINISH), in this stage we update the query statistis, no. calls, total time taken, min_time, etc. The problem is that between steps 2 and 3, the current bucket ID timer may have been expired. For example, during steps 1 and 2 the query may have been stored in bucket ID 1, but when the query is finished (pgss_ExecutorEnd) the current bucket ID may have been updated to 2. This is leaving an entry for the query in bucket ID 1 with state ACTIVE, with time statistics not updated yet. This is also creating an entry for the query in the bucket ID 2, with all statistics (time and others) being updated for this entry. To solve this problem, during transition to a new bucket id, we scan all pending queries in the previous bucket id and move them to the new bucket id. This way finished queries will always be associated with the bucket id that was active at the time they've finished.pull/111/head
parent
0403ca951c
commit
273f23b161
92
hash_query.c
92
hash_query.c
|
@ -15,9 +15,11 @@
|
|||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
#include "pg_stat_monitor.h"
|
||||
|
||||
|
||||
static pgssSharedState *pgss;
|
||||
static HTAB *pgss_hash;
|
||||
static HTAB *pgss_query_hash;
|
||||
|
@ -130,7 +132,7 @@ hash_memsize(void)
|
|||
}
|
||||
|
||||
pgssEntry *
|
||||
hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding)
|
||||
hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
|
||||
{
|
||||
pgssEntry *entry = NULL;
|
||||
bool found = false;
|
||||
|
@ -222,24 +224,106 @@ hash_query_entry_dealloc(int bucket, unsigned char *buf)
|
|||
/*
|
||||
* Deallocate least-used entries.
|
||||
*
|
||||
* If old_bucket_id != -1, move all pending queries in old_bucket_id
|
||||
* to the new bucket id.
|
||||
*
|
||||
* Caller must hold an exclusive lock on pgss->lock.
|
||||
*/
|
||||
bool
|
||||
hash_entry_dealloc(int bucket)
|
||||
hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgssEntry *entry = NULL;
|
||||
List *pending_entries = NIL;
|
||||
ListCell *pending_entry;
|
||||
|
||||
/*
|
||||
* During transition to a new bucket id, a rare but possible race
|
||||
* condition may happen while reading pgss->current_wbucket. If a
|
||||
* different thread/process updates pgss->current_wbucket before this
|
||||
* function is called, it may happen that old_bucket_id == new_bucket_id.
|
||||
* If that is the case, we adjust the old bucket id here instead of using
|
||||
* a lock in order to avoid the overhead.
|
||||
*/
|
||||
if (old_bucket_id != -1 && old_bucket_id == new_bucket_id)
|
||||
{
|
||||
if (old_bucket_id == 0)
|
||||
old_bucket_id = PGSM_MAX_BUCKETS - 1;
|
||||
else
|
||||
old_bucket_id--;
|
||||
}
|
||||
|
||||
hash_seq_init(&hash_seq, pgss_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
if (bucket < 0 ||
|
||||
(entry->key.bucket_id == bucket &&
|
||||
if (new_bucket_id < 0 ||
|
||||
(entry->key.bucket_id == new_bucket_id &&
|
||||
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
|
||||
{
|
||||
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* If we detect a pending query residing in the previous bucket id,
|
||||
* we add it to a list of pending elements to be moved to the new
|
||||
* bucket id.
|
||||
* Can't update the hash table while iterating it inside this loop,
|
||||
* as this may introduce all sort of problems.
|
||||
*/
|
||||
if (old_bucket_id != -1 && entry->key.bucket_id == old_bucket_id)
|
||||
{
|
||||
if (entry->counters.state == PGSS_PARSE ||
|
||||
entry->counters.state == PGSS_PLAN ||
|
||||
entry->counters.state == PGSS_EXEC)
|
||||
{
|
||||
pgssEntry *bkp_entry = malloc(sizeof(pgssEntry));
|
||||
if (!bkp_entry)
|
||||
{
|
||||
/* No memory, remove pending query entry from the previous bucket. */
|
||||
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Save key/data from the previous entry. */
|
||||
memcpy(bkp_entry, entry, sizeof(pgssEntry));
|
||||
|
||||
/* Update key to use the new bucket id. */
|
||||
bkp_entry->key.bucket_id = new_bucket_id;
|
||||
|
||||
/* Add the entry to a list of noded to be processed later. */
|
||||
pending_entries = lappend(pending_entries, bkp_entry);
|
||||
|
||||
/* Finally remove the pending query from the expired bucket id. */
|
||||
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Iterate over the list of pending queries in order
|
||||
* to add them back to the hash table with the updated bucket id.
|
||||
*/
|
||||
foreach (pending_entry, pending_entries) {
|
||||
bool found = false;
|
||||
pgssEntry *new_entry;
|
||||
pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry);
|
||||
|
||||
new_entry = (pgssEntry *) hash_search(pgss_hash, &old_entry->key, HASH_ENTER_NULL, &found);
|
||||
if (new_entry == NULL)
|
||||
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
|
||||
else if (!found)
|
||||
{
|
||||
/* Restore counters and other data. */
|
||||
new_entry->counters = old_entry->counters;
|
||||
SpinLockInit(&new_entry->mutex);
|
||||
new_entry->encoding = old_entry->encoding;
|
||||
}
|
||||
|
||||
free(old_entry);
|
||||
}
|
||||
|
||||
list_free(pending_entries);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -1591,7 +1591,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
|||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
hash_entry_dealloc(-1);
|
||||
hash_entry_dealloc(-1, -1);
|
||||
hash_query_entryies_reset();
|
||||
#ifdef BENCHMARK
|
||||
for (int i = STATS_START; i < STATS_END; ++i) {
|
||||
|
@ -2007,7 +2007,7 @@ get_next_wbucket(pgssSharedState *pgss)
|
|||
bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
buf = pgss_qbuf[bucket_id];
|
||||
hash_entry_dealloc(bucket_id);
|
||||
hash_entry_dealloc(bucket_id, pgss->current_wbucket);
|
||||
hash_query_entry_dealloc(bucket_id, buf);
|
||||
|
||||
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id);
|
||||
|
|
|
@ -380,7 +380,7 @@ void hash_entry_reset(void);
|
|||
void hash_query_entryies_reset(void);
|
||||
void hash_query_entries();
|
||||
void hash_query_entry_dealloc(int bucket, unsigned char *buf);
|
||||
bool hash_entry_dealloc(int bucket);
|
||||
bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id);
|
||||
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
||||
Size hash_memsize(void);
|
||||
|
||||
|
|
Loading…
Reference in New Issue