Merge pull request #111 from darkfronza/PG-230_fix_duplicate_query_entries
PG-230: Fix duplicated query entries.pull/112/head
commit
f269af3da2
92
hash_query.c
92
hash_query.c
|
@ -15,9 +15,11 @@
|
||||||
*-------------------------------------------------------------------------
|
*-------------------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
#include "pg_stat_monitor.h"
|
#include "pg_stat_monitor.h"
|
||||||
|
|
||||||
|
|
||||||
static pgssSharedState *pgss;
|
static pgssSharedState *pgss;
|
||||||
static HTAB *pgss_hash;
|
static HTAB *pgss_hash;
|
||||||
static HTAB *pgss_query_hash;
|
static HTAB *pgss_query_hash;
|
||||||
|
@ -130,7 +132,7 @@ hash_memsize(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
pgssEntry *
|
pgssEntry *
|
||||||
hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding)
|
hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
|
||||||
{
|
{
|
||||||
pgssEntry *entry = NULL;
|
pgssEntry *entry = NULL;
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
@ -222,24 +224,106 @@ hash_query_entry_dealloc(int bucket, unsigned char *buf)
|
||||||
/*
|
/*
|
||||||
* Deallocate least-used entries.
|
* Deallocate least-used entries.
|
||||||
*
|
*
|
||||||
|
* If old_bucket_id != -1, move all pending queries in old_bucket_id
|
||||||
|
* to the new bucket id.
|
||||||
|
*
|
||||||
* Caller must hold an exclusive lock on pgss->lock.
|
* Caller must hold an exclusive lock on pgss->lock.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
hash_entry_dealloc(int bucket)
|
hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
|
||||||
{
|
{
|
||||||
HASH_SEQ_STATUS hash_seq;
|
HASH_SEQ_STATUS hash_seq;
|
||||||
pgssEntry *entry = NULL;
|
pgssEntry *entry = NULL;
|
||||||
|
List *pending_entries = NIL;
|
||||||
|
ListCell *pending_entry;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* During transition to a new bucket id, a rare but possible race
|
||||||
|
* condition may happen while reading pgss->current_wbucket. If a
|
||||||
|
* different thread/process updates pgss->current_wbucket before this
|
||||||
|
* function is called, it may happen that old_bucket_id == new_bucket_id.
|
||||||
|
* If that is the case, we adjust the old bucket id here instead of using
|
||||||
|
* a lock in order to avoid the overhead.
|
||||||
|
*/
|
||||||
|
if (old_bucket_id != -1 && old_bucket_id == new_bucket_id)
|
||||||
|
{
|
||||||
|
if (old_bucket_id == 0)
|
||||||
|
old_bucket_id = PGSM_MAX_BUCKETS - 1;
|
||||||
|
else
|
||||||
|
old_bucket_id--;
|
||||||
|
}
|
||||||
|
|
||||||
hash_seq_init(&hash_seq, pgss_hash);
|
hash_seq_init(&hash_seq, pgss_hash);
|
||||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||||
{
|
{
|
||||||
if (bucket < 0 ||
|
if (new_bucket_id < 0 ||
|
||||||
(entry->key.bucket_id == bucket &&
|
(entry->key.bucket_id == new_bucket_id &&
|
||||||
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
|
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
|
||||||
{
|
{
|
||||||
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we detect a pending query residing in the previous bucket id,
|
||||||
|
* we add it to a list of pending elements to be moved to the new
|
||||||
|
* bucket id.
|
||||||
|
* Can't update the hash table while iterating it inside this loop,
|
||||||
|
* as this may introduce all sort of problems.
|
||||||
|
*/
|
||||||
|
if (old_bucket_id != -1 && entry->key.bucket_id == old_bucket_id)
|
||||||
|
{
|
||||||
|
if (entry->counters.state == PGSS_PARSE ||
|
||||||
|
entry->counters.state == PGSS_PLAN ||
|
||||||
|
entry->counters.state == PGSS_EXEC)
|
||||||
|
{
|
||||||
|
pgssEntry *bkp_entry = malloc(sizeof(pgssEntry));
|
||||||
|
if (!bkp_entry)
|
||||||
|
{
|
||||||
|
/* No memory, remove pending query entry from the previous bucket. */
|
||||||
|
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Save key/data from the previous entry. */
|
||||||
|
memcpy(bkp_entry, entry, sizeof(pgssEntry));
|
||||||
|
|
||||||
|
/* Update key to use the new bucket id. */
|
||||||
|
bkp_entry->key.bucket_id = new_bucket_id;
|
||||||
|
|
||||||
|
/* Add the entry to a list of noded to be processed later. */
|
||||||
|
pending_entries = lappend(pending_entries, bkp_entry);
|
||||||
|
|
||||||
|
/* Finally remove the pending query from the expired bucket id. */
|
||||||
|
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Iterate over the list of pending queries in order
|
||||||
|
* to add them back to the hash table with the updated bucket id.
|
||||||
|
*/
|
||||||
|
foreach (pending_entry, pending_entries) {
|
||||||
|
bool found = false;
|
||||||
|
pgssEntry *new_entry;
|
||||||
|
pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry);
|
||||||
|
|
||||||
|
new_entry = (pgssEntry *) hash_search(pgss_hash, &old_entry->key, HASH_ENTER_NULL, &found);
|
||||||
|
if (new_entry == NULL)
|
||||||
|
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
|
||||||
|
else if (!found)
|
||||||
|
{
|
||||||
|
/* Restore counters and other data. */
|
||||||
|
new_entry->counters = old_entry->counters;
|
||||||
|
SpinLockInit(&new_entry->mutex);
|
||||||
|
new_entry->encoding = old_entry->encoding;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(old_entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
list_free(pending_entries);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1591,7 +1591,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
||||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||||
hash_entry_dealloc(-1);
|
hash_entry_dealloc(-1, -1);
|
||||||
hash_query_entryies_reset();
|
hash_query_entryies_reset();
|
||||||
#ifdef BENCHMARK
|
#ifdef BENCHMARK
|
||||||
for (int i = STATS_START; i < STATS_END; ++i) {
|
for (int i = STATS_START; i < STATS_END; ++i) {
|
||||||
|
@ -2007,7 +2007,7 @@ get_next_wbucket(pgssSharedState *pgss)
|
||||||
bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
|
bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
|
||||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||||
buf = pgss_qbuf[bucket_id];
|
buf = pgss_qbuf[bucket_id];
|
||||||
hash_entry_dealloc(bucket_id);
|
hash_entry_dealloc(bucket_id, pgss->current_wbucket);
|
||||||
hash_query_entry_dealloc(bucket_id, buf);
|
hash_query_entry_dealloc(bucket_id, buf);
|
||||||
|
|
||||||
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id);
|
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id);
|
||||||
|
|
|
@ -380,7 +380,7 @@ void hash_entry_reset(void);
|
||||||
void hash_query_entryies_reset(void);
|
void hash_query_entryies_reset(void);
|
||||||
void hash_query_entries();
|
void hash_query_entries();
|
||||||
void hash_query_entry_dealloc(int bucket, unsigned char *buf);
|
void hash_query_entry_dealloc(int bucket, unsigned char *buf);
|
||||||
bool hash_entry_dealloc(int bucket);
|
bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id);
|
||||||
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
||||||
Size hash_memsize(void);
|
Size hash_memsize(void);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue