diff --git a/Makefile b/Makefile index 6d65d47..fe567a8 100644 --- a/Makefile +++ b/Makefile @@ -12,8 +12,7 @@ LDFLAGS_SL += $(filter -lm, $(LIBS)) TAP_TESTS = 1 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/pg_stat_monitor/pg_stat_monitor.conf --inputdir=regression -#REGRESS = basic version guc pgsm_query_id functions counters relations database error_insert application_name application_name_unique top_query cmd_type error rows tags -REGRESS = basic version guc pgsm_query_id functions relations database error_insert application_name application_name_unique top_query cmd_type error rows tags +REGRESS = basic version guc pgsm_query_id functions counters relations database error_insert application_name application_name_unique top_query cmd_type error rows tags # Disabled because these tests require "shared_preload_libraries=pg_stat_statements", # which typical installcheck users do not have (e.g. buildfarm clients). diff --git a/hash_query.c b/hash_query.c index fa6861a..b49257a 100644 --- a/hash_query.c +++ b/hash_query.c @@ -268,7 +268,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) /* New entry, initialize it */ /* reset the statistics */ memset(&entry->counters, 0, sizeof(Counters)); - entry->query_pos = InvalidDsaPointer; + entry->query_text.query_pos = InvalidDsaPointer; entry->counters.info.parent_query = InvalidDsaPointer; /* set the appropriate initial usage count */ @@ -302,6 +302,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu { PGSM_HASH_SEQ_STATUS hstat; pgssEntry *entry = NULL; + /* Store pending query ids from the previous bucket. */ if (!pgsmStateLocal.shared_hash) return; @@ -317,10 +318,11 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu * Remove all entries if new_bucket_id == -1. Otherwise remove entry * in new_bucket_id if it has finished already. */ - if (new_bucket_id < 0 || entry->key.bucket_id == new_bucket_id) + if (new_bucket_id < 0 || + (entry->key.bucket_id == new_bucket_id )) { dsa_pointer parent_qdsa = entry->counters.info.parent_query; - pdsa = entry->query_pos; + pdsa = entry->query_text.query_pos; pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); @@ -332,8 +334,35 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu continue; } } + pgsm_hash_seq_term(&hstat); +} + +/* + * Release all entries. + */ +void +hash_entry_reset() +{ + pgssSharedState *pgss = pgsm_get_ss(); + PGSM_HASH_SEQ_STATUS hstat; + pgssEntry *entry; + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true); + + while ((entry = pgsm_hash_seq_next(&hstat)) != NULL) + { + dsa_pointer pdsa = entry->query_text.query_pos; + pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); + if (DsaPointerIsValid(pdsa)) + dsa_free(pgsmStateLocal.dsa, pdsa); + } pgsm_hash_seq_term(&hstat); + + pg_atomic_write_u64(&pgss->current_wbucket, 0); + LWLockRelease(pgss->lock); } bool diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 36d7e41..bcc7e81 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -19,6 +19,8 @@ #include "access/parallel.h" #include "nodes/pg_list.h" #include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/palloc.h" #include #include "pgstat.h" #include "commands/explain.h" @@ -84,8 +86,7 @@ static int hist_bucket_count_total; /* The array to store outer layer query id*/ uint64 *nested_queryids; char **nested_query_txts; -List *lentries = NULL; -List *lquery_text = NULL; +List *lentries = NIL; /* Regex object used to extract query comments. */ static regex_t preg_query_comments; @@ -188,6 +189,9 @@ char *unpack_sql_state(int sql_state); static pgssEntry *pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info); static void pgsm_add_to_list(pgssEntry *entry, char *query_text, int query_len, bool should_dup); +static pgssEntry* pgsm_get_entry_for_query(uint64 queryid, const char* query_text, int query_len, bool create); +static void pgsm_print_entrys_list(void); +static void pgsm_cleanup_callback(void *arg); static void pgsm_store_error(uint64 queryid, const char *query, ErrorData *edata); static void pgsm_update_entry(pgssEntry *entry, @@ -410,6 +414,9 @@ pgss_shmem_request(void) } #endif +MemoryContextCallback callback, callback2; +volatile bool callback_setup = false; + static void pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState *jstate) { @@ -424,9 +431,30 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState * if (!IsSystemInitialized()) return; + // elog(NOTICE,"%s[%d-%d]:%s\n\t%s",__FUNCTION__,exec_nested_level,plan_nested_level, pstate->p_sourcetext, pstate->parentParseState? pstate->parentParseState->p_sourcetext:"PARENT IS NULL"); + + if (callback_setup == false) + { + if (MemoryContextIsValid(MessageContext)) + { + // elog(NOTICE,"Setting Callback: %s[%d-%d]:%s\n\t%s",__FUNCTION__,exec_nested_level,plan_nested_level, pstate->p_sourcetext, pstate->parentParseState? pstate->parentParseState->p_sourcetext:"PARENT IS NULL"); + callback.func = pgsm_cleanup_callback; + callback.arg = (void *) strdup("MessageContext"); + + MemoryContextRegisterResetCallback(MessageContext, &callback); + callback_setup = true; + } + else + { + MemoryContextStats(TopMemoryContext); + } + } + + if (!pgsm_enabled(exec_nested_level)) return; + /* * Clear queryId for prepared statements related utility, as those will * inherit from the underlying statement's one (except DEALLOCATE which is @@ -439,6 +467,7 @@ pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState * return; } + //elog(NOTICE,"2- %s:%s",__FUNCTION__,pstate->p_sourcetext); /* * Let's calculate queryid for versions 13 and below. We don't have to check @@ -538,6 +567,8 @@ pgsm_post_parse_analyze(ParseState *pstate, Query *query) { JumbleState jstate; + // elog(NOTICE,"%s:%s",__FUNCTION__,queryDesc->sourceText); + if (prev_post_parse_analyze_hook) prev_post_parse_analyze_hook(pstate, query); @@ -554,6 +585,8 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags) if (getrusage(RUSAGE_SELF, &rusage_start) != 0) elog(DEBUG1, "pgss_ExecutorStart: failed to execute getrusage"); + // elog(NOTICE,"%s:%s",__FUNCTION__,queryDesc->sourceText); + if (prev_ExecutorStart) prev_ExecutorStart(queryDesc, eflags); else @@ -595,6 +628,8 @@ static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { + // elog(NOTICE,"%s:%s",__FUNCTION__,queryDesc->sourceText); + if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) { nested_queryids[exec_nested_level] = queryDesc->plannedstmt->queryId; @@ -639,6 +674,8 @@ static void pgss_ExecutorFinish(QueryDesc *queryDesc) { exec_nested_level++; + // elog(NOTICE,"%s[%d]:%s",__FUNCTION__,exec_nested_level,queryDesc->sourceText); + PG_TRY(); { if (prev_ExecutorFinish) @@ -653,6 +690,7 @@ pgss_ExecutorFinish(QueryDesc *queryDesc) PG_RE_THROW(); } PG_END_TRY(); + } static char * @@ -690,7 +728,7 @@ pgsm_ExecutorEnd(QueryDesc *queryDesc) /* Extract the plan information in case of SELECT statement */ if (queryDesc->operation == CMD_SELECT && PGSM_QUERY_PLAN) { - MemoryContext mct = MemoryContextSwitchTo(TopMemoryContext); + MemoryContext mct = MemoryContextSwitchTo(MessageContext); plan_info.plan_len = snprintf(plan_info.plan_text, PLAN_TEXT_LEN, "%s", pgss_explain(queryDesc)); plan_info.planid = pgss_hash_string(plan_info.plan_text, plan_info.plan_len); @@ -698,10 +736,17 @@ pgsm_ExecutorEnd(QueryDesc *queryDesc) MemoryContextSwitchTo(mct); } + // elog(NOTICE,"%s *[%d--%d]*:%s",__FUNCTION__,exec_nested_level,plan_nested_level,queryDesc->sourceText); if (queryId != UINT64CONST(0) && queryDesc->totaltime && pgsm_enabled(exec_nested_level)) { - entry = (pgssEntry *)llast(lentries); + // elog(NOTICE,"**Hoping for list entryof query: %s",queryDesc->sourceText); + entry = pgsm_get_entry_for_query(queryId, queryDesc->sourceText, strlen(queryDesc->sourceText), true); + if(!entry) + { + elog(NOTICE,"Failed to find entry for [%lu] %s",queryId, queryDesc->sourceText); + pgsm_print_entrys_list(); + } /* * Make sure stats accumulation is done. (Note: it's okay if several @@ -750,6 +795,13 @@ pgsm_ExecutorEnd(QueryDesc *queryDesc) standard_ExecutorEnd(queryDesc); num_relations = 0; + +// if (exec_nested_level == 0) +// { +// // elog(NOTICE,"%s Deleting ALL [%d]",__FUNCTION__,list_length(lentries)); +// // list_free_deep(lentries); +// // lentries = NIL; +// } } static bool @@ -807,6 +859,8 @@ static PlannedStmt * pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result; + pgssEntry *entry = NULL; + /* * We can't process the query if no query_string is provided, as @@ -820,10 +874,14 @@ pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par * So testing the planner nesting level only is not enough to detect real * top level planner call. */ + // elog(NOTICE,"%s:%s",__FUNCTION__,query_string); + if (MemoryContextIsValid(MessageContext)) + entry = pgsm_get_entry_for_query(parse->queryId, query_string, strlen(query_string), true); + + if (pgsm_enabled(plan_nested_level + exec_nested_level) && PGSM_TRACK_PLANNING && query_string && parse->queryId != UINT64CONST(0)) { - pgssEntry *entry; instr_time start; instr_time duration; BufferUsage bufusage_start; @@ -873,9 +931,8 @@ pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par memset(&walusage, 0, sizeof(WalUsage)); WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start); - entry = (pgssEntry *)llast(lentries); - /* The plan details are captured when the query finishes */ + if(entry) pgsm_update_entry(entry, /* entry */ NULL, /* query */ NULL, /* PlanInfo */ @@ -898,10 +955,14 @@ pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par * since it modifies the first argument (Query *), the second call * would trigger an assertion failure. */ + plan_nested_level++; + if (planner_hook_next) result = planner_hook_next(parse, query_string, cursorOptions, boundParams); else result = standard_planner(parse, query_string, cursorOptions, boundParams); + plan_nested_level--; + } return result; } @@ -986,6 +1047,7 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, WalUsage walusage; WalUsage walusage_start = pgWalUsage; #endif + // elog(NOTICE,"%s: %s",__FUNCTION__,queryString); if (getrusage(RUSAGE_SELF, &rusage_start) != 0) elog(DEBUG1, "pg_stat_monitor: failed to execute getrusage"); @@ -1098,7 +1160,7 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, /* The plan details are captured when the query finishes */ pgsm_update_entry(entry, /* entry */ - (char *)llast(lquery_text), /* query */ + (char *)query_text, /* query */ NULL, /* PlanInfo */ &sys_info, /* SysInfo */ NULL, /* ErrorInfo */ @@ -1115,6 +1177,11 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, PGSM_EXEC); /* kind */ pgsm_store(entry); + /*TODO USAMA*/ + // elog(NOTICE,"%s Deleting[%d]",__FUNCTION__,list_length(lentries)); + // lentries = list_delete_last(lentries); + + } else { @@ -1503,47 +1570,129 @@ pgsm_store_error(uint64 queryid, snprintf(error_info.message, ERROR_MESSAGE_LEN, "%s", edata->message); snprintf(error_info.sqlcode, SQLCODE_LEN, "%s", unpack_sql_state(edata->sqlerrcode)); - entry = pgsm_create_hash_entry(0, queryid, NULL); + // entry = pgsm_create_hash_entry(0, queryid, NULL); + entry = pgsm_get_entry_for_query(queryid, query, strlen(query), false); - pgsm_add_to_list(entry, (char *)query, strlen(query), true); + // pgsm_add_to_list(entry, (char *)query, strlen(query), true); + if (entry) + { + pgsm_update_entry(entry, /* entry */ + query, /* query */ + NULL, /* PlanInfo */ + NULL, /* SysInfo */ + &error_info, /* ErrorInfo */ + 0, /* total_time */ + 0, /* rows */ + NULL, /* bufusage */ + NULL, /* walusage */ + NULL, /* jitusage */ + false, /* reset */ + PGSM_ERROR); /* kind */ - pgsm_update_entry(entry, /* entry */ - query, /* query */ - NULL, /* PlanInfo */ - NULL, /* SysInfo */ - &error_info, /* ErrorInfo */ - 0, /* total_time */ - 0, /* rows */ - NULL, /* bufusage */ - NULL, /* walusage */ - NULL, /* jitusage */ - false, /* reset */ - PGSM_ERROR); /* kind */ - - pgsm_store(entry); + pgsm_store(entry); + } + // if (exec_nested_level == 0) + // { + // // elog(NOTICE,"%s Deleting ALL [%d]",__FUNCTION__,list_length(lentries)); + // list_free(lentries); + // lentries = NULL; + // } } static void pgsm_add_to_list(pgssEntry *entry, char *query_text, int query_len, bool should_dup) { MemoryContext oldctx; - char *query; /* Switch to TopMemoryContext */ - oldctx = MemoryContextSwitchTo(TopMemoryContext); + oldctx = MemoryContextSwitchTo(MessageContext); if (should_dup) - query = pnstrdup(query_text, query_len); + entry->query_text.query_pointer = pnstrdup(query_text, query_len); else - query = query_text; + entry->query_text.query_pointer = query_text; lentries = lappend(lentries, entry); - lquery_text = lappend(lquery_text, query); + // elog(NOTICE,"Adding *[%d]*-- [%d]--%s",exec_nested_level,list_length(lentries),query_text); MemoryContextSwitchTo(oldctx); } +static pgssEntry* +pgsm_get_entry_for_query(uint64 queryid, const char* query_text, int query_len, bool create) +{ + pgssEntry *entry = NULL; + ListCell *lc = NULL; + /* First bet is on the last entry */ + // elog(NOTICE,"*** Finding query-id:[%lu]-- [%s]--CREATE? %s ",queryid, query_text?query_text:"NULL",create?"TRUE":"FALSE"); + if (lentries == NIL && !create) + return NULL; + + if (lentries) + { + entry = (pgssEntry *)llast(lentries); + if(entry->key.queryid == queryid) + return entry; + + foreach(lc, lentries) + { + entry = lfirst(lc); + if(entry->key.queryid == queryid) + return entry; + } + } + if (create && query_text) + { + /* + * At this point, we don't know which bucket this query will land in, so passing + * 0. The store function MUST later update it based on the current bucket value. + * The correct bucket value will be needed then to search the hash table, or create + * the appropriate entry. + */ + entry = pgsm_create_hash_entry(0, queryid, NULL); + + /* Update other member that are not counters, so that we don't have to worry about these. */ + entry->pgsm_query_id = pgss_hash_string(query_text, query_len); + // entry->counters.info.cmd_type = query->commandType; + pgsm_add_to_list(entry, query_text, query_len, true); + } + + return entry; +} + +static void +pgsm_print_entrys_list(void) +{ + pgssEntry *entry; + ListCell *lc = NULL; + int i= 0; + /* First bet is on the last entry */ + if (lentries == NIL) + { + // elog(NOTICE,"LIST iS EMPTY"); + return; + } + foreach(lc, lentries) + { + entry = lfirst(lc); + // elog(NOTICE,"****%d [%lu] %s",i++,entry->key.queryid,entry->query_text.query_pointer); + } + return; +} + +static void +pgsm_cleanup_callback(void *arg) +{ + // if (strcmp(arg,"PortalContext") == 0) + // elog(NOTICE,"PortalContext"); + // else + //elog(NOTICE,"MessageContext"); + // elog(NOTICE,"Setting list to empty"); + // pgsm_print_entrys_list(); + lentries = NIL; + callback_setup = false; +} /* * Function encapsulating some external calls for filling up the hash key data structure. * The bucket_id may not be known at this stage. So pass any value that you may wish. @@ -1560,7 +1709,7 @@ pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info) MemoryContext oldctx; /* Create an entry in the TopMemoryContext */ - oldctx = MemoryContextSwitchTo(TopMemoryContext); + oldctx = MemoryContextSwitchTo(MessageContext); entry = palloc0(sizeof(pgssEntry)); MemoryContextSwitchTo(oldctx); @@ -1631,7 +1780,7 @@ pgsm_store(pgssEntry *entry) reset = true; entry->key.bucket_id = bucketid; - query = (char *)llast(lquery_text); + query = entry->query_text.query_pointer; query_len = strlen(query); /* @@ -1685,7 +1834,7 @@ pgsm_store(pgssEntry *entry) PG_END_TRY(); }PGSM_END_DISABLE_ERROR_CAPTURE(); - if (entry == NULL) + if (shared_hash_entry == NULL) { LWLockRelease(pgss->lock); @@ -1696,10 +1845,10 @@ pgsm_store(pgssEntry *entry) } /* If we already have the pointer set, free this one */ - if (DsaPointerIsValid(shared_hash_entry->query_pos)) + if (DsaPointerIsValid(shared_hash_entry->query_text.query_pos)) dsa_free(query_dsa_area, dsa_query_pointer); else - shared_hash_entry->query_pos = dsa_query_pointer; + shared_hash_entry->query_text.query_pos = dsa_query_pointer; shared_hash_entry->pgsm_query_id = entry->pgsm_query_id; shared_hash_entry->encoding = entry->encoding; @@ -1719,8 +1868,11 @@ pgsm_store(pgssEntry *entry) reset, /* reset */ PGSM_STORE); - lentries = list_delete_last(lentries); - lquery_text = list_delete_last(lquery_text); + + // elog(NOTICE,"Deleting[%d]",list_length(lentries)); + + // lentries = list_delete_last(lentries); + // lquery_text = list_delete_last(lquery_text); LWLockRelease(pgss->lock); } @@ -1874,10 +2026,10 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); #endif /* Load the query text from dsa area */ - if (DsaPointerIsValid(entry->query_pos)) + if (DsaPointerIsValid(entry->query_text.query_pos)) { query_dsa_area = get_dsa_area_for_query_text(); - query_ptr = dsa_get_address(query_dsa_area, entry->query_pos); + query_ptr = dsa_get_address(query_dsa_area, entry->query_text.query_pos); query_txt = pstrdup(query_ptr); } else @@ -3483,3 +3635,4 @@ get_query_id(JumbleState *jstate, Query *query) return queryid; } #endif + diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 9c8bcd1..8af820d 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -369,7 +369,11 @@ typedef struct pgssEntry Counters counters; /* the statistics for this query */ int encoding; /* query text encoding */ slock_t mutex; /* protects the counters only */ - dsa_pointer query_pos; /* query location within query buffer */ + union + { + dsa_pointer query_pos; /* query location within query buffer */ + char* query_pointer; + }query_text; } pgssEntry; /* diff --git a/regression/sql/error.sql b/regression/sql/error.sql index 98870f2..67f2ac3 100644 --- a/regression/sql/error.sql +++ b/regression/sql/error.sql @@ -9,6 +9,6 @@ BEGIN RAISE WARNING 'warning message'; END $$; -SELECT query, elevel, sqlcode, message FROM pg_stat_monitor ORDER BY query COLLATE "C",elevel; +SELECT queryid, query, elevel, sqlcode, message FROM pg_stat_monitor ORDER BY query COLLATE "C",elevel; SELECT pg_stat_monitor_reset(); DROP EXTENSION pg_stat_monitor;