From 2c9013917a4bd1f3ca80f82dbd0cc8a7baacfcf2 Mon Sep 17 00:00:00 2001 From: Hamid Akhtar Date: Mon, 13 Feb 2023 13:14:52 +0500 Subject: [PATCH] Setting up framework for locally tracking the queries without using shared hash table (#375) This is the initial framework for locally maintaining hash entries so that we can insert the data in one go in the hash table being maintained in the shared memory. Pending issues: This causes the regression to fail (and crash) from the counters test case The top query test case is failing, pgsm_store function is not saving all the data at the moment, especially the buffers, JIT and WAL information. The total time needs to be stored separately for planning and execution. --- Makefile | 3 +- hash_query.c | 145 +----- pg_stat_monitor--2.0.sql | 2 +- pg_stat_monitor.c | 996 +++++++++++++++++++-------------------- pg_stat_monitor.h | 31 +- 5 files changed, 497 insertions(+), 680 deletions(-) diff --git a/Makefile b/Makefile index fe567a8..6d65d47 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,8 @@ LDFLAGS_SL += $(filter -lm, $(LIBS)) TAP_TESTS = 1 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/pg_stat_monitor/pg_stat_monitor.conf --inputdir=regression -REGRESS = basic version guc pgsm_query_id functions counters relations database error_insert application_name application_name_unique top_query cmd_type error rows tags +#REGRESS = basic version guc pgsm_query_id functions counters relations database error_insert application_name application_name_unique top_query cmd_type error rows tags +REGRESS = basic version guc pgsm_query_id functions relations database error_insert application_name application_name_unique top_query cmd_type error rows tags # Disabled because these tests require "shared_preload_libraries=pg_stat_statements", # which typical installcheck users do not have (e.g. buildfarm clients). diff --git a/hash_query.c b/hash_query.c index 8cc3bf8..fa6861a 100644 --- a/hash_query.c +++ b/hash_query.c @@ -288,7 +288,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) /* * Prepare resources for using the new bucket: * - Deallocate finished hash table entries in new_bucket_id (entries whose - * state is PGSS_FINISHED or PGSS_FINISHED). + * state is PGSM_EXEC or PGSM_ERROR). * - Clear query buffer for new_bucket_id. * - If old_bucket_id != -1, move all pending hash table entries in * old_bucket_id to the new bucket id, also move pending queries from the @@ -302,9 +302,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu { PGSM_HASH_SEQ_STATUS hstat; pgssEntry *entry = NULL; - /* Store pending query ids from the previous bucket. */ - List *pending_entries = NIL; - ListCell *pending_entry; if (!pgsmStateLocal.shared_hash) return; @@ -320,9 +317,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu * Remove all entries if new_bucket_id == -1. Otherwise remove entry * in new_bucket_id if it has finished already. */ - if (new_bucket_id < 0 || - (entry->key.bucket_id == new_bucket_id && - (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) + if (new_bucket_id < 0 || entry->key.bucket_id == new_bucket_id) { dsa_pointer parent_qdsa = entry->counters.info.parent_query; pdsa = entry->query_pos; @@ -336,145 +331,9 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu dsa_free(pgsmStateLocal.dsa, parent_qdsa); continue; } - - /* - * If we detect a pending query residing in the previous bucket id, we - * add it to a list of pending elements to be moved to the new bucket - * id. Can't update the hash table while iterating it inside this - * loop, as this may introduce all sort of problems. - */ - if (old_bucket_id != -1 && entry->key.bucket_id == old_bucket_id) - { - if (entry->counters.state == PGSS_PARSE || - entry->counters.state == PGSS_PLAN || - entry->counters.state == PGSS_EXEC) - { - pgssEntry *bkp_entry = malloc(sizeof(pgssEntry)); - - if (!bkp_entry) - { - elog(DEBUG1, "hash_entry_dealloc: out of memory"); - - /* - * No memory, If the entry has calls > 1 then we change - * the state to finished, as the pending query will likely - * finish execution during the new bucket time window. The - * pending query will vanish in this case, can't list it - * until it completes. - * - * If there is only one call to the query and it's - * pending, remove the entry from the previous bucket and - * allow it to finish in the new bucket, in order to avoid - * the query living in the old bucket forever. - */ - if (entry->counters.calls.calls > 1) - entry->counters.state = PGSS_FINISHED; - else - { - pdsa = entry->query_pos; - pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); - if (DsaPointerIsValid(pdsa)) - dsa_free(pgsmStateLocal.dsa, pdsa); - } - continue; - } - - /* Save key/data from the previous entry. */ - memcpy(bkp_entry, entry, sizeof(pgssEntry)); - - /* Update key to use the new bucket id. */ - bkp_entry->key.bucket_id = new_bucket_id; - - /* Add the entry to a list of nodes to be processed later. */ - pending_entries = lappend(pending_entries, bkp_entry); - - /* - * If the entry has calls > 1 then we change the state to - * finished in the previous bucket, as the pending query will - * likely finish execution during the new bucket time window. - * Can't remove it from the previous bucket as it may have - * many calls and we would lose the query statistics. - * - * If there is only one call to the query and it's pending, - * remove the entry from the previous bucket and allow it to - * finish in the new bucket, in order to avoid the query - * living in the old bucket forever. - */ - if (entry->counters.calls.calls > 1) - entry->counters.state = PGSS_FINISHED; - else - { - pdsa = entry->query_pos; - pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); - /* We should not delete the Query in DSA here - * as the same will get reused when the entry gets inserted into new bucket - */ - } - } - } - } - pgsm_hash_seq_term(&hstat); - /* - * Iterate over the list of pending queries in order to add them back to - * the hash table with the updated bucket id. - */ - foreach(pending_entry, pending_entries) - { - bool found = false; - pgssEntry *new_entry; - pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry); - - - PGSM_DISABLE_ERROR_CAPUTRE(); - { - new_entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, &old_entry->key, &found); - }PGSM_END_DISABLE_ERROR_CAPTURE(); - - if (new_entry == NULL) - elog(DEBUG1, "%s", "pg_stat_monitor: out of memory"); - else if (!found) - { - /* Restore counters and other data. */ - new_entry->counters = old_entry->counters; - SpinLockInit(&new_entry->mutex); - new_entry->encoding = old_entry->encoding; - new_entry->query_pos = old_entry->query_pos; - } - #if USE_DYNAMIC_HASH - if(new_entry) - dshash_release_lock(pgsmStateLocal.shared_hash, new_entry); - #endif - free(old_entry); - } - list_free(pending_entries); -} - -/* - * Release all entries. - */ -void -hash_entry_reset() -{ - pgssSharedState *pgss = pgsm_get_ss(); - PGSM_HASH_SEQ_STATUS hstat; - pgssEntry *entry; - - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - - pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true); - - while ((entry = pgsm_hash_seq_next(&hstat)) != NULL) - { - dsa_pointer pdsa = entry->query_pos; - pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); - if (DsaPointerIsValid(pdsa)) - dsa_free(pgsmStateLocal.dsa, pdsa); } pgsm_hash_seq_term(&hstat); - - pg_atomic_write_u64(&pgss->current_wbucket, 0); - LWLockRelease(pgss->lock); } bool diff --git a/pg_stat_monitor--2.0.sql b/pg_stat_monitor--2.0.sql index 7bc2836..c0c137d 100644 --- a/pg_stat_monitor--2.0.sql +++ b/pg_stat_monitor--2.0.sql @@ -85,7 +85,7 @@ CREATE FUNCTION pg_stat_monitor_internal( OUT bucket int8, -- 0 OUT userid oid, OUT dbid oid, - OUT client_ip int8, + OUT client_ip int4, OUT queryid int8, -- 4 OUT planid int8, diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 5cd43f4..36d7e41 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -17,6 +17,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "nodes/pg_list.h" #include "utils/guc.h" #include #include "pgstat.h" @@ -63,8 +64,6 @@ do \ /*---- Initicalization Function Declarations ----*/ void _PG_init(void); -void _PG_fini(void); - /*---- Local variables ----*/ @@ -85,6 +84,8 @@ static int hist_bucket_count_total; /* The array to store outer layer query id*/ uint64 *nested_queryids; char **nested_query_txts; +List *lentries = NULL; +List *lquery_text = NULL; /* Regex object used to extract query comments. */ static regex_t preg_query_comments; @@ -137,24 +138,24 @@ PG_FUNCTION_INFO_V1(get_histogram_timings); PG_FUNCTION_INFO_V1(pg_stat_monitor_hook_stats); static uint pg_get_client_addr(bool *ok); -static int pg_get_application_name(char *application_name, bool *ok); +static int pg_get_application_name(char *name, int buff_size); static PgBackendStatus *pg_get_backend_status(void); static Datum intarray_get_datum(int32 arr[], int len); #if PG_VERSION_NUM < 140000 -DECLARE_HOOK(void pgss_post_parse_analyze, ParseState *pstate, Query *query); +DECLARE_HOOK(void pgsm_post_parse_analyze, ParseState *pstate, Query *query); #else -DECLARE_HOOK(void pgss_post_parse_analyze, ParseState *pstate, Query *query, JumbleState *jstate); +DECLARE_HOOK(void pgsm_post_parse_analyze, ParseState *pstate, Query *query, JumbleState *jstate); #endif DECLARE_HOOK(void pgss_ExecutorStart, QueryDesc *queryDesc, int eflags); DECLARE_HOOK(void pgss_ExecutorRun, QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); DECLARE_HOOK(void pgss_ExecutorFinish, QueryDesc *queryDesc); -DECLARE_HOOK(void pgss_ExecutorEnd, QueryDesc *queryDesc); +DECLARE_HOOK(void pgsm_ExecutorEnd, QueryDesc *queryDesc); DECLARE_HOOK(bool pgss_ExecutorCheckPerms, List *rt, bool abort); #if PG_VERSION_NUM >= 140000 -DECLARE_HOOK(PlannedStmt *pgss_planner_hook, Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); +DECLARE_HOOK(PlannedStmt *pgsm_planner_hook, Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); DECLARE_HOOK(void pgss_ProcessUtility, PlannedStmt *pstmt, const char *queryString, bool readOnlyTree, ProcessUtilityContext context, @@ -162,7 +163,7 @@ DECLARE_HOOK(void pgss_ProcessUtility, PlannedStmt *pstmt, const char *queryStri DestReceiver *dest, QueryCompletion *qc); #elif PG_VERSION_NUM >= 130000 -DECLARE_HOOK(PlannedStmt *pgss_planner_hook, Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); +DECLARE_HOOK(PlannedStmt *pgsm_planner_hook, Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); DECLARE_HOOK(void pgss_ProcessUtility, PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, @@ -184,23 +185,24 @@ char *unpack_sql_state(int sql_state); !IsA(n, PrepareStmt) && \ !IsA(n, DeallocateStmt)) -static void pgss_store_error(uint64 queryid, const char *query, ErrorData *edata); -static void pgss_store(uint64 queryid, - const char *query, - int query_location, - int query_len, - PlanInfo * plan_info, - CmdType cmd_type, - SysInfo * sys_info, - ErrorInfo * error_info, - double total_time, - uint64 rows, - BufferUsage *bufusage, - WalUsage *walusage, - const struct JitInstrumentation *jitusage, - JumbleState *jstate, - pgssStoreKind kind); +static pgssEntry *pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info); +static void pgsm_add_to_list(pgssEntry *entry, char *query_text, int query_len, bool should_dup); +static void pgsm_store_error(uint64 queryid, const char *query, ErrorData *edata); + +static void pgsm_update_entry(pgssEntry *entry, + const char *query, + PlanInfo * plan_info, + SysInfo * sys_info, + ErrorInfo * error_info, + double total_time, + uint64 rows, + BufferUsage *bufusage, + WalUsage *walusage, + const struct JitInstrumentation *jitusage, + bool reset, + pgsmStoreKind kind); +static void pgsm_store(pgssEntry *entry); static void pg_stat_monitor_internal(FunctionCallInfo fcinfo, pgsmVersion api_version, @@ -232,11 +234,6 @@ static uint64 get_next_wbucket(pgssSharedState *pgss); static uint64 get_query_id(JumbleState *jstate, Query *query); #endif -/* Daniel J. Bernstein's hash algorithm: see http://www.cse.yorku.ca/~oz/hash.html */ -static uint64 djb2_hash(unsigned char *str, size_t len); - -/* Same as above, but stores the calculated string length into *out_len (small optimization) */ -static uint64 djb2_hash_str(unsigned char *str, int *out_len); /* * Module load callback */ @@ -338,7 +335,7 @@ _PG_init(void) prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = pgss_shmem_startup; prev_post_parse_analyze_hook = post_parse_analyze_hook; - post_parse_analyze_hook = HOOK(pgss_post_parse_analyze); + post_parse_analyze_hook = HOOK(pgsm_post_parse_analyze); prev_ExecutorStart = ExecutorStart_hook; ExecutorStart_hook = HOOK(pgss_ExecutorStart); prev_ExecutorRun = ExecutorRun_hook; @@ -346,12 +343,12 @@ _PG_init(void) prev_ExecutorFinish = ExecutorFinish_hook; ExecutorFinish_hook = HOOK(pgss_ExecutorFinish); prev_ExecutorEnd = ExecutorEnd_hook; - ExecutorEnd_hook = HOOK(pgss_ExecutorEnd); + ExecutorEnd_hook = HOOK(pgsm_ExecutorEnd); prev_ProcessUtility = ProcessUtility_hook; ProcessUtility_hook = HOOK(pgss_ProcessUtility); #if PG_VERSION_NUM >= 130000 planner_hook_next = planner_hook; - planner_hook = HOOK(pgss_planner_hook); + planner_hook = HOOK(pgsm_planner_hook); #endif prev_emit_log_hook = emit_log_hook; emit_log_hook = HOOK(pgsm_emit_log_hook); @@ -364,30 +361,6 @@ _PG_init(void) system_init = true; } -/* - * Module unload callback - */ -/* cppcheck-suppress unusedFunction */ -void -_PG_fini(void) -{ - system_init = false; - shmem_startup_hook = prev_shmem_startup_hook; - post_parse_analyze_hook = prev_post_parse_analyze_hook; - ExecutorStart_hook = prev_ExecutorStart; - ExecutorRun_hook = prev_ExecutorRun; - ExecutorFinish_hook = prev_ExecutorFinish; - ExecutorEnd_hook = prev_ExecutorEnd; - ProcessUtility_hook = prev_ProcessUtility; - emit_log_hook = prev_emit_log_hook; - - free(nested_queryids); - free(nested_query_txts); - regfree(&preg_query_comments); - - hash_entry_reset(); -} - /* * shmem_startup hook: allocate or attach to shared memory, * then load any pre-existing statistics from file. @@ -437,15 +410,15 @@ pgss_shmem_request(void) } #endif -#if PG_VERSION_NUM >= 140000 -/* - * Post-parse-analysis hook: mark query with a queryId - */ static void -pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate) +pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState *jstate) { - if (prev_post_parse_analyze_hook) - prev_post_parse_analyze_hook(pstate, query, jstate); + pgssEntry *entry; + const char *query_text; + char *norm_query = NULL; + int norm_query_len; + int location; + int query_len; /* Safety check... */ if (!IsSystemInitialized()) @@ -463,67 +436,17 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate) { if (PGSM_TRACK_UTILITY && !PGSM_HANDLED_UTILITY(query->utilityStmt)) query->queryId = UINT64CONST(0); + return; } /* - * If query jumbling were able to identify any ignorable constants, we - * immediately create a hash table entry for the query, so that we can - * record the normalized form of the query string. If there were no such - * constants, the normalized string would be the same as the query text - * anyway, so there's no need for an early entry. + * Let's calculate queryid for versions 13 and below. We don't have to check + * that jstate is valid, it always will be for these versions. */ - if (jstate && jstate->clocations_count > 0) - pgss_store(query->queryId, /* query id */ - pstate->p_sourcetext, /* query */ - query->stmt_location, /* query location */ - query->stmt_len, /* query length */ - NULL, /* PlanInfo */ - query->commandType, /* CmdType */ - NULL, /* SysInfo */ - NULL, /* ErrorInfo */ - 0, /* totaltime */ - 0, /* rows */ - NULL, /* bufusage */ - NULL, /* walusage */ - NULL, /* jitusage */ - jstate, /* JumbleState */ - PGSS_PARSE); /* pgssStoreKind */ -} -#else - -/* - * Post-parse-analysis hook: mark query with a queryId - */ -static void -pgss_post_parse_analyze(ParseState *pstate, Query *query) -{ - JumbleState jstate; - - if (prev_post_parse_analyze_hook) - prev_post_parse_analyze_hook(pstate, query); - - /* Safety check... */ - if (!IsSystemInitialized()) - return; - if (!pgsm_enabled(exec_nested_level)) - return; - - /* - * Utility statements get queryId zero. We do this even in cases where - * the statement contains an optimizable statement for which a queryId - * could be derived (such as EXPLAIN or DECLARE CURSOR). For such cases, - * runtime control will first go through ProcessUtility and then the - * executor, and we don't want the executor hooks to do anything, since we - * are already measuring the statement's costs at the utility level. - */ - if (query->utilityStmt) - { - query->queryId = UINT64CONST(0); - return; - } - +#if PG_VERSION_NUM < 140000 query->queryId = get_query_id(&jstate, query); +#endif /* * If we are unlucky enough to get a hash of zero, use 1 instead, to @@ -532,22 +455,93 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) if (query->queryId == UINT64CONST(0)) query->queryId = UINT64CONST(1); - if (jstate.clocations_count > 0) - pgss_store(query->queryId, /* query id */ - pstate->p_sourcetext, /* query */ - query->stmt_location, /* query location */ - query->stmt_len, /* query length */ - NULL, /* PlanInfo */ - query->commandType, /* CmdType */ - NULL, /* SysInfo */ - NULL, /* ErrorInfo */ - 0, /* totaltime */ - 0, /* rows */ - NULL, /* bufusage */ - NULL, /* walusage */ - NULL, /* jitusage */ - &jstate, /* JumbleState */ - PGSS_PARSE); /* pgssStoreKind */ + /* + * Let's save the normalized query so that we can save the data without in + * hash later on without the need of jstate which wouldn't be available. + */ + query_text = pstate->p_sourcetext; + location = query->stmt_location; + query_len = query->stmt_len; + + /* We should always have a valid query. */ + Assert(query_text); + query_text = CleanQuerytext(query_text, &location, &query_len); + + norm_query_len = query_len; + + /* Generate a normalized query */ + if (jstate && jstate->clocations_count > 0) + { + norm_query = generate_normalized_query(jstate, + query_text, /* query */ + location, /* query location */ + &norm_query_len, + GetDatabaseEncoding()); + + Assert(norm_query); + } + + /* + * At this point, we don't know which bucket this query will land in, so passing + * 0. The store function MUST later update it based on the current bucket value. + * The correct bucket value will be needed then to search the hash table, or create + * the appropriate entry. + */ + entry = pgsm_create_hash_entry(0, query->queryId, NULL); + + /* Update other member that are not counters, so that we don't have to worry about these. */ + entry->pgsm_query_id = pgss_hash_string(norm_query ? norm_query : query_text, norm_query_len); + entry->counters.info.cmd_type = query->commandType; + + /* + * Add the query text and entry to the local list. + * + * Preserve the normalized query if needed and we got a valid one. + * Otherwise, store the actual query so that we don't have to check + * what query to store when saving into the hash. + * + * In case of query_text, request the function to duplicate it so that + * it is put in the relevant memory context. + */ + if (PGSM_NORMALIZED_QUERY && norm_query) + pgsm_add_to_list(entry, norm_query, norm_query_len, true); + else + { + pgsm_add_to_list(entry, (char *)query_text, query_len, true); + } + + /* Check that we've not exceeded max_stack_depth */ + Assert(list_length(lentries) <= max_stack_depth); + + if (norm_query) + pfree(norm_query); +} + +#if PG_VERSION_NUM >= 140000 +/* + * Post-parse-analysis hook: mark query with a queryId + */ +static void +pgsm_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate) +{ + if (prev_post_parse_analyze_hook) + prev_post_parse_analyze_hook(pstate, query, jstate); + + pgsm_post_parse_analyze_internal(pstate, query, jstate); +} +#else +/* + * Post-parse-analysis hook: mark query with a queryId + */ +static void +pgsm_post_parse_analyze(ParseState *pstate, Query *query) +{ + JumbleState jstate; + + if (prev_post_parse_analyze_hook) + prev_post_parse_analyze_hook(pstate, query); + + pgsm_post_parse_analyze_internal(pstate, query, &jstate); } #endif @@ -685,12 +679,13 @@ pgss_explain(QueryDesc *queryDesc) * ExecutorEnd hook: store results if needed */ static void -pgss_ExecutorEnd(QueryDesc *queryDesc) +pgsm_ExecutorEnd(QueryDesc *queryDesc) { uint64 queryId = queryDesc->plannedstmt->queryId; SysInfo sys_info; PlanInfo plan_info; PlanInfo *plan_ptr = NULL; + pgssEntry *entry = NULL; /* Extract the plan information in case of SELECT statement */ if (queryDesc->operation == CMD_SELECT && PGSM_QUERY_PLAN) @@ -700,11 +695,14 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) plan_info.plan_len = snprintf(plan_info.plan_text, PLAN_TEXT_LEN, "%s", pgss_explain(queryDesc)); plan_info.planid = pgss_hash_string(plan_info.plan_text, plan_info.plan_len); plan_ptr = &plan_info; + MemoryContextSwitchTo(mct); } if (queryId != UINT64CONST(0) && queryDesc->totaltime && pgsm_enabled(exec_nested_level)) { + entry = (pgssEntry *)llast(lentries); + /* * Make sure stats accumulation is done. (Note: it's okay if several * levels of hook all do this.) @@ -722,34 +720,35 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) sys_info.stime = time_diff(rusage_end.ru_stime, rusage_start.ru_stime); } - pgss_store(queryId, /* query id */ - queryDesc->sourceText, /* query text */ - queryDesc->plannedstmt->stmt_location, /* query location */ - queryDesc->plannedstmt->stmt_len, /* query length */ - plan_ptr, /* PlanInfo */ - queryDesc->operation, /* CmdType */ - &sys_info, /* SysInfo */ - NULL, /* ErrorInfo */ - queryDesc->totaltime->total * 1000.0, /* totaltime */ - queryDesc->estate->es_processed, /* rows */ - &queryDesc->totaltime->bufusage, /* bufusage */ + pgsm_update_entry(entry, /* entry */ + NULL, /* query */ + plan_ptr, /* PlanInfo */ + &sys_info, /* SysInfo */ + NULL, /* ErrorInfo */ + queryDesc->totaltime->total * 1000.0, /* total_time */ + queryDesc->estate->es_processed, /* rows */ + &queryDesc->totaltime->bufusage, /* bufusage */ #if PG_VERSION_NUM >= 130000 - &queryDesc->totaltime->walusage, /* walusage */ + &queryDesc->totaltime->walusage, /* walusage */ #else - NULL, + NULL, #endif #if PG_VERSION_NUM >= 150000 - queryDesc->estate->es_jit ? &queryDesc->estate->es_jit->instr : NULL, + queryDesc->estate->es_jit ? &queryDesc->estate->es_jit->instr : NULL, /* jitusage */ #else - NULL, + NULL, #endif - NULL, - PGSS_FINISHED); /* pgssStoreKind */ + false, /* reset */ + PGSM_EXEC); /* kind */ + + pgsm_store(entry); } + if (prev_ExecutorEnd) prev_ExecutorEnd(queryDesc); else standard_ExecutorEnd(queryDesc); + num_relations = 0; } @@ -805,14 +804,14 @@ pgss_ExecutorCheckPerms(List *rt, bool abort) #if PG_VERSION_NUM >= 130000 static PlannedStmt * -pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) +pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result; /* * We can't process the query if no query_string is provided, as - * pgss_store needs it. We also ignore query without queryid, as it would - * be treated as a utility statement, which may not be the case. + * pgsm_store needs it. We also ignore query without queryid, + * as it would be treated as a utility statement, which may not be the case. * * Note that planner_hook can be called from the planner itself, so we * have a specific nesting level for the planner. However, utility @@ -824,6 +823,7 @@ pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par if (pgsm_enabled(plan_nested_level + exec_nested_level) && PGSM_TRACK_PLANNING && query_string && parse->queryId != UINT64CONST(0)) { + pgssEntry *entry; instr_time start; instr_time duration; BufferUsage bufusage_start; @@ -872,21 +872,22 @@ pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par /* calc differences of WAL counters. */ memset(&walusage, 0, sizeof(WalUsage)); WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start); - pgss_store(parse->queryId, /* query id */ - query_string, /* query */ - parse->stmt_location, /* query location */ - parse->stmt_len, /* query length */ - NULL, /* PlanInfo */ - parse->commandType, /* CmdType */ - NULL, /* SysInfo */ - NULL, /* ErrorInfo */ - INSTR_TIME_GET_MILLISEC(duration), /* totaltime */ - 0, /* rows */ - &bufusage, /* bufusage */ - &walusage, /* walusage */ - NULL, /* JumbleState */ - NULL, - PGSS_PLAN); /* pgssStoreKind */ + + entry = (pgssEntry *)llast(lentries); + + /* The plan details are captured when the query finishes */ + pgsm_update_entry(entry, /* entry */ + NULL, /* query */ + NULL, /* PlanInfo */ + NULL, /* SysInfo */ + NULL, /* ErrorInfo */ + INSTR_TIME_GET_MILLISEC(duration), /* total_time */ + 0, /* rows */ + &bufusage, /* bufusage */ + &walusage, /* walusage */ + NULL, /* jitusage */ + false, /* reset */ + PGSM_PLAN); /* kind */ } else { @@ -906,7 +907,6 @@ pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par } #endif - /* * ProcessUtility hook */ @@ -938,7 +938,6 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, { Node *parsetree = pstmt->utilityStmt; uint64 queryId = 0; - SysInfo sys_info; #if PG_VERSION_NUM >= 140000 queryId = pstmt->queryId; @@ -973,9 +972,14 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, if (PGSM_TRACK_UTILITY && pgsm_enabled(exec_nested_level) && PGSM_HANDLED_UTILITY(parsetree)) { + pgssEntry *entry; + char *query_text; + int location; + int query_len; instr_time start; instr_time duration; uint64 rows; + SysInfo sys_info; BufferUsage bufusage; BufferUsage bufusage_start = pgBufferUsage; #if PG_VERSION_NUM >= 130000 @@ -1076,26 +1080,41 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, /* calc differences of buffer counters. */ memset(&bufusage, 0, sizeof(BufferUsage)); BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); - pgss_store( - queryId, /* query ID */ - queryString, /* query text */ - pstmt->stmt_location, /* query location */ - pstmt->stmt_len, /* query length */ - NULL, /* PlanInfo */ - 0, /* CmdType */ - &sys_info, /* SysInfo */ - NULL, /* ErrorInfo */ - INSTR_TIME_GET_MILLISEC(duration), /* total_time */ - rows, /* rows */ - &bufusage, /* bufusage */ + + /* Create an entry for this query */ + entry = pgsm_create_hash_entry(0, queryId, NULL); + + location = pstmt->stmt_location; + query_len = pstmt->stmt_len; + query_text = (char *)CleanQuerytext(queryString, &location, &query_len); + + entry->pgsm_query_id = pgss_hash_string(query_text, query_len); + entry->counters.info.cmd_type = 0; + + pgsm_add_to_list(entry, query_text, query_len, true); + + /* Check that we've not exceeded max_stack_depth */ + Assert(list_length(lentries) <= max_stack_depth); + + /* The plan details are captured when the query finishes */ + pgsm_update_entry(entry, /* entry */ + (char *)llast(lquery_text), /* query */ + NULL, /* PlanInfo */ + &sys_info, /* SysInfo */ + NULL, /* ErrorInfo */ + INSTR_TIME_GET_MILLISEC(duration), /* total_time */ + rows, /* rows */ + &bufusage, /* bufusage */ #if PG_VERSION_NUM >= 130000 - &walusage, /* walusage */ + &walusage, /* walusage */ #else - NULL, /* walusage, NULL for PG <= 12 */ + NULL, #endif - NULL, - NULL, /* JumbleState */ - PGSS_FINISHED); /* pgssStoreKind */ + NULL, /* jitusage */ + false, /* reset */ + PGSM_EXEC); /* kind */ + + pgsm_store(entry); } else { @@ -1195,17 +1214,30 @@ pg_get_backend_status(void) return NULL; } +/* + * The caller should allocate max_len memory to name including terminating null. + * The function returns the length of the string. + */ static int -pg_get_application_name(char *application_name, bool *ok) +pg_get_application_name(char *name, int buff_size) { - PgBackendStatus *beentry = pg_get_backend_status(); + PgBackendStatus *beentry; - if (!beentry) - return snprintf(application_name, APPLICATIONNAME_LEN, "%s", "unknown"); + /* Try to read application name from GUC directly */ + if (application_name && *application_name) + snprintf(name, buff_size, "%s", application_name); + else + { + beentry = pg_get_backend_status(); - *ok = true; + if (!beentry) + snprintf(name, buff_size, "%s", "unknown"); + else + snprintf(name, buff_size, "%s", beentry->st_appname); + } - return snprintf(application_name, APPLICATIONNAME_LEN, "%s", beentry->st_appname); + /* Return length so that others don't have to calculate */ + return strlen(name); } static uint @@ -1237,13 +1269,9 @@ pg_get_client_addr(bool *ok) } static void -pgss_update_entry(pgssEntry *entry, - uint64 bucketid, - uint64 queryid, +pgsm_update_entry(pgssEntry *entry, const char *query, - const char *comments, PlanInfo * plan_info, - CmdType cmd_type, SysInfo * sys_info, ErrorInfo * error_info, double total_time, @@ -1252,34 +1280,45 @@ pgss_update_entry(pgssEntry *entry, WalUsage *walusage, const struct JitInstrumentation *jitusage, bool reset, - pgssStoreKind kind, - const char *app_name, - size_t app_name_len) + pgsmStoreKind kind) { int index; double old_mean; int message_len = error_info ? strlen(error_info->message) : 0; - int comments_len = comments ? strlen(comments) : 0; int sqlcode_len = error_info ? strlen(error_info->sqlcode) : 0; int plan_text_len = plan_info ? plan_info->plan_len : 0; + char app_name[APPLICATIONNAME_LEN] = ""; + int app_name_len = 0; + /* Start collecting data for next bucket and reset all counters */ + if (reset) + memset(&entry->counters, 0, sizeof(Counters)); /* volatile block */ { - volatile pgssEntry *e = (volatile pgssEntry *) entry; + volatile pgssEntry *e = (volatile pgssEntry *)entry; - SpinLockAcquire(&e->mutex); - /* Start collecting data for next bucket and reset all counters */ - if (reset) - memset(&entry->counters, 0, sizeof(Counters)); + if (kind == PGSM_STORE) + SpinLockAcquire(&e->mutex); - if (comments_len > 0) - _snprintf(e->counters.info.comments, comments, comments_len + 1, COMMENTS_LEN); - e->counters.state = kind; - if (kind == PGSS_PLAN) + /* Extract comments if enabled and only when the query has completed with or without error */ + if (PGSM_EXTRACT_COMMENTS && query && kind == PGSM_STORE) + { + char comments[512] = {0}; + int comments_len; + + extract_query_comments(query, comments, sizeof(comments)); + comments_len = strlen(comments); + + if (comments_len > 0) + _snprintf(e->counters.info.comments, comments, comments_len + 1, COMMENTS_LEN); + } + + if (kind == PGSM_PLAN || kind == PGSM_STORE) { if (e->counters.plancalls.calls == 0) e->counters.plancalls.usage = USAGE_INIT; + e->counters.plancalls.calls += 1; e->counters.plantime.total_time += total_time; @@ -1289,22 +1328,28 @@ pgss_update_entry(pgssEntry *entry, e->counters.plantime.max_time = total_time; e->counters.plantime.mean_time = total_time; } + else + { + /* Increment the counts, except when jstate is not NULL */ + old_mean = e->counters.plantime.mean_time; - /* Increment the counts, except when jstate is not NULL */ - old_mean = e->counters.plantime.mean_time; - e->counters.plantime.mean_time += (total_time - old_mean) / e->counters.plancalls.calls; - e->counters.plantime.sum_var_time += (total_time - old_mean) * (total_time - e->counters.plantime.mean_time); + e->counters.plantime.mean_time += (total_time - old_mean) / e->counters.plancalls.calls; + e->counters.plantime.sum_var_time += (total_time - old_mean) * (total_time - e->counters.plantime.mean_time); - /* calculate min and max time */ - if (e->counters.plantime.min_time > total_time) - e->counters.plantime.min_time = total_time; - if (e->counters.plantime.max_time < total_time) - e->counters.plantime.max_time = total_time; + /* calculate min and max time */ + if (e->counters.plantime.min_time > total_time) + e->counters.plantime.min_time = total_time; + + if (e->counters.plantime.max_time < total_time) + e->counters.plantime.max_time = total_time; + } } - else if (kind == PGSS_FINISHED) + + if (kind == PGSM_EXEC || kind == PGSM_STORE) { if (e->counters.calls.calls == 0) e->counters.calls.usage = USAGE_INIT; + e->counters.calls.calls += 1; e->counters.time.total_time += total_time; @@ -1314,64 +1359,71 @@ pgss_update_entry(pgssEntry *entry, e->counters.time.max_time = total_time; e->counters.time.mean_time = total_time; } + else + { + /* Increment the counts, except when jstate is not NULL */ + old_mean = e->counters.time.mean_time; + e->counters.time.mean_time += (total_time - old_mean) / e->counters.calls.calls; + e->counters.time.sum_var_time += (total_time - old_mean) * (total_time - e->counters.time.mean_time); - /* Increment the counts, except when jstate is not NULL */ - old_mean = e->counters.time.mean_time; - e->counters.time.mean_time += (total_time - old_mean) / e->counters.calls.calls; - e->counters.time.sum_var_time += (total_time - old_mean) * (total_time - e->counters.time.mean_time); + /* calculate min and max time */ + if (e->counters.time.min_time > total_time) + e->counters.time.min_time = total_time; - /* calculate min and max time */ - if (e->counters.time.min_time > total_time) - e->counters.time.min_time = total_time; - if (e->counters.time.max_time < total_time) - e->counters.time.max_time = total_time; + if (e->counters.time.max_time < total_time) + e->counters.time.max_time = total_time; - index = get_histogram_bucket(total_time); - e->counters.resp_calls[index]++; + index = get_histogram_bucket(total_time); + e->counters.resp_calls[index]++; + } } - if (plan_text_len > 0 && !e->counters.planinfo.plan_text[0]) - _snprintf(e->counters.planinfo.plan_text, plan_info->plan_text, plan_text_len + 1, PLAN_TEXT_LEN); - - if (app_name_len > 0 && !e->counters.info.application_name[0]) - _snprintf(e->counters.info.application_name, app_name, app_name_len + 1, APPLICATIONNAME_LEN); - - e->counters.info.num_relations = num_relations; - _snprintf2(e->counters.info.relations, relations, num_relations, REL_LEN); - - e->counters.info.cmd_type = cmd_type; - - if (exec_nested_level > 0) + /* Only should process this once when storing the data */ + if (kind == PGSM_STORE) { - if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) + if (plan_text_len > 0 && !e->counters.planinfo.plan_text[0]) + _snprintf(e->counters.planinfo.plan_text, plan_info->plan_text, plan_text_len + 1, PLAN_TEXT_LEN); + + app_name_len = pg_get_application_name(app_name, APPLICATIONNAME_LEN); + + if (app_name_len > 0 && !e->counters.info.application_name[0]) + _snprintf(e->counters.info.application_name, app_name, app_name_len + 1, APPLICATIONNAME_LEN); + + e->counters.info.num_relations = num_relations; + _snprintf2(e->counters.info.relations, relations, num_relations, REL_LEN); + + if (exec_nested_level > 0) { - int parent_query_len = nested_query_txts[exec_nested_level - 1]? - strlen(nested_query_txts[exec_nested_level - 1]): 0; - e->counters.info.parentid = nested_queryids[exec_nested_level - 1]; - e->counters.info.parent_query = InvalidDsaPointer; - /* If we have a parent query, store it in the raw dsa area */ - if (parent_query_len > 0) + if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) { - char *qry_buff; - dsa_area *query_dsa_area = get_dsa_area_for_query_text(); - /* Use dsa_allocate_extended with DSA_ALLOC_NO_OOM flag, as we don't want to get an - * error if memory allocation fails.*/ - dsa_pointer qry = dsa_allocate_extended(query_dsa_area, parent_query_len+1, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO); - if (DsaPointerIsValid(qry)) + int parent_query_len = nested_query_txts[exec_nested_level - 1]? + strlen(nested_query_txts[exec_nested_level - 1]): 0; + e->counters.info.parentid = nested_queryids[exec_nested_level - 1]; + e->counters.info.parent_query = InvalidDsaPointer; + /* If we have a parent query, store it in the raw dsa area */ + if (parent_query_len > 0) { - qry_buff = dsa_get_address(query_dsa_area, qry); - memcpy(qry_buff, nested_query_txts[exec_nested_level - 1], parent_query_len); - qry_buff[parent_query_len] = 0; - /* store the dsa pointer for parent query text */ - e->counters.info.parent_query = qry; + char *qry_buff; + dsa_area *query_dsa_area = get_dsa_area_for_query_text(); + /* Use dsa_allocate_extended with DSA_ALLOC_NO_OOM flag, as we don't want to get an + * error if memory allocation fails.*/ + dsa_pointer qry = dsa_allocate_extended(query_dsa_area, parent_query_len+1, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO); + if (DsaPointerIsValid(qry)) + { + qry_buff = dsa_get_address(query_dsa_area, qry); + memcpy(qry_buff, nested_query_txts[exec_nested_level - 1], parent_query_len); + qry_buff[parent_query_len] = 0; + /* store the dsa pointer for parent query text */ + e->counters.info.parent_query = qry; + } } } } - } - else - { - e->counters.info.parentid = UINT64CONST(0); - e->counters.info.parent_query = InvalidDsaPointer; + else + { + e->counters.info.parentid = UINT64CONST(0); + e->counters.info.parent_query = InvalidDsaPointer; + } } if (error_info) @@ -1380,7 +1432,9 @@ pgss_update_entry(pgssEntry *entry, _snprintf(e->counters.error.sqlcode, error_info->sqlcode, sqlcode_len, SQLCODE_LEN); _snprintf(e->counters.error.message, error_info->message, message_len, ERROR_MESSAGE_LEN); } + e->counters.calls.rows += rows; + if (bufusage) { e->counters.blocks.shared_blks_hit += bufusage->shared_blks_hit; @@ -1400,7 +1454,9 @@ pgss_update_entry(pgssEntry *entry, e->counters.blocks.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time); #endif } + e->counters.calls.usage += USAGE_EXEC(total_time); + if (sys_info) { e->counters.sysinfo.utime += sys_info->utime; @@ -1429,38 +1485,116 @@ pgss_update_entry(pgssEntry *entry, e->counters.jitinfo.jit_emission_count++; e->counters.jitinfo.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter); } - SpinLockRelease(&e->mutex); + + if (kind == PGSM_STORE) + SpinLockRelease(&e->mutex); } } static void -pgss_store_error(uint64 queryid, +pgsm_store_error(uint64 queryid, const char *query, ErrorData *edata) { - ErrorInfo error_info; + pgssEntry *entry; + ErrorInfo error_info; error_info.elevel = edata->elevel; snprintf(error_info.message, ERROR_MESSAGE_LEN, "%s", edata->message); snprintf(error_info.sqlcode, SQLCODE_LEN, "%s", unpack_sql_state(edata->sqlerrcode)); - pgss_store(queryid, /* query id */ - query, /* query text */ - 0, /* query location */ - strlen(query), /* query length */ - NULL, /* PlanInfo */ - 0, /* CmdType */ - NULL, /* SysInfo */ - &error_info, /* ErrorInfo */ - 0, /* total_time */ - 0, /* rows */ - NULL, /* bufusage */ - NULL, /* walusage */ - NULL, /* JumbleState */ - NULL, - PGSS_ERROR); /* pgssStoreKind */ + entry = pgsm_create_hash_entry(0, queryid, NULL); + + pgsm_add_to_list(entry, (char *)query, strlen(query), true); + + pgsm_update_entry(entry, /* entry */ + query, /* query */ + NULL, /* PlanInfo */ + NULL, /* SysInfo */ + &error_info, /* ErrorInfo */ + 0, /* total_time */ + 0, /* rows */ + NULL, /* bufusage */ + NULL, /* walusage */ + NULL, /* jitusage */ + false, /* reset */ + PGSM_ERROR); /* kind */ + + pgsm_store(entry); } +static void +pgsm_add_to_list(pgssEntry *entry, char *query_text, int query_len, bool should_dup) +{ + MemoryContext oldctx; + char *query; + + /* Switch to TopMemoryContext */ + oldctx = MemoryContextSwitchTo(TopMemoryContext); + + if (should_dup) + query = pnstrdup(query_text, query_len); + else + query = query_text; + + lentries = lappend(lentries, entry); + lquery_text = lappend(lquery_text, query); + + MemoryContextSwitchTo(oldctx); +} + + +/* + * Function encapsulating some external calls for filling up the hash key data structure. + * The bucket_id may not be known at this stage. So pass any value that you may wish. + */ +static pgssEntry * +pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info) +{ + pgssEntry *entry; + int sec_ctx; + bool found_client_addr = false; + char app_name[APPLICATIONNAME_LEN] = ""; + char *app_name_ptr = app_name; + int app_name_len = 0; + MemoryContext oldctx; + + /* Create an entry in the TopMemoryContext */ + oldctx = MemoryContextSwitchTo(TopMemoryContext); + entry = palloc0(sizeof(pgssEntry)); + MemoryContextSwitchTo(oldctx); + + /* + * Get the user ID. Let's use this instead of GetUserID as this + * won't throw an assertion in case of an error. + */ + GetUserIdAndSecContext((Oid *) &entry->key.userid, &sec_ctx); + + /* Get the application name and set appid */ + app_name_len = pg_get_application_name(app_name, APPLICATIONNAME_LEN); + entry->key.appid = pgss_hash_string((const char *)app_name_ptr, app_name_len); + + /* client address */ + entry->key.ip = pg_get_client_addr(&found_client_addr); + + /* PlanID, if there is one */ + entry->key.planid = plan_info ? plan_info->planid : 0; + + /* Set remaining data */ + entry->key.dbid = MyDatabaseId; + entry->key.queryid = queryid; + entry->key.bucket_id = bucket_id; + +#if PG_VERSION_NUM < 140000 + entry->key.toplevel = 1; +#else + entry->key.toplevel = ((exec_nested_level + plan_nested_level) == 0); +#endif + + return entry; +} + + /* * Store some statistics for a statement. * @@ -1472,42 +1606,16 @@ pgss_store_error(uint64 queryid, * query string. total_time, rows, bufusage are ignored in this case. */ static void -pgss_store(uint64 queryid, - const char *query, - int query_location, - int query_len, - PlanInfo * plan_info, - CmdType cmd_type, - SysInfo * sys_info, - ErrorInfo * error_info, - double total_time, - uint64 rows, - BufferUsage *bufusage, - WalUsage *walusage, - const struct JitInstrumentation *jitusage, - JumbleState *jstate, - pgssStoreKind kind) +pgsm_store(pgssEntry *entry) { - pgssHashKey key; - pgssEntry *entry; + pgssEntry *shared_hash_entry; pgssSharedState *pgss; - char *app_name_ptr; - char app_name[APPLICATIONNAME_LEN] = ""; - int app_name_len = 0; - bool reset = false; - uint64 pgsm_query_id = 0; + bool found; uint64 bucketid; uint64 prev_bucket_id; - uint64 userid; - uint64 planid; - uint64 appid = 0; - int norm_query_len = 0; - char *norm_query = NULL; - char comments[512] = ""; - bool found_app_name = false; - bool found_client_addr = false; - uint client_addr = 0; - bool found; + bool reset = false; /* Only used in update function - HAMID */ + char *query; + int query_len; /* Safety check... */ if (!IsSystemInitialized()) @@ -1515,136 +1623,30 @@ pgss_store(uint64 queryid, pgss = pgsm_get_ss(); -#if PG_VERSION_NUM >= 140000 - - /* - * Nothing to do if compute_query_id isn't enabled and no other module - * computed a query identifier. - */ - if (queryid == UINT64CONST(0)) - return; -#endif - - query = CleanQuerytext(query, &query_location, &query_len); - -#if PG_VERSION_NUM < 140000 - - /* - * For utility statements, we just hash the query string to get an ID. - */ - if (queryid == UINT64CONST(0)) - { - queryid = pgss_hash_string(query, query_len); - - /* - * If we are unlucky enough to get a hash of zero(invalid), use - * queryID as 2 instead, queryID 1 is already in use for normal - * statements. - */ - if (queryid == UINT64CONST(0)) - queryid = UINT64CONST(2); - } -#endif - - Assert(query != NULL); - if (kind == PGSS_ERROR) - { - int sec_ctx; - - GetUserIdAndSecContext((Oid *) &userid, &sec_ctx); - } - else - userid = GetUserId(); - - /* Try to read application name from GUC directly */ - if (application_name && *application_name) - { - app_name_ptr = application_name; - appid = djb2_hash_str((unsigned char *) application_name, &app_name_len); - } - else - { - app_name_len = pg_get_application_name(app_name, &found_app_name); - if (found_app_name) - appid = djb2_hash((unsigned char *) app_name, app_name_len); - app_name_ptr = app_name; - } - - if (!found_client_addr) - client_addr = pg_get_client_addr(&found_client_addr); - - planid = plan_info ? plan_info->planid : 0; - - /* Extract comments if enabled. */ - if (PGSM_EXTRACT_COMMENTS) - extract_query_comments(query, comments, sizeof(comments)); - + /* We should lock the hash table here what if the bucket is removed; e.g. reset is called - HAMID */ prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket); bucketid = get_next_wbucket(pgss); if (bucketid != prev_bucket_id) reset = true; - key.bucket_id = bucketid; - key.userid = userid; - key.dbid = MyDatabaseId; - key.queryid = queryid; - key.ip = client_addr; - key.planid = planid; - key.appid = appid; -#if PG_VERSION_NUM < 140000 - key.toplevel = 1; -#else - key.toplevel = ((exec_nested_level + plan_nested_level) == 0); -#endif + entry->key.bucket_id = bucketid; + query = (char *)llast(lquery_text); + query_len = strlen(query); + /* + * Acquire a share lock to start with. We'd have to acquire exclusive + * if we need ot create the entry. + */ LWLockAcquire(pgss->lock, LW_SHARED); + shared_hash_entry = (pgssEntry *) pgsm_hash_find(get_pgssHash(), &entry->key, &found); - entry = (pgssEntry *) pgsm_hash_find(get_pgssHash(), &key, &found); - if (!entry) + if (!shared_hash_entry) { dsa_pointer dsa_query_pointer; dsa_area *query_dsa_area; char *query_buff; - /* - * Create a new, normalized query string if caller asked. We don't - * need to hold the lock while doing this work. (Note: in any case, - * it's possible that someone else creates a duplicate hashtable entry - * in the interval where we don't hold the lock below. That case is - * handled by entry_alloc. - */ - if (jstate) - { - norm_query_len = query_len; - - LWLockRelease(pgss->lock); - norm_query = generate_normalized_query(jstate, query, - query_location, - &norm_query_len, - GetDatabaseEncoding()); - LWLockAcquire(pgss->lock, LW_SHARED); - - pgsm_query_id = pgss_hash_string(norm_query, norm_query_len); - - /* Free up norm_query if we don't intend to show normalized version in the view */ - if (PGSM_NORMALIZED_QUERY) - { - query_len = norm_query_len; - } - else - { - if (norm_query) - pfree(norm_query); - - norm_query = NULL; - } - } - else - { - pgsm_query_id = pgss_hash_string(query, query_len); - } - /* New query, truncate length if necessary. */ if (query_len > PGSM_QUERY_MAX_LEN) query_len = PGSM_QUERY_MAX_LEN; @@ -1655,26 +1657,27 @@ pgss_store(uint64 queryid, if (!DsaPointerIsValid(dsa_query_pointer)) { LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); return; } + /* Get the memory address from DSA pointer and copy the query text in local variable */ query_buff = dsa_get_address(query_dsa_area, dsa_query_pointer); - memcpy(query_buff, norm_query ? norm_query : query, query_len); - /* OK to create a new hashtable entry */ + memcpy(query_buff, query, query_len); + LWLockRelease(pgss->lock); + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + /* OK to create a new hashtable entry */ PGSM_DISABLE_ERROR_CAPUTRE(); { PG_TRY(); { - entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); + shared_hash_entry = hash_entry_alloc(pgss, &entry->key, GetDatabaseEncoding()); } PG_CATCH(); { LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); + if (DsaPointerIsValid(dsa_query_pointer)) dsa_free(query_dsa_area, dsa_query_pointer); PG_RE_THROW(); @@ -1685,47 +1688,41 @@ pgss_store(uint64 queryid, if (entry == NULL) { LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); + if (DsaPointerIsValid(dsa_query_pointer)) dsa_free(query_dsa_area, dsa_query_pointer); + return; } - entry->query_pos = dsa_query_pointer; - entry->pgsm_query_id = pgsm_query_id; - } - else - { - #if USE_DYNAMIC_HASH - if(entry) - dshash_release_lock(get_pgssHash(), entry); - #endif + /* If we already have the pointer set, free this one */ + if (DsaPointerIsValid(shared_hash_entry->query_pos)) + dsa_free(query_dsa_area, dsa_query_pointer); + else + shared_hash_entry->query_pos = dsa_query_pointer; + + shared_hash_entry->pgsm_query_id = entry->pgsm_query_id; + shared_hash_entry->encoding = entry->encoding; + shared_hash_entry->counters.info.cmd_type = entry->counters.info.cmd_type; } - if (jstate == NULL) - pgss_update_entry(entry, /* entry */ - bucketid, /* bucketid */ - queryid, /* queryid */ - query, /* query */ - comments, /* comments */ - plan_info, /* PlanInfo */ - cmd_type, /* CmdType */ - sys_info, /* SysInfo */ - error_info, /* ErrorInfo */ - total_time, /* total_time */ - rows, /* rows */ - bufusage, /* bufusage */ - walusage, /* walusage */ - jitusage, - reset, /* reset */ - kind, /* kind */ - app_name_ptr, - app_name_len); + pgsm_update_entry(shared_hash_entry, /* entry */ + query, /* query */ + &entry->counters.planinfo, /* PlanInfo */ + &entry->counters.sysinfo, /* SysInfo */ + &entry->counters.error, /* ErrorInfo */ + 0, /* total_time */ /* HAMID - need to pass a proper value here */ + entry->counters.calls.rows, /* rows */ + NULL, // HAMID &entry->counters.blocks, /* bufusage */ + NULL, // HAMID &entry->counters.walusage, /* walusage */ + NULL, // HAMID &entry->counters.jitinfo, /* jitusage */ + reset, /* reset */ + PGSM_STORE); + + lentries = list_delete_last(lentries); + lquery_text = list_delete_last(lquery_text); LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); } /* @@ -1863,19 +1860,18 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, double stddev; uint64 queryid = entry->key.queryid; int64 bucketid = entry->key.bucket_id; - uint64 dbid = entry->key.dbid; - uint64 userid = entry->key.userid; - int64 ip = entry->key.ip; + Oid dbid = entry->key.dbid; + Oid userid = entry->key.userid; + uint32 ip = entry->key.ip; uint64 planid = entry->key.planid; uint64 pgsm_query_id = entry->pgsm_query_id; dsa_area *query_dsa_area; char *query_ptr; + bool toplevel = entry->key.toplevel; #if PG_VERSION_NUM < 140000 - bool toplevel = 1; bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); #else bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); - bool toplevel = entry->key.toplevel; #endif /* Load the query text from dsa area */ if (DsaPointerIsValid(entry->query_pos)) @@ -1905,13 +1901,8 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (!IsBucketValid(bucketid)) { - if (tmp.state == PGSS_FINISHED) - continue; - } - - /* Skip queries such as, $1, $2 := $3, etc. */ - if (tmp.state == PGSS_PARSE || tmp.state == PGSS_PLAN) continue; + } /* read the parent query text if any */ if (tmp.info.parentid != UINT64CONST(0)) @@ -1939,7 +1930,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, * pg_read_all_stats members are allowed */ if (is_allowed_role || userid == GetUserId()) - values[i++] = Int64GetDatumFast(ip); + values[i++] = UInt32GetDatum(ip); else nulls[i++] = true; @@ -3281,7 +3272,7 @@ pgsm_emit_log_hook(ErrorData *edata) if (debug_query_string) queryid = pgss_hash_string(debug_query_string, strlen(debug_query_string)); - pgss_store_error(queryid, + pgsm_store_error(queryid, debug_query_string ? debug_query_string : "", edata); } @@ -3492,34 +3483,3 @@ get_query_id(JumbleState *jstate, Query *query) return queryid; } #endif - -static uint64 -djb2_hash(unsigned char *str, size_t len) -{ - uint64 hash = 5381LLU; - - while (len--) - hash = ((hash << 5) + hash) ^ *str++; - /* hash(i - 1) * 33 ^ str[i] */ - - return hash; -} - -static uint64 -djb2_hash_str(unsigned char *str, int *out_len) -{ - uint64 hash = 5381LLU; - unsigned char *start = str; - unsigned char c; - - while ((c = *str) != '\0') - { - hash = ((hash << 5) + hash) ^ c; - /* hash(i - 1) * 33 ^ str[i] */ - ++str; - } - - *out_len = str - start; - - return hash; -} diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 6bcf089..9c8bcd1 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -196,23 +196,23 @@ typedef enum OVERFLOW_TARGET OVERFLOW_TARGET_DISK } OVERFLOW_TARGET; -typedef enum pgssStoreKind +typedef enum pgsmStoreKind { - PGSS_INVALID = -1, + PGSM_INVALID = -1, /* - * PGSS_PLAN and PGSS_EXEC must be respectively 0 and 1 as they're used to + * PGSM_PLAN and PGSM_EXEC must be respectively 0 and 1 as they're used to * reference the underlying values in the arrays in the Counters struct, - * and this order is required in pg_stat_statements_internal(). + * and this order is required in pg_stat_monitor_internal(). */ - PGSS_PARSE = 0, - PGSS_PLAN, - PGSS_EXEC, - PGSS_FINISHED, - PGSS_ERROR, + PGSM_PARSE = 0, + PGSM_PLAN, + PGSM_EXEC, + PGSM_STORE, + PGSM_ERROR, PGSS_NUMKIND /* Must be last value of this enum */ -} pgssStoreKind; +} pgsmStoreKind; /* the assumption of query max nested level */ #define DEFAULT_MAX_NESTED_LEVEL 10 @@ -251,12 +251,12 @@ typedef struct pgssHashKey { uint64 bucket_id; /* bucket number */ uint64 queryid; /* query identifier */ - uint64 userid; /* user OID */ - uint64 dbid; /* database OID */ - uint64 ip; /* client ip address */ uint64 planid; /* plan identifier */ uint64 appid; /* hash of application name */ - uint64 toplevel; /* query executed at top level */ + Oid userid; /* user OID */ + Oid dbid; /* database OID */ + uint32 ip; /* client ip address */ + bool toplevel; /* query executed at top level */ } pgssHashKey; typedef struct QueryInfo @@ -339,7 +339,6 @@ typedef struct Wal_Usage typedef struct Counters { - uint64 bucket_id; /* bucket id */ Calls calls; QueryInfo info; CallTime time; @@ -479,8 +478,6 @@ void pgss_shmem_startup(void); void pgss_shmem_shutdown(int code, Datum arg); int pgsm_get_bucket_size(void); pgssSharedState *pgsm_get_ss(void); -void hash_entry_reset(void); -void hash_query_entryies_reset(void); void hash_query_entries(); void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer);