PG-588: Some queries are not being normalised.

This bug uncovered serious issues with how the data was being stored by PSGM.
So it require a complete redesign.

pg_stat_monitor now stores the data locally within the backend process's local
memory. The data is only stored when the query completes. This reduces the
number of lock acquisitions that were previously needed during various stages
of the execution. Also, this avoids data loss in case the current bucket
changes during execution. Also, the unavailability of jumble state during later
stages of executions was causing pg_stat_monitor to save non-normalized query.
This was a major problem as well.

pg_stat_monitor specific memory context is implemented. It is used for saving
data locally. The context memory callback helps us clear the locally saved data
so that we do not store it multiple times in the shared hash.

As part of this major rewrite, pgss reference in function and variable names
is changed to pgsm. Memory footprint for the entries is reduced, data types
are corrected where needed, and we've removed unused variables, functions and
macros.

This patch was mutually created by:
Co-authored-by: Hamid Akhtar <hamid.akhtar@percona.com>
Co-authored-by: Muhammad Usama <muhammad.usama@percona.com>
This commit is contained in:
Hamid Akhtar
2023-02-22 19:14:42 +05:00
parent 837bacdf3a
commit de66ef0fce
27 changed files with 1287 additions and 1049 deletions

View File

@@ -19,14 +19,15 @@
#include "pg_stat_monitor.h"
static pgsmLocalState pgsmStateLocal;
static PGSM_HASH_TABLE_HANDLE pgsm_create_bucket_hash(pgssSharedState *pgss, dsa_area *dsa);
static PGSM_HASH_TABLE_HANDLE pgsm_create_bucket_hash(pgsmSharedState *pgsm, dsa_area *dsa);
static Size pgsm_get_shared_area_size(void);
static void InitializeSharedState(pgsmSharedState *pgsm);
#if USE_DYNAMIC_HASH
/* parameter for the shared hash */
static dshash_parameters dsh_params = {
sizeof(pgssHashKey),
sizeof(pgssEntry),
sizeof(pgsmHashKey),
sizeof(pgsmEntry),
dshash_memcmp,
dshash_memhash
};
@@ -54,12 +55,12 @@ pgsm_query_area_size(void)
Size
pgsm_ShmemSize(void)
{
Size sz = MAXALIGN(sizeof(pgssSharedState));
Size sz = MAXALIGN(sizeof(pgsmSharedState));
sz = add_size(sz, MAX_QUERY_BUF);
#if USE_DYNAMIC_HASH
sz = add_size(sz, MAX_BUCKETS_MEM);
#else
sz = add_size(sz, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry)));
sz = add_size(sz, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgsmEntry)));
#endif
return MAXALIGN(sz);
}
@@ -77,47 +78,48 @@ pgsm_get_shared_area_size(void)
#if USE_DYNAMIC_HASH
sz = pgsm_ShmemSize();
#else
sz = MAXALIGN(sizeof(pgssSharedState));
sz = MAXALIGN(sizeof(pgsmSharedState));
sz = add_size(sz, pgsm_query_area_size());
#endif
return sz;
}
void
pgss_startup(void)
pgsm_startup(void)
{
bool found = false;
pgssSharedState *pgss;
pgsmSharedState *pgsm;
/* reset in case this is a restart within the postmaster */
pgsmStateLocal.dsa = NULL;
pgsmStateLocal.shared_hash = NULL;
pgsmStateLocal.shared_pgssState = NULL;
pgsmStateLocal.shared_pgsmState = NULL;
/*
* Create or attach to the shared memory state, including hash table
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pgss = ShmemInitStruct("pg_stat_monitor", pgsm_get_shared_area_size(), &found);
pgsm = ShmemInitStruct("pg_stat_monitor", pgsm_get_shared_area_size(), &found);
if (!found)
{
/* First time through ... */
dsa_area *dsa;
char *p = (char *) pgss;
char *p = (char *) pgsm;
pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock;
SpinLockInit(&pgss->mutex);
ResetSharedState(pgss);
/* the allocation of pgssSharedState itself */
p += MAXALIGN(sizeof(pgssSharedState));
pgss->raw_dsa_area = p;
dsa = dsa_create_in_place(pgss->raw_dsa_area,
pgsm->pgsm_oom = false;
pgsm->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock;
SpinLockInit(&pgsm->mutex);
InitializeSharedState(pgsm);
/* the allocation of pgsmSharedState itself */
p += MAXALIGN(sizeof(pgsmSharedState));
pgsm->raw_dsa_area = p;
dsa = dsa_create_in_place(pgsm->raw_dsa_area,
pgsm_query_area_size(),
LWLockNewTrancheId(), 0);
dsa_pin(dsa);
dsa_set_size_limit(dsa, pgsm_query_area_size());
pgss->hash_handle = pgsm_create_bucket_hash(pgss,dsa);
pgsm->hash_handle = pgsm_create_bucket_hash(pgsm,dsa);
/* If overflow is enabled, set the DSA size to unlimited,
* and allow the DSA to grow beyond the shared memory space
@@ -125,7 +127,7 @@ pgss_startup(void)
if (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_DISK)
dsa_set_size_limit(dsa, -1);
pgsmStateLocal.shared_pgssState = pgss;
pgsmStateLocal.shared_pgsmState = pgsm;
/*
* Postmaster will never access the dsa again, thus free it's local
* references.
@@ -143,29 +145,41 @@ pgss_startup(void)
* If we're in the postmaster (or a standalone backend...), set up a shmem
* exit hook to dump the statistics to disk.
*/
on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
on_shmem_exit(pgsm_shmem_shutdown, (Datum) 0);
}
static void
InitializeSharedState(pgsmSharedState *pgsm)
{
pg_atomic_init_u64(&pgsm->current_wbucket, 0);
pg_atomic_init_u64(&pgsm->prev_bucket_sec, 0);
memset(&pgsm->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64));
pgsm->pgsm_mem_cxt = AllocSetContextCreate(TopMemoryContext,
"pg_stat_monitor local store",
ALLOCSET_DEFAULT_SIZES);
}
/*
* Create the classic or dshahs hash table for storing the query statistics.
*/
static PGSM_HASH_TABLE_HANDLE
pgsm_create_bucket_hash(pgssSharedState *pgss, dsa_area *dsa)
pgsm_create_bucket_hash(pgsmSharedState *pgsm, dsa_area *dsa)
{
PGSM_HASH_TABLE_HANDLE bucket_hash;
#if USE_DYNAMIC_HASH
dshash_table *dsh;
pgss->hash_tranche_id = LWLockNewTrancheId();
dsh_params.tranche_id = pgss->hash_tranche_id;
pgsm->hash_tranche_id = LWLockNewTrancheId();
dsh_params.tranche_id = pgsm->hash_tranche_id;
dsh = dshash_create(dsa, &dsh_params, 0);
bucket_hash = dshash_get_hash_table_handle(dsh);
dshash_detach(dsh);
#else
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(pgssHashKey);
info.entrysize = sizeof(pgssEntry);
info.keysize = sizeof(pgsmHashKey);
info.entrysize = sizeof(pgsmEntry);
bucket_hash = ShmemInitHash("pg_stat_monitor: bucket hashtable", MAX_BUCKET_ENTRIES, MAX_BUCKET_ENTRIES, &info, HASH_ELEM | HASH_BLOBS);
#endif
return bucket_hash;
@@ -193,7 +207,7 @@ pgsm_attach_shmem(void)
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
pgsmStateLocal.dsa = dsa_attach_in_place(pgsmStateLocal.shared_pgssState->raw_dsa_area,
pgsmStateLocal.dsa = dsa_attach_in_place(pgsmStateLocal.shared_pgsmState->raw_dsa_area,
NULL);
/* pin the attached area to keep the area attached until end of
* session or explicit detach.
@@ -201,11 +215,11 @@ pgsm_attach_shmem(void)
dsa_pin_mapping(pgsmStateLocal.dsa);
#if USE_DYNAMIC_HASH
dsh_params.tranche_id = pgsmStateLocal.shared_pgssState->hash_tranche_id;
dsh_params.tranche_id = pgsmStateLocal.shared_pgsmState->hash_tranche_id;
pgsmStateLocal.shared_hash = dshash_attach(pgsmStateLocal.dsa, &dsh_params,
pgsmStateLocal.shared_pgssState->hash_handle, 0);
pgsmStateLocal.shared_pgsmState->hash_handle, 0);
#else
pgsmStateLocal.shared_hash = pgsmStateLocal.shared_pgssState->hash_handle;
pgsmStateLocal.shared_hash = pgsmStateLocal.shared_pgsmState->hash_handle;
#endif
MemoryContextSwitchTo(oldcontext);
@@ -219,17 +233,17 @@ get_dsa_area_for_query_text(void)
}
PGSM_HASH_TABLE*
get_pgssHash(void)
get_pgsmHash(void)
{
pgsm_attach_shmem();
return pgsmStateLocal.shared_hash;
}
pgssSharedState *
pgsmSharedState *
pgsm_get_ss(void)
{
pgsm_attach_shmem();
return pgsmStateLocal.shared_pgssState;
return pgsmStateLocal.shared_pgsmState;
}
@@ -240,35 +254,36 @@ pgsm_get_ss(void)
* other processes running when this is called.
*/
void
pgss_shmem_shutdown(int code, Datum arg)
pgsm_shmem_shutdown(int code, Datum arg)
{
/* Don't try to dump during a crash. */
elog(LOG,"pgss_shmem_shutdown");
elog(LOG,"[pg_stat_monitor] pgsm_shmem_shutdown: Shutdown initiated.");
if (code)
return;
pgsmStateLocal.shared_pgssState = NULL;
pgsmStateLocal.shared_pgsmState = NULL;
/* Safety check ... shouldn't get here unless shmem is set up. */
if (!IsHashInitialize())
return;
}
pgssEntry *
hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
pgsmEntry *
hash_entry_alloc(pgsmSharedState *pgsm, pgsmHashKey *key, int encoding)
{
pgssEntry *entry = NULL;
pgsmEntry *entry = NULL;
bool found = false;
/* Find or create an entry with desired hash code */
entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, key, &found);
entry = (pgsmEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, key, &found);
if (entry == NULL)
elog(DEBUG1, "hash_entry_alloc: OUT OF MEMORY");
elog(DEBUG1, "[pg_stat_monitor] hash_entry_alloc: OUT OF MEMORY.");
else if (!found)
{
pgss->bucket_entry[pg_atomic_read_u64(&pgss->current_wbucket)]++;
pgsm->bucket_entry[pg_atomic_read_u64(&pgsm->current_wbucket)]++;
/* New entry, initialize it */
/* reset the statistics */
memset(&entry->counters, 0, sizeof(Counters));
entry->query_pos = InvalidDsaPointer;
entry->query_text.query_pos = InvalidDsaPointer;
entry->counters.info.parent_query = InvalidDsaPointer;
/* set the appropriate initial usage count */
@@ -288,23 +303,21 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
/*
* Prepare resources for using the new bucket:
* - Deallocate finished hash table entries in new_bucket_id (entries whose
* state is PGSS_FINISHED or PGSS_FINISHED).
* state is PGSM_EXEC or PGSM_ERROR).
* - Clear query buffer for new_bucket_id.
* - If old_bucket_id != -1, move all pending hash table entries in
* old_bucket_id to the new bucket id, also move pending queries from the
* previous query buffer (query_buffer[old_bucket_id]) to the new one
* (query_buffer[new_bucket_id]).
*
* Caller must hold an exclusive lock on pgss->lock.
* Caller must hold an exclusive lock on pgsm->lock.
*/
void
hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer)
{
PGSM_HASH_SEQ_STATUS hstat;
pgssEntry *entry = NULL;
pgsmEntry *entry = NULL;
/* Store pending query ids from the previous bucket. */
List *pending_entries = NIL;
ListCell *pending_entry;
if (!pgsmStateLocal.shared_hash)
return;
@@ -321,11 +334,10 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
* in new_bucket_id if it has finished already.
*/
if (new_bucket_id < 0 ||
(entry->key.bucket_id == new_bucket_id &&
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
(entry->key.bucket_id == new_bucket_id ))
{
dsa_pointer parent_qdsa = entry->counters.info.parent_query;
pdsa = entry->query_pos;
pdsa = entry->query_text.query_pos;
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
@@ -334,153 +346,23 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
if (DsaPointerIsValid(parent_qdsa))
dsa_free(pgsmStateLocal.dsa, parent_qdsa);
continue;
}
/*
* If we detect a pending query residing in the previous bucket id, we
* add it to a list of pending elements to be moved to the new bucket
* id. Can't update the hash table while iterating it inside this
* loop, as this may introduce all sort of problems.
*/
if (old_bucket_id != -1 && entry->key.bucket_id == old_bucket_id)
{
if (entry->counters.state == PGSS_PARSE ||
entry->counters.state == PGSS_PLAN ||
entry->counters.state == PGSS_EXEC)
{
pgssEntry *bkp_entry = malloc(sizeof(pgssEntry));
if (!bkp_entry)
{
elog(DEBUG1, "hash_entry_dealloc: out of memory");
/*
* No memory, If the entry has calls > 1 then we change
* the state to finished, as the pending query will likely
* finish execution during the new bucket time window. The
* pending query will vanish in this case, can't list it
* until it completes.
*
* If there is only one call to the query and it's
* pending, remove the entry from the previous bucket and
* allow it to finish in the new bucket, in order to avoid
* the query living in the old bucket forever.
*/
if (entry->counters.calls.calls > 1)
entry->counters.state = PGSS_FINISHED;
else
{
pdsa = entry->query_pos;
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
if (DsaPointerIsValid(pdsa))
dsa_free(pgsmStateLocal.dsa, pdsa);
}
continue;
}
/* Save key/data from the previous entry. */
memcpy(bkp_entry, entry, sizeof(pgssEntry));
/* Update key to use the new bucket id. */
bkp_entry->key.bucket_id = new_bucket_id;
/* Add the entry to a list of nodes to be processed later. */
pending_entries = lappend(pending_entries, bkp_entry);
/*
* If the entry has calls > 1 then we change the state to
* finished in the previous bucket, as the pending query will
* likely finish execution during the new bucket time window.
* Can't remove it from the previous bucket as it may have
* many calls and we would lose the query statistics.
*
* If there is only one call to the query and it's pending,
* remove the entry from the previous bucket and allow it to
* finish in the new bucket, in order to avoid the query
* living in the old bucket forever.
*/
if (entry->counters.calls.calls > 1)
entry->counters.state = PGSS_FINISHED;
else
{
pdsa = entry->query_pos;
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
/* We should not delete the Query in DSA here
* as the same will get reused when the entry gets inserted into new bucket
*/
}
}
pgsmStateLocal.shared_pgsmState->pgsm_oom = false;
}
}
pgsm_hash_seq_term(&hstat);
/*
* Iterate over the list of pending queries in order to add them back to
* the hash table with the updated bucket id.
*/
foreach(pending_entry, pending_entries)
{
bool found = false;
pgssEntry *new_entry;
pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry);
PGSM_DISABLE_ERROR_CAPUTRE();
{
new_entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, &old_entry->key, &found);
}PGSM_END_DISABLE_ERROR_CAPTURE();
if (new_entry == NULL)
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
else if (!found)
{
/* Restore counters and other data. */
new_entry->counters = old_entry->counters;
SpinLockInit(&new_entry->mutex);
new_entry->encoding = old_entry->encoding;
new_entry->query_pos = old_entry->query_pos;
}
#if USE_DYNAMIC_HASH
if(new_entry)
dshash_release_lock(pgsmStateLocal.shared_hash, new_entry);
#endif
free(old_entry);
}
list_free(pending_entries);
}
/*
* Release all entries.
*/
void
hash_entry_reset()
{
pgssSharedState *pgss = pgsm_get_ss();
PGSM_HASH_SEQ_STATUS hstat;
pgssEntry *entry;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true);
while ((entry = pgsm_hash_seq_next(&hstat)) != NULL)
{
dsa_pointer pdsa = entry->query_pos;
pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key);
if (DsaPointerIsValid(pdsa))
dsa_free(pgsmStateLocal.dsa, pdsa);
}
pgsm_hash_seq_term(&hstat);
pg_atomic_write_u64(&pgss->current_wbucket, 0);
LWLockRelease(pgss->lock);
}
bool
IsHashInitialize(void)
{
return (pgsmStateLocal.shared_pgssState != NULL);
return (pgsmStateLocal.shared_pgsmState != NULL);
}
bool
IsSystemOOM(void)
{
return (IsHashInitialize() && pgsmStateLocal.shared_pgsmState->pgsm_oom);
}
/*
@@ -489,7 +371,7 @@ IsHashInitialize(void)
*/
void *
pgsm_hash_find_or_insert(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool* found)
pgsm_hash_find_or_insert(PGSM_HASH_TABLE *shared_hash, pgsmHashKey *key, bool* found)
{
#if USE_DYNAMIC_HASH
void *entry;
@@ -501,7 +383,7 @@ pgsm_hash_find_or_insert(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool* f
}
void *
pgsm_hash_find(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool* found)
pgsm_hash_find(PGSM_HASH_TABLE *shared_hash, pgsmHashKey *key, bool* found)
{
#if USE_DYNAMIC_HASH
return dshash_find(shared_hash, key, false);
@@ -546,4 +428,4 @@ pgsm_hash_delete_current(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_ha
#else
hash_search(shared_hash, key, HASH_REMOVE, NULL);
#endif
}
}