diff --git a/hash_query.c b/hash_query.c index 98670a2..6a626dd 100644 --- a/hash_query.c +++ b/hash_query.c @@ -21,6 +21,7 @@ static pgsmLocalState pgsmStateLocal; static PGSM_HASH_TABLE_HANDLE pgsm_create_bucket_hash(pgsmSharedState *pgsm, dsa_area *dsa); static Size pgsm_get_shared_area_size(void); +static void InitializeSharedState(pgsmSharedState *pgsm); #if USE_DYNAMIC_HASH /* parameter for the shared hash */ @@ -108,7 +109,7 @@ pgsm_startup(void) pgsm->pgsm_oom = false; pgsm->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock; SpinLockInit(&pgsm->mutex); - ResetSharedState(pgsm); + InitializeSharedState(pgsm); /* the allocation of pgsmSharedState itself */ p += MAXALIGN(sizeof(pgsmSharedState)); pgsm->raw_dsa_area = p; @@ -147,6 +148,18 @@ pgsm_startup(void) on_shmem_exit(pgsm_shmem_shutdown, (Datum) 0); } +static void +InitializeSharedState(pgsmSharedState *pgsm) +{ + pg_atomic_init_u64(&pgsm->current_wbucket, 0); + pg_atomic_init_u64(&pgsm->prev_bucket_sec, 0); + memset(&pgsm->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); + pgsm->pgsm_mem_cxt = AllocSetContextCreate(TopMemoryContext, + "pg_stat_monitor local store", + ALLOCSET_DEFAULT_SIZES); +} + + /* * Create the classic or dshahs hash table for storing the query statistics. 
*/ @@ -415,4 +428,4 @@ pgsm_hash_delete_current(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_ha #else hash_search(shared_hash, key, HASH_REMOVE, NULL); #endif -} \ No newline at end of file +} diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 91a6f4e..113e767 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -19,8 +19,6 @@ #include "access/parallel.h" #include "nodes/pg_list.h" #include "utils/guc.h" -#include "utils/memutils.h" -#include "utils/palloc.h" #include #include "pgstat.h" #include "commands/dbcommands.h" @@ -68,10 +66,6 @@ do \ /*---- Initicalization Function Declarations ----*/ void _PG_init(void); -/*---- Local variables ----*/ -MemoryContextCallback callback; -volatile bool callback_setup = false; - /* Current nesting depth of ExecutorRun+ProcessUtility calls */ static int exec_nested_level = 0; volatile bool __pgsm_do_not_capture_error = false; @@ -190,12 +184,20 @@ char *unpack_sql_state(int sql_state); !IsA(n, DeallocateStmt)) -static pgsmEntry *pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid, PlanInfo *plan_info); -static void pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len, bool should_dup); +static pgsmEntry *pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info); +static void pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len); static pgsmEntry* pgsm_get_entry_for_query(uint64 queryid, PlanInfo *plan_info, const char* query_text, int query_len, bool create); static void pgsm_cleanup_callback(void *arg); static void pgsm_store_error(const char *query, ErrorData *edata); +/*---- Local variables ----*/ +MemoryContextCallback mem_cxt_reset_callback = + { + .func = pgsm_cleanup_callback, + .arg = NULL + }; +volatile bool callback_setup = false; + static void pgsm_update_entry(pgsmEntry *entry, const char *query, PlanInfo * plan_info, @@ -430,18 +432,14 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState * if 
(callback_setup == false) { + /* If MessageContext is valid setup a callback to cleanup + * our local stats list when the MessageContext gets reset + */ if (MemoryContextIsValid(MessageContext)) { - callback.func = pgsm_cleanup_callback; - callback.arg = (void *) strdup("MessageContext"); - - MemoryContextRegisterResetCallback(MessageContext, &callback); + MemoryContextRegisterResetCallback(MessageContext, &mem_cxt_reset_callback); callback_setup = true; } - else - { - MemoryContextStats(TopMemoryContext); - } } if (!pgsm_enabled(exec_nested_level)) @@ -507,7 +505,7 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState * * The correct bucket value will be needed then to search the hash table, or create * the appropriate entry. */ - entry = pgsm_create_hash_entry(MessageContext, 0, query->queryId, NULL); + entry = pgsm_create_hash_entry(0, query->queryId, NULL); /* Update other member that are not counters, so that we don't have to worry about these. */ entry->pgsm_query_id = pgsm_hash_string(norm_query ? norm_query : query_text, norm_query_len); @@ -524,10 +522,10 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState * * it is put in the relevant memory context. 
*/ if (PGSM_NORMALIZED_QUERY && norm_query) - pgsm_add_to_list(entry, norm_query, norm_query_len, true); + pgsm_add_to_list(entry, norm_query, norm_query_len); else { - pgsm_add_to_list(entry, (char *)query_text, query_len, true); + pgsm_add_to_list(entry, (char *)query_text, query_len); } /* Check that we've not exceeded max_stack_depth */ @@ -722,7 +720,7 @@ pgsm_ExecutorEnd(QueryDesc *queryDesc) entry = pgsm_get_entry_for_query(queryId, plan_ptr, (char *)queryDesc->sourceText, strlen(queryDesc->sourceText), true); if(!entry) { - elog(NOTICE,"[pg_stat_monitor] pgsm_ExecutorEnd: Failed to find entry for [%lu] %s.",queryId, queryDesc->sourceText); + elog(DEBUG2,"[pg_stat_monitor] pgsm_ExecutorEnd: Failed to find entry for [%lu] %s.",queryId, queryDesc->sourceText); return; } @@ -1122,7 +1120,7 @@ pgsm_ProcessUtility(PlannedStmt *pstmt, const char *queryString, BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); /* Create an entry for this query */ - entry = pgsm_create_hash_entry(MessageContext, 0, queryId, NULL); + entry = pgsm_create_hash_entry(0, queryId, NULL); location = pstmt->stmt_location; query_len = pstmt->stmt_len; @@ -1131,7 +1129,7 @@ pgsm_ProcessUtility(PlannedStmt *pstmt, const char *queryString, entry->pgsm_query_id = pgsm_hash_string(query_text, query_len); entry->counters.info.cmd_type = 0; - pgsm_add_to_list(entry, query_text, query_len, true); + pgsm_add_to_list(entry, query_text, query_len); /* Check that we've not exceeded max_stack_depth */ Assert(list_length(lentries) <= max_stack_depth); @@ -1564,7 +1562,7 @@ pgsm_store_error(const char *query, ErrorData *edata) queryid = pgsm_hash_string(query, len); - entry = pgsm_create_hash_entry(ErrorContext, 0, queryid, NULL); + entry = pgsm_create_hash_entry(0, queryid, NULL); entry->query_text.query_pointer = pnstrdup(query, len); entry->counters.error.elevel = edata->elevel; @@ -1575,20 +1573,12 @@ pgsm_store_error(const char *query, ErrorData *edata) } static void 
-pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len, bool should_dup) +pgsm_add_to_list(pgsmEntry *entry, char *query_text, int query_len) { - MemoryContext oldctx; - - /* Switch to TopMemoryContext */ - oldctx = MemoryContextSwitchTo(MessageContext); - - if (should_dup) - entry->query_text.query_pointer = pnstrdup(query_text, query_len); - else - entry->query_text.query_pointer = query_text; - + /* Switch to pgsm memory context */ + MemoryContext oldctx = MemoryContextSwitchTo(pgsm_get_ss()->pgsm_mem_cxt); + entry->query_text.query_pointer = pnstrdup(query_text, query_len); lentries = lappend(lentries, entry); - MemoryContextSwitchTo(oldctx); } @@ -1623,11 +1613,11 @@ pgsm_get_entry_for_query(uint64 queryid, PlanInfo *plan_info, const char* query_ * The correct bucket value will be needed then to search the hash table, or create * the appropriate entry. */ - entry = pgsm_create_hash_entry(MessageContext, 0, queryid, plan_info); + entry = pgsm_create_hash_entry(0, queryid, plan_info); /* Update other member that are not counters, so that we don't have to worry about these. */ entry->pgsm_query_id = pgsm_hash_string(query_text, query_len); - pgsm_add_to_list(entry, (char *)query_text, query_len, true); + pgsm_add_to_list(entry, (char *)query_text, query_len); } return entry; @@ -1636,6 +1626,8 @@ pgsm_get_entry_for_query(uint64 queryid, PlanInfo *plan_info, const char* query_ static void pgsm_cleanup_callback(void *arg) { + /* Reset the memory context holding the list */ + MemoryContextReset(pgsm_get_ss()->pgsm_mem_cxt); lentries = NIL; callback_setup = false; } @@ -1644,7 +1636,7 @@ pgsm_cleanup_callback(void *arg) * The bucket_id may not be known at this stage. So pass any value that you may wish. 
*/ static pgsmEntry * -pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid, PlanInfo *plan_info) +pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info) { pgsmEntry *entry; int sec_ctx; @@ -1656,10 +1648,9 @@ pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid, char *datname = NULL; char *username = NULL; - /* Create an entry in the TopMemoryContext */ - oldctx = MemoryContextSwitchTo(context); + /* Create an entry in the pgsm memory context */ + oldctx = MemoryContextSwitchTo(pgsm_get_ss()->pgsm_mem_cxt); entry = palloc0(sizeof(pgsmEntry)); - MemoryContextSwitchTo(oldctx); /* * Get the user ID. Let's use this instead of GetUserID as this @@ -1706,6 +1697,8 @@ pgsm_create_hash_entry(MemoryContext context, uint64 bucket_id, uint64 queryid, pfree(datname); pfree(username); + MemoryContextSwitchTo(oldctx); + return entry; } @@ -1965,8 +1958,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, PGSM_HASH_SEQ_STATUS hstat; pgsmEntry *entry; pgsmSharedState *pgsm; - char *query_txt = NULL; - char *parent_query_txt = NULL; int expected_columns = (api_version >= PGSM_V2_0)?PG_STAT_MONITOR_COLS_V2_0:PG_STAT_MONITOR_COLS_V1_0; /* Disallow old api usage */ @@ -2038,6 +2029,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, uint64 pgsm_query_id = entry->pgsm_query_id; dsa_area *query_dsa_area; char *query_ptr; + char *query_txt = NULL; + char *parent_query_txt = NULL; + bool toplevel = entry->key.toplevel; #if PG_VERSION_NUM < 140000 bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); @@ -2353,15 +2347,16 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, /* clean up and return the tuplestore */ tuplestore_putvalues(tupstore, tupdesc, values, nulls); + + if(query_txt) + pfree(query_txt); + if(parent_query_txt) + pfree(parent_query_txt); } /* clean up and return the tuplestore */ pgsm_hash_seq_term(&hstat); LWLockRelease(pgsm->lock); - if(query_txt) - 
pfree(query_txt); - if(parent_query_txt) - pfree(parent_query_txt); tuplestore_donestoring(tupstore); } diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 5676dab..a65af74 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -57,6 +57,9 @@ #include "utils/lsyscache.h" #include "utils/guc.h" #include "utils/guc_tables.h" +#include "utils/memutils.h" +#include "utils/palloc.h" + #define MAX_BACKEND_PROCESES (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) #define IntArrayGetTextDatum(x,y) intarray_get_datum(x,y) @@ -64,7 +67,6 @@ /* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? */ #define USAGE_EXEC(duration) (1.0) #define USAGE_INIT (1.0) /* including initial planning */ -#define ASSUMED_MEDIAN_INIT (10.0) /* initial assumed median usage */ #define ASSUMED_LENGTH_INIT 1024 /* initial assumed mean query length */ #define USAGE_DECREASE_FACTOR (0.99) /* decreased every entry_dealloc */ #define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */ @@ -402,7 +404,6 @@ typedef struct pgsmEntry typedef struct pgsmSharedState { LWLock *lock; /* protects hashtable search/modification */ - double cur_median_usage; /* current median usage in hashtable */ slock_t mutex; /* protects following fields only: */ pg_atomic_uint64 current_wbucket; pg_atomic_uint64 prev_bucket_sec; @@ -417,6 +418,10 @@ typedef struct pgsmSharedState * classic shared memory hash or dshash * (if we are using USE_DYNAMIC_HASH) */ + MemoryContext pgsm_mem_cxt; + /* context to store stats in local + * memory until they are pushed to shared hash + */ bool pgsm_oom; } pgsmSharedState; @@ -429,16 +434,6 @@ typedef struct pgsmLocalState PGSM_HASH_TABLE *shared_hash; }pgsmLocalState; -#define ResetSharedState(x) \ -do { \ - x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ - x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ - pg_atomic_init_u64(&x->current_wbucket, 0); \ - pg_atomic_init_u64(&x->prev_bucket_sec, 0); \ - memset(&x->bucket_entry, 0, MAX_BUCKETS * 
sizeof(uint64)); \ -} while(0) - - #if PG_VERSION_NUM < 140000 /* * Struct for tracking locations/lengths of constants during normalization