diff --git a/hash_query.c b/hash_query.c index 4d6c812..6c01a52 100644 --- a/hash_query.c +++ b/hash_query.c @@ -15,9 +15,11 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "nodes/pg_list.h" #include "pg_stat_monitor.h" + static pgssSharedState *pgss; static HTAB *pgss_hash; static HTAB *pgss_query_hash; @@ -130,7 +132,7 @@ hash_memsize(void) } pgssEntry * -hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding) +hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) { pgssEntry *entry = NULL; bool found = false; @@ -222,24 +224,106 @@ hash_query_entry_dealloc(int bucket, unsigned char *buf) /* * Deallocate least-used entries. * + * If old_bucket_id != -1, move all pending queries in old_bucket_id + * to the new bucket id. + * * Caller must hold an exclusive lock on pgss->lock. */ bool -hash_entry_dealloc(int bucket) +hash_entry_dealloc(int new_bucket_id, int old_bucket_id) { HASH_SEQ_STATUS hash_seq; pgssEntry *entry = NULL; + List *pending_entries = NIL; + ListCell *pending_entry; + + /* + * During transition to a new bucket id, a rare but possible race + * condition may happen while reading pgss->current_wbucket. If a + * different thread/process updates pgss->current_wbucket before this + * function is called, it may happen that old_bucket_id == new_bucket_id. + * If that is the case, we adjust the old bucket id here instead of using + * a lock in order to avoid the overhead. 
+ */ + if (old_bucket_id != -1 && old_bucket_id == new_bucket_id) + { + if (old_bucket_id == 0) + old_bucket_id = PGSM_MAX_BUCKETS - 1; + else + old_bucket_id--; + } hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { - if (bucket < 0 || - (entry->key.bucket_id == bucket && + if (new_bucket_id < 0 || + (entry->key.bucket_id == new_bucket_id && (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) { entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } + + /* + * If we detect a pending query residing in the previous bucket id, + * we add it to a list of pending elements to be moved to the new + * bucket id. Entries removed just above are skipped (hence the else): hash_search() documents the HASH_REMOVE result as a dangling pointer that must not be dereferenced. + * Can't update the hash table while iterating it inside this loop, + * as this may introduce all sorts of problems. + */ + else if (old_bucket_id != -1 && entry->key.bucket_id == old_bucket_id) + { + if (entry->counters.state == PGSS_PARSE || + entry->counters.state == PGSS_PLAN || + entry->counters.state == PGSS_EXEC) + { + pgssEntry *bkp_entry = malloc(sizeof(pgssEntry)); + if (!bkp_entry) + { + /* No memory, remove pending query entry from the previous bucket. */ + entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + continue; + } + + /* Save key/data from the previous entry. */ + memcpy(bkp_entry, entry, sizeof(pgssEntry)); + + /* Update key to use the new bucket id. */ + bkp_entry->key.bucket_id = new_bucket_id; + + /* Add the entry to a list of nodes to be processed later. */ + pending_entries = lappend(pending_entries, bkp_entry); + + /* Finally remove the pending query from the expired bucket id. */ + entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + } + } } + + /* + * Iterate over the list of pending queries in order + * to add them back to the hash table with the updated bucket id. 
+ */ + foreach (pending_entry, pending_entries) { + bool found = false; + pgssEntry *new_entry; + pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry); + + new_entry = (pgssEntry *) hash_search(pgss_hash, &old_entry->key, HASH_ENTER_NULL, &found); + if (new_entry == NULL) + elog(DEBUG1, "%s", "pg_stat_monitor: out of memory"); + else if (!found) + { + /* Restore counters and other data. */ + new_entry->counters = old_entry->counters; + SpinLockInit(&new_entry->mutex); + new_entry->encoding = old_entry->encoding; + } + + free(old_entry); + } + + list_free(pending_entries); + return true; } diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 418ba78..b319b79 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1591,7 +1591,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - hash_entry_dealloc(-1); + hash_entry_dealloc(-1, -1); hash_query_entryies_reset(); #ifdef BENCHMARK for (int i = STATS_START; i < STATS_END; ++i) { @@ -2007,7 +2007,7 @@ get_next_wbucket(pgssSharedState *pgss) bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); buf = pgss_qbuf[bucket_id]; - hash_entry_dealloc(bucket_id); + hash_entry_dealloc(bucket_id, pgss->current_wbucket); hash_query_entry_dealloc(bucket_id, buf); snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id); diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 15ed28d..e9c4f62 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -380,7 +380,7 @@ void hash_entry_reset(void); void hash_query_entryies_reset(void); void hash_query_entries(); void hash_query_entry_dealloc(int bucket, unsigned char *buf); -bool hash_entry_dealloc(int bucket); +bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id); pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size 
hash_memsize(void);