PG-488: pg_stat_monitor: Overflow management. (#342)

* PG-488: pg_stat_monitor: Overflow management.

Reimplement the storage mechanism of buckets (for PG-15 onward) and query texts
using dynamic shared memory. Since dynamic shared memory can grow into the
swap area, we get overflow handling out of the box.

As PostgreSQL versions prior to V15 do not support sequential scans on dynamic
shared memory hashes, older versions have to live with the classic shared
memory hash for storing the buckets.

Another noteworthy change in the new design: the query pointer is saved
inside the bucket, so the query text is eventually evicted when the bucket is
recycled.

Finally, the dynamic shared memory hash has a built-in locking mechanism, so we
can revisit the whole locking scheme in pg_stat_monitor, which has the potential
for lots of performance improvements.

* Fixing tap test reported issues and also disabling dynamic hash for all versions

* Updating the expected out file for top_query test case

Co-authored-by: Hamid Akhtar <hamid.akhtar@percona.com>
This commit is contained in:
Muhammad Usama
2023-01-10 17:54:17 +05:00
committed by GitHub
parent ff75b23257
commit 2c5e12af0a
3 changed files with 419 additions and 508 deletions

View File

@@ -16,59 +16,110 @@
*/
#include "postgres.h"
#include "nodes/pg_list.h"
#include "pg_stat_monitor.h"
static pgsmLocalState pgsmStateLocal;
static PGSM_HASH_TABLE_HANDLE pgsm_create_bucket_hash(pgssSharedState *pgss, dsa_area *dsa);
static Size pgsm_get_shared_area_size(void);
static pgssSharedState *pgss;
static HTAB *pgss_hash;
static HTAB *pgss_query_hash;
#if USE_DYNAMIC_HASH
/*
 * Parameters for the shared (dshash) bucket table: keys are pgssHashKey,
 * entries are pgssEntry, compared with dshash_memcmp and hashed with
 * dshash_memhash.
 *
 * The tranche id is not part of this initializer; it is assigned at run
 * time before the table is created (pgsm_create_bucket_hash) or attached
 * (pgsm_attach_shmem).
 */
static dshash_parameters dsh_params = {
sizeof(pgssHashKey),
sizeof(pgssEntry),
dshash_memcmp,
dshash_memhash
};
#endif
static HTAB *
hash_init(const char *hash_name, int key_size, int entry_size, int hash_size)
/*
 * Size of the region handed to dsa_create_in_place(): MAX_QUERY_BUF, plus
 * MAX_BUCKETS_MEM when USE_DYNAMIC_HASH is set (the bucket hash then lives
 * in the same DSA area). The result is MAXALIGN'ed.
 */
static Size
pgsm_query_area_size(void)
{
/*
 * NOTE(review): `info` is not used anywhere in this function; it looks like
 * a leftover line of the removed hash_init() -- confirm.
 */
HASHCTL info;
Size sz = MAX_QUERY_BUF;
#if USE_DYNAMIC_HASH
/* Dynamic hash also lives in the DSA area */
sz = add_size(sz, MAX_BUCKETS_MEM);
#endif
return MAXALIGN(sz);
}
memset(&info, 0, sizeof(info));
info.keysize = key_size;
info.entrysize = entry_size;
return ShmemInitHash(hash_name, hash_size, hash_size, &info, HASH_ELEM | HASH_BLOBS);
/*
 * Estimate the shared memory needed by pg_stat_monitor.
 *
 * The total is the aligned pgssSharedState header plus MAX_QUERY_BUF, plus
 * either MAX_BUCKETS_MEM (USE_DYNAMIC_HASH) or the estimated size of a
 * classic shared hash holding MAX_BUCKET_ENTRIES pgssEntry items.
 * The sum is MAXALIGN'ed before being returned.
 */
Size
pgsm_ShmemSize(void)
{
	Size		total;

	total = add_size(MAXALIGN(sizeof(pgssSharedState)), MAX_QUERY_BUF);
#if USE_DYNAMIC_HASH
	total = add_size(total, MAX_BUCKETS_MEM);
#else
	total = add_size(total,
					 hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry)));
#endif

	return MAXALIGN(total);
}
/*
 * Size requested from ShmemInitStruct() for the "pg_stat_monitor" segment.
 *
 * With USE_DYNAMIC_HASH this is everything pgsm_ShmemSize() reports.
 * Otherwise it is only the aligned pgssSharedState header followed by the
 * DSA region (pgsm_query_area_size()).
 */
static Size
pgsm_get_shared_area_size(void)
{
#if USE_DYNAMIC_HASH
	return pgsm_ShmemSize();
#else
	return add_size(MAXALIGN(sizeof(pgssSharedState)), pgsm_query_area_size());
#endif
}
void
pgss_startup(void)
{
bool found = false;
pgssSharedState *pgss;
/* reset in case this is a restart within the postmaster */
pgss = NULL;
pgss_hash = NULL;
pgsmStateLocal.dsa = NULL;
pgsmStateLocal.shared_hash = NULL;
pgsmStateLocal.shared_pgssState = NULL;
/*
* Create or attach to the shared memory state, including hash table
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pgss = ShmemInitStruct("pg_stat_monitor", sizeof(pgssSharedState), &found);
pgss = ShmemInitStruct("pg_stat_monitor", pgsm_get_shared_area_size(), &found);
if (!found)
{
/* First time through ... */
dsa_area *dsa;
char *p = (char *) pgss;
pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock;
SpinLockInit(&pgss->mutex);
ResetSharedState(pgss);
/* the allocation of pgssSharedState itself */
p += MAXALIGN(sizeof(pgssSharedState));
pgss->raw_dsa_area = p;
dsa = dsa_create_in_place(pgss->raw_dsa_area,
pgsm_query_area_size(),
LWLockNewTrancheId(), 0);
dsa_pin(dsa);
dsa_set_size_limit(dsa, pgsm_query_area_size());
pgss->hash_handle = pgsm_create_bucket_hash(pgss,dsa);
if (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_DISK)
dsa_set_size_limit(dsa, -1);
pgsmStateLocal.shared_pgssState = pgss;
/*
* Postmaster will never access these again, thus free the local
* dsa/dshash references.
*/
dsa_detach(dsa);
}
#ifdef BENCHMARK
init_hook_stats();
#endif
set_qbuf((unsigned char *) ShmemAlloc(MAX_QUERY_BUF));
pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES);
pgss_query_hash = hash_init("pg_stat_monitor: queryID hashtable", sizeof(uint64), sizeof(pgssQueryEntry), MAX_BUCKET_ENTRIES);
LWLockRelease(AddinShmemInitLock);
/*
@@ -78,23 +129,73 @@ pgss_startup(void)
on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
}
/*
 * Create the bucket hash table and return its handle.
 *
 * USE_DYNAMIC_HASH: allocate a new LWLock tranche id, record it in the
 * shared state, create a dshash table inside `dsa`, take its handle and
 * detach from the table again before returning.
 *
 * Otherwise: create (or look up) the classic shared-memory hash named
 * "pg_stat_monitor: bucket hashtable" sized for MAX_BUCKET_ENTRIES; `pgss`
 * and `dsa` are not used in that configuration.
 */
static PGSM_HASH_TABLE_HANDLE
pgsm_create_bucket_hash(pgssSharedState *pgss, dsa_area *dsa)
{
#if USE_DYNAMIC_HASH
	PGSM_HASH_TABLE_HANDLE handle;
	dshash_table *table;

	pgss->hash_tranche_id = LWLockNewTrancheId();
	dsh_params.tranche_id = pgss->hash_tranche_id;

	table = dshash_create(dsa, &dsh_params, 0);
	handle = dshash_get_hash_table_handle(table);
	dshash_detach(table);

	return handle;
#else
	HASHCTL		ctl;

	memset(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(pgssHashKey);
	ctl.entrysize = sizeof(pgssEntry);

	return ShmemInitHash("pg_stat_monitor: bucket hashtable",
						 MAX_BUCKET_ENTRIES, MAX_BUCKET_ENTRIES,
						 &ctl, HASH_ELEM | HASH_BLOBS);
#endif
}
void
pgsm_attach_shmem(void)
{
MemoryContext oldcontext;
if (pgsmStateLocal.dsa)
return;
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
pgsmStateLocal.dsa = dsa_attach_in_place(pgsmStateLocal.shared_pgssState->raw_dsa_area,
NULL);
dsa_pin_mapping(pgsmStateLocal.dsa);
#if USE_DYNAMIC_HASH
dsh_params.tranche_id = pgsmStateLocal.shared_pgssState->hash_tranche_id;
pgsmStateLocal.shared_hash = dshash_attach(pgsmStateLocal.dsa, &dsh_params,
pgsmStateLocal.shared_pgssState->hash_handle, 0);
#else
pgsmStateLocal.shared_hash = pgsmStateLocal.shared_pgssState->hash_handle;
#endif
MemoryContextSwitchTo(oldcontext);
}
/*
 * Return this process's DSA area handle, attaching to shared memory first
 * if that has not happened yet.
 */
dsa_area *
get_dsa_area_for_query_text(void)
{
	pgsm_attach_shmem();

	return pgsmStateLocal.dsa;
}
/*
 * Return this process's bucket hash table, attaching to shared memory first
 * if that has not happened yet.
 */
PGSM_HASH_TABLE *
get_pgssHash(void)
{
	pgsm_attach_shmem();

	return pgsmStateLocal.shared_hash;
}
/*
 * Return the shared pgssSharedState.
 *
 * NOTE(review): two return paths are visible here. `return pgss;` hands back
 * the file-level pointer directly, which makes the statements after it
 * (pgsm_attach_shmem() followed by returning
 * pgsmStateLocal.shared_pgssState) unreachable as written. This looks like
 * the old and new bodies shown together -- confirm which one is intended.
 */
pgssSharedState *
pgsm_get_ss(void)
{
return pgss;
pgsm_attach_shmem();
return pgsmStateLocal.shared_pgssState;
}
/*
 * Accessor for the file-level bucket hash table pointer (pgss_hash).
 */
HTAB *
pgsm_get_hash(void)
{
	return pgss_hash;
}
/*
 * Accessor for the file-level queryID hash table pointer (pgss_query_hash).
 */
HTAB *
pgsm_get_query_hash(void)
{
	return pgss_query_hash;
}
/*
* shmem_shutdown hook: Dump statistics into file.
@@ -106,41 +207,23 @@ void
pgss_shmem_shutdown(int code, Datum arg)
{
/* Don't try to dump during a crash. */
elog(LOG,"pgss_shmem_shutdown");
if (code)
return;
pgss = NULL;
pgsmStateLocal.shared_pgssState = NULL;
/* Safety check ... shouldn't get here unless shmem is set up. */
if (!IsHashInitialize())
return;
}
/*
 * Shared memory estimate: the aligned pgssSharedState header, the aligned
 * query buffer (MAX_QUERY_BUF), and two classic hash tables of
 * MAX_BUCKET_ENTRIES entries each (pgssEntry and pgssQueryEntry).
 */
Size
hash_memsize(void)
{
	Size		total = MAXALIGN(sizeof(pgssSharedState)) + MAXALIGN(MAX_QUERY_BUF);

	total = add_size(total,
					 hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry)));

	return add_size(total,
					hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssQueryEntry)));
}
pgssEntry *
hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
{
pgssEntry *entry = NULL;
bool found = false;
if (hash_get_num_entries(pgss_hash) >= MAX_BUCKET_ENTRIES)
{
elog(DEBUG1, "pg_stat_monitor: out of memory");
return NULL;
}
/* Find or create an entry with desired hash code */
entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found);
entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, key, &found);
if (entry == NULL)
elog(DEBUG1, "hash_entry_alloc: OUT OF MEMORY");
else if (!found)
@@ -149,12 +232,19 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
/* New entry, initialize it */
/* reset the statistics */
memset(&entry->counters, 0, sizeof(Counters));
entry->query_pos = InvalidDsaPointer;
entry->counters.info.parent_query = InvalidDsaPointer;
/* set the appropriate initial usage count */
/* re-initialize the mutex each time ... we assume no one using it */
SpinLockInit(&entry->mutex);
/* ... and don't forget the query text metadata */
entry->encoding = encoding;
}
#if USE_DYNAMIC_HASH
if(entry)
dshash_release_lock(pgsmStateLocal.shared_hash, entry);
#endif
return entry;
}
@@ -174,17 +264,22 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
void
hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer)
{
HASH_SEQ_STATUS hash_seq;
PGSM_HASH_SEQ_STATUS hstat;
pgssEntry *entry = NULL;
/* Store pending query ids from the previous bucket. */
List *pending_entries = NIL;
ListCell *pending_entry;
if (!pgsmStateLocal.shared_hash)
return;
/* Iterate over the hash table. */
hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true);
while ((entry = pgsm_hash_seq_next(&hstat)) != NULL)
{
dsa_pointer pdsa;
/*
* Remove all entries if new_bucket_id == -1. Otherwise remove entry
* in new_bucket_id if it has finished already.
@@ -193,16 +288,17 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
(entry->key.bucket_id == new_bucket_id &&
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
{
if (new_bucket_id == -1)
{
/*
* pg_stat_monitor_reset(), remove entry from query hash table
* too.
*/
hash_search(pgss_query_hash, &(entry->key.queryid), HASH_REMOVE, NULL);
}
dsa_pointer parent_qdsa = entry->counters.info.parent_query;
pdsa = entry->query_pos;
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
if (DsaPointerIsValid(pdsa))
dsa_free(pgsmStateLocal.dsa, pdsa);
if (DsaPointerIsValid(parent_qdsa))
dsa_free(pgsmStateLocal.dsa, parent_qdsa);
continue;
}
/*
@@ -238,7 +334,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
if (entry->counters.calls.calls > 1)
entry->counters.state = PGSS_FINISHED;
else
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
{
pdsa = entry->query_pos;
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
if (DsaPointerIsValid(pdsa))
dsa_free(pgsmStateLocal.dsa, pdsa);
}
continue;
}
@@ -266,11 +367,17 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
if (entry->counters.calls.calls > 1)
entry->counters.state = PGSS_FINISHED;
else
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
{
pdsa = entry->query_pos;
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
/* We should not delete the Query in DSA here
* as the same will get reused when the entry gets inserted into new bucket
*/
}
}
}
}
pgsm_hash_seq_term(&hstat);
/*
* Iterate over the list of pending queries in order to add them back to
* the hash table with the updated bucket id.
@@ -281,7 +388,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
pgssEntry *new_entry;
pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry);
new_entry = (pgssEntry *) hash_search(pgss_hash, &old_entry->key, HASH_ENTER_NULL, &found);
PGSM_DISABLE_ERROR_CAPUTRE();
{
new_entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, &old_entry->key, &found);
}PGSM_END_DISABLE_ERROR_CAPTURE();
if (new_entry == NULL)
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
else if (!found)
@@ -292,10 +404,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
new_entry->encoding = old_entry->encoding;
new_entry->query_pos = old_entry->query_pos;
}
#if USE_DYNAMIC_HASH
if(new_entry)
dshash_release_lock(pgsmStateLocal.shared_hash, new_entry);
#endif
free(old_entry);
}
list_free(pending_entries);
}
@@ -306,16 +420,23 @@ void
hash_entry_reset()
{
/*
 * Remove every entry from the bucket hash table, free each entry's query
 * text (query_pos) from the DSA area, and reset current_wbucket to 0.
 * The whole operation runs with pgss->lock held exclusively.
 *
 * NOTE(review): two scan loops are visible below -- one over pgss_hash via
 * hash_seq_init/hash_seq_search and one over pgsmStateLocal.shared_hash via
 * the pgsm_hash_seq_* wrappers. This looks like old and new lines shown
 * together -- confirm which set is intended.
 */
pgssSharedState *pgss = pgsm_get_ss();
HASH_SEQ_STATUS hash_seq;
PGSM_HASH_SEQ_STATUS hstat;
pgssEntry *entry;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true);
while ((entry = pgsm_hash_seq_next(&hstat)) != NULL)
{
hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
/* Read the query-text pointer before the entry is deleted. */
dsa_pointer pdsa = entry->query_pos;
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
/*
 * NOTE(review): only query_pos is freed here. hash_entry_dealloc also
 * frees counters.info.parent_query when it removes an entry -- confirm
 * whether skipping it here leaks DSA memory.
 */
if (DsaPointerIsValid(pdsa))
dsa_free(pgsmStateLocal.dsa, pdsa);
}
pgsm_hash_seq_term(&hstat);
pg_atomic_write_u64(&pgss->current_wbucket, 0);
LWLockRelease(pgss->lock);
}
@@ -323,6 +444,67 @@ hash_entry_reset()
/*
 * Report whether the shared state has been set up in this process.
 *
 * NOTE(review): two return statements are visible. The first (pgss and
 * pgss_hash both non-NULL) makes the second (shared_pgssState non-NULL)
 * unreachable as written. This looks like old and new lines shown
 * together -- confirm which check is intended.
 */
bool
IsHashInitialize(void)
{
return (pgss != NULL &&
pgss_hash != NULL);
return (pgsmStateLocal.shared_pgssState != NULL);
}
/* hash function port based on USE_DYNAMIC_HASH */
/*
 * Find the entry for `key`, creating it when absent; *found tells the
 * caller whether the entry already existed.
 *
 * USE_DYNAMIC_HASH: forwards to dshash_find_or_insert(). Callers in this
 * file release the returned entry with dshash_release_lock().
 * Otherwise: forwards to hash_search() with HASH_ENTER_NULL, so the result
 * may be NULL; callers in this file treat that as out of memory.
 */
void *
pgsm_hash_find_or_insert(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool *found)
{
#if USE_DYNAMIC_HASH
	return dshash_find_or_insert(shared_hash, key, found);
#else
	return hash_search(shared_hash, key, HASH_ENTER_NULL, found);
#endif
}
/*
 * Look up `key` in the bucket hash table without inserting.
 *
 * Returns the matching entry, or NULL when there is none. When `found` is
 * not NULL, *found is set to whether an entry was located -- in both build
 * configurations (previously the USE_DYNAMIC_HASH branch left it untouched,
 * so callers could read an uninitialized flag).
 *
 * USE_DYNAMIC_HASH: the lookup uses dshash_find() with exclusive = false;
 * per the dshash API a non-NULL result is returned locked and the caller
 * must release it with dshash_release_lock().
 * Otherwise: forwards to hash_search() with HASH_FIND.
 */
void *
pgsm_hash_find(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool *found)
{
#if USE_DYNAMIC_HASH
	void	   *entry = dshash_find(shared_hash, key, false);

	/* dshash_find() has no "found" out-parameter; derive it from the result. */
	if (found != NULL)
		*found = (entry != NULL);

	return entry;
#else
	return hash_search(shared_hash, key, HASH_FIND, found);
#endif
}
/*
 * Begin a sequential scan over the bucket hash table.
 *
 * `lock` is forwarded to dshash_seq_init() when USE_DYNAMIC_HASH is set;
 * the classic hash_seq_init() path does not use it.
 */
void
pgsm_hash_seq_init(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_hash, bool lock)
{
#if USE_DYNAMIC_HASH
	dshash_seq_init(hstat, shared_hash, lock);
#else
	hash_seq_init(hstat, shared_hash);
#endif
}
/*
 * Advance a scan started with pgsm_hash_seq_init() and return the next
 * entry; callers in this file loop until the result is NULL.
 */
void *
pgsm_hash_seq_next(PGSM_HASH_SEQ_STATUS *hstat)
{
#if USE_DYNAMIC_HASH
	return dshash_seq_next(hstat);
#else
	return hash_seq_search(hstat);
#endif
}
/*
 * Finish a scan started with pgsm_hash_seq_init().
 *
 * Only the dshash implementation is given an explicit termination call;
 * without USE_DYNAMIC_HASH this function does nothing.
 */
void
pgsm_hash_seq_term(PGSM_HASH_SEQ_STATUS *hstat)
{
#if USE_DYNAMIC_HASH
	dshash_seq_term(hstat);
#endif
}
/*
 * Delete the entry the sequential scan is currently positioned on.
 *
 * USE_DYNAMIC_HASH: dshash_delete_current() works from the scan state alone;
 * `shared_hash` and `key` are not used.
 * Otherwise: the entry is removed by key through hash_search(HASH_REMOVE);
 * `hstat` is not used.
 */
void
pgsm_hash_delete_current(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_hash, void *key)
{
#if USE_DYNAMIC_HASH
	dshash_delete_current(hstat);
#else
	hash_search(shared_hash, key, HASH_REMOVE, NULL);
#endif
}