Use separate memory context to store intermediate stats

PG-588
Muhammad Usama 2023-02-22 15:50:16 +05:00
parent 1d4b0036e7
commit fe558bdcae
3 changed files with 64 additions and 61 deletions

View File

@ -21,6 +21,7 @@
static pgsmLocalState pgsmStateLocal;
static PGSM_HASH_TABLE_HANDLE pgsm_create_bucket_hash(pgsmSharedState *pgsm, dsa_area *dsa);
static Size pgsm_get_shared_area_size(void);
static void InitializeSharedState(pgsmSharedState *pgsm);
#if USE_DYNAMIC_HASH
/* parameter for the shared hash */
@ -108,7 +109,7 @@ pgsm_startup(void)
pgsm->pgsm_oom = false;
pgsm->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock;
SpinLockInit(&pgsm->mutex);
ResetSharedState(pgsm);
InitializeSharedState(pgsm);
/* the allocation of pgsmSharedState itself */
p += MAXALIGN(sizeof(pgsmSharedState));
pgsm->raw_dsa_area = p;
@ -147,6 +148,18 @@ pgsm_startup(void)
on_shmem_exit(pgsm_shmem_shutdown, (Datum) 0);
}
/*
 * InitializeSharedState
 *
 * Put a freshly allocated pgsmSharedState into its starting condition:
 *   - both bucket trackers (current_wbucket, prev_bucket_sec) start at zero,
 *   - the bucket_entry array is zero-filled,
 *   - a dedicated allocation-set memory context, parented to
 *     TopMemoryContext, is created and remembered in pgsm_mem_cxt.
 *
 * NOTE(review): pgsm_mem_cxt is a member of pgsmSharedState.  If that struct
 * is placed in shared memory, the stored pointer refers to memory owned by
 * the process that ran this function -- confirm that every process reading
 * it sees a valid context.
 */
static void
InitializeSharedState(pgsmSharedState *pgsm)
{
	MemoryContext stats_cxt;

	/* Bucket bookkeeping starts from zero. */
	pg_atomic_init_u64(&pgsm->current_wbucket, 0);
	pg_atomic_init_u64(&pgsm->prev_bucket_sec, 0);

	/* No bucket has any entry recorded yet. */
	memset(&pgsm->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64));

	/* Separate context so intermediate stats can be released in one reset. */
	stats_cxt = AllocSetContextCreate(TopMemoryContext,
									  "pg_stat_monitor local store",
									  ALLOCSET_DEFAULT_SIZES);
	pgsm->pgsm_mem_cxt = stats_cxt;
}
/*
* Create the classic or dshash hash table for storing the query statistics.
*/

View File

@ -19,8 +19,6 @@
#include "access/parallel.h"
#include "nodes/pg_list.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include <regex.h>
#include "pgstat.h"
#include "commands/dbcommands.h"
@ -68,10 +66,6 @@ do \
/*---- Initialization Function Declarations ----*/
void _PG_init(void);
/*---- Local variables ----*/
MemoryContextCallback callback;
volatile bool callback_setup = false;
/* Current nesting depth of ExecutorRun+ProcessUtility calls */
static int exec_nested_level = 0;
volatile bool __pgsm_do_not_capture_error = false;
@ -190,12 +184,20 @@ char *unpack_sql_state(int sql_state);
!IsA(n, DeallocateStmt))
static pgsmEntry *pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid, PlanInfo *plan_info);
static void pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len, bool should_dup);
static pgsmEntry *pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info);
static void pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len);
static pgsmEntry* pgsm_get_entry_for_query(uint64 queryid, PlanInfo *plan_info, const char* query_text, int query_len, bool create);
static void pgsm_cleanup_callback(void *arg);
static void pgsm_store_error(const char *query, ErrorData *edata);
/*---- Local variables ----*/
/*
 * Reset callback descriptor handed to MemoryContextRegisterResetCallback()
 * for MessageContext.  It invokes pgsm_cleanup_callback with a NULL argument.
 */
MemoryContextCallback mem_cxt_reset_callback =
{
.func = pgsm_cleanup_callback,
.arg = NULL
};
/*
 * True once the reset callback has been registered; pgsm_cleanup_callback
 * sets it back to false so the callback can be registered again.
 */
volatile bool callback_setup = false;
static void pgsm_update_entry(pgsmEntry *entry,
const char *query,
PlanInfo * plan_info,
@ -430,18 +432,14 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState *
if (callback_setup == false)
{
/* If MessageContext is valid setup a callback to cleanup
* our local stats list when the MessageContext gets reset
*/
if (MemoryContextIsValid(MessageContext))
{
callback.func = pgsm_cleanup_callback;
callback.arg = (void *) strdup("MessageContext");
MemoryContextRegisterResetCallback(MessageContext, &callback);
MemoryContextRegisterResetCallback(MessageContext, &mem_cxt_reset_callback);
callback_setup = true;
}
else
{
MemoryContextStats(TopMemoryContext);
}
}
if (!pgsm_enabled(exec_nested_level))
@ -507,7 +505,7 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState *
* The correct bucket value will be needed then to search the hash table, or create
* the appropriate entry.
*/
entry = pgsm_create_hash_entry(MessageContext, 0, query->queryId, NULL);
entry = pgsm_create_hash_entry(0, query->queryId, NULL);
/* Update other member that are not counters, so that we don't have to worry about these. */
entry->pgsm_query_id = pgsm_hash_string(norm_query ? norm_query : query_text, norm_query_len);
@ -524,10 +522,10 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState *
* it is put in the relevant memory context.
*/
if (PGSM_NORMALIZED_QUERY && norm_query)
pgsm_add_to_list(entry, norm_query, norm_query_len, true);
pgsm_add_to_list(entry, norm_query, norm_query_len);
else
{
pgsm_add_to_list(entry, (char *)query_text, query_len, true);
pgsm_add_to_list(entry, (char *)query_text, query_len);
}
/* Check that we've not exceeded max_stack_depth */
@ -722,7 +720,7 @@ pgsm_ExecutorEnd(QueryDesc *queryDesc)
entry = pgsm_get_entry_for_query(queryId, plan_ptr, (char *)queryDesc->sourceText, strlen(queryDesc->sourceText), true);
if(!entry)
{
elog(NOTICE,"[pg_stat_monitor] pgsm_ExecutorEnd: Failed to find entry for [%lu] %s.",queryId, queryDesc->sourceText);
elog(DEBUG2,"[pg_stat_monitor] pgsm_ExecutorEnd: Failed to find entry for [%lu] %s.",queryId, queryDesc->sourceText);
return;
}
@ -1122,7 +1120,7 @@ pgsm_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
/* Create an entry for this query */
entry = pgsm_create_hash_entry(MessageContext, 0, queryId, NULL);
entry = pgsm_create_hash_entry(0, queryId, NULL);
location = pstmt->stmt_location;
query_len = pstmt->stmt_len;
@ -1131,7 +1129,7 @@ pgsm_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
entry->pgsm_query_id = pgsm_hash_string(query_text, query_len);
entry->counters.info.cmd_type = 0;
pgsm_add_to_list(entry, query_text, query_len, true);
pgsm_add_to_list(entry, query_text, query_len);
/* Check that we've not exceeded max_stack_depth */
Assert(list_length(lentries) <= max_stack_depth);
@ -1564,7 +1562,7 @@ pgsm_store_error(const char *query, ErrorData *edata)
queryid = pgsm_hash_string(query, len);
entry = pgsm_create_hash_entry(ErrorContext, 0, queryid, NULL);
entry = pgsm_create_hash_entry(0, queryid, NULL);
entry->query_text.query_pointer = pnstrdup(query, len);
entry->counters.error.elevel = edata->elevel;
@ -1575,20 +1573,12 @@ pgsm_store_error(const char *query, ErrorData *edata)
}
static void
pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len, bool should_dup)
pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len)
{
MemoryContext oldctx;
/* Switch to TopMemoryContext */
oldctx = MemoryContextSwitchTo(MessageContext);
if (should_dup)
entry->query_text.query_pointer = pnstrdup(query_text, query_len);
else
entry->query_text.query_pointer = query_text;
/* Switch to pgsm memory context */
MemoryContext oldctx = MemoryContextSwitchTo(pgsm_get_ss()->pgsm_mem_cxt);
entry->query_text.query_pointer = pnstrdup(query_text, query_len);
lentries = lappend(lentries, entry);
MemoryContextSwitchTo(oldctx);
}
@ -1623,11 +1613,11 @@ pgsm_get_entry_for_query(uint64 queryid, PlanInfo *plan_info, const char* query_
* The correct bucket value will be needed then to search the hash table, or create
* the appropriate entry.
*/
entry = pgsm_create_hash_entry(MessageContext, 0, queryid, plan_info);
entry = pgsm_create_hash_entry(0, queryid, plan_info);
/* Update other member that are not counters, so that we don't have to worry about these. */
entry->pgsm_query_id = pgsm_hash_string(query_text, query_len);
pgsm_add_to_list(entry, (char *)query_text, query_len, true);
pgsm_add_to_list(entry, (char *)query_text, query_len);
}
return entry;
@ -1636,6 +1626,8 @@ pgsm_get_entry_for_query(uint64 queryid, PlanInfo *plan_info, const char* query_
static void
pgsm_cleanup_callback(void *arg)
{
/* Reset the memory context holding the list */
MemoryContextReset(pgsm_get_ss()->pgsm_mem_cxt);
lentries = NIL;
callback_setup = false;
}
@ -1644,7 +1636,7 @@ pgsm_cleanup_callback(void *arg)
* The bucket_id may not be known at this stage. So pass any value that you may wish.
*/
static pgsmEntry *
pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid, PlanInfo *plan_info)
pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info)
{
pgsmEntry *entry;
int sec_ctx;
@ -1656,10 +1648,9 @@ pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid,
char *datname = NULL;
char *username = NULL;
/* Create an entry in the TopMemoryContext */
oldctx = MemoryContextSwitchTo(context);
/* Create an entry in the pgsm memory context */
oldctx = MemoryContextSwitchTo(pgsm_get_ss()->pgsm_mem_cxt);
entry = palloc0(sizeof(pgsmEntry));
MemoryContextSwitchTo(oldctx);
/*
* Get the user ID. Let's use this instead of GetUserID as this
@ -1706,6 +1697,8 @@ pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid,
pfree(datname);
pfree(username);
MemoryContextSwitchTo(oldctx);
return entry;
}
@ -1965,8 +1958,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
PGSM_HASH_SEQ_STATUS hstat;
pgsmEntry *entry;
pgsmSharedState *pgsm;
char *query_txt = NULL;
char *parent_query_txt = NULL;
int expected_columns = (api_version >= PGSM_V2_0)?PG_STAT_MONITOR_COLS_V2_0:PG_STAT_MONITOR_COLS_V1_0;
/* Disallow old api usage */
@ -2038,6 +2029,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
uint64 pgsm_query_id = entry->pgsm_query_id;
dsa_area *query_dsa_area;
char *query_ptr;
char *query_txt = NULL;
char *parent_query_txt = NULL;
bool toplevel = entry->key.toplevel;
#if PG_VERSION_NUM < 140000
bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
@ -2353,15 +2347,16 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
/* clean up and return the tuplestore */
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
if(query_txt)
pfree(query_txt);
if(parent_query_txt)
pfree(parent_query_txt);
}
/* clean up and return the tuplestore */
pgsm_hash_seq_term(&hstat);
LWLockRelease(pgsm->lock);
if(query_txt)
pfree(query_txt);
if(parent_query_txt)
pfree(parent_query_txt);
tuplestore_donestoring(tupstore);
}

View File

@ -57,6 +57,9 @@
#include "utils/lsyscache.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#define MAX_BACKEND_PROCESES (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts)
#define IntArrayGetTextDatum(x,y) intarray_get_datum(x,y)
@ -64,7 +67,6 @@
/* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? */
#define USAGE_EXEC(duration) (1.0)
#define USAGE_INIT (1.0) /* including initial planning */
#define ASSUMED_MEDIAN_INIT (10.0) /* initial assumed median usage */
#define ASSUMED_LENGTH_INIT 1024 /* initial assumed mean query length */
#define USAGE_DECREASE_FACTOR (0.99) /* decreased every entry_dealloc */
#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */
@ -402,7 +404,6 @@ typedef struct pgsmEntry
typedef struct pgsmSharedState
{
LWLock *lock; /* protects hashtable search/modification */
double cur_median_usage; /* current median usage in hashtable */
slock_t mutex; /* protects following fields only: */
pg_atomic_uint64 current_wbucket;
pg_atomic_uint64 prev_bucket_sec;
@ -417,6 +418,10 @@ typedef struct pgsmSharedState
* classic shared memory hash or dshash
* (if we are using USE_DYNAMIC_HASH)
*/
MemoryContext pgsm_mem_cxt;
/* context to store stats in local
* memory until they are pushed to shared hash
*/
bool pgsm_oom;
} pgsmSharedState;
@ -429,16 +434,6 @@ typedef struct pgsmLocalState
PGSM_HASH_TABLE *shared_hash;
}pgsmLocalState;
/*
 * ResetSharedState(x)
 *
 * Bring the shared-state struct pointed to by x back to its initial values:
 * cur_median_usage is set to ASSUMED_MEDIAN_INIT, the two bucket trackers are
 * re-initialised to zero and the bucket_entry array is zero-filled.
 *
 * x is a pointer expression.  It is parenthesised at every use so that
 * arguments such as "base + 1" expand correctly, but it is still evaluated
 * more than once -- do not pass an expression with side effects.
 */
#define ResetSharedState(x) \
do { \
	(x)->cur_median_usage = ASSUMED_MEDIAN_INIT; \
	pg_atomic_init_u64(&(x)->current_wbucket, 0); \
	pg_atomic_init_u64(&(x)->prev_bucket_sec, 0); \
	memset(&(x)->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \
} while(0)
#if PG_VERSION_NUM < 140000
/*
* Struct for tracking locations/lengths of constants during normalization