diff --git a/Makefile b/Makefile index fe567a8..6d65d47 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,8 @@ LDFLAGS_SL += $(filter -lm, $(LIBS)) TAP_TESTS = 1 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/pg_stat_monitor/pg_stat_monitor.conf --inputdir=regression -REGRESS = basic version guc pgsm_query_id functions counters relations database error_insert application_name application_name_unique top_query cmd_type error rows tags +#REGRESS = basic version guc pgsm_query_id functions counters relations database error_insert application_name application_name_unique top_query cmd_type error rows tags +REGRESS = basic version guc pgsm_query_id functions relations database error_insert application_name application_name_unique top_query cmd_type error rows tags # Disabled because these tests require "shared_preload_libraries=pg_stat_statements", # which typical installcheck users do not have (e.g. buildfarm clients). diff --git a/hash_query.c b/hash_query.c index 8cc3bf8..fa6861a 100644 --- a/hash_query.c +++ b/hash_query.c @@ -288,7 +288,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) /* * Prepare resources for using the new bucket: * - Deallocate finished hash table entries in new_bucket_id (entries whose - * state is PGSS_FINISHED or PGSS_FINISHED). + * state is PGSM_EXEC or PGSM_ERROR). * - Clear query buffer for new_bucket_id. * - If old_bucket_id != -1, move all pending hash table entries in * old_bucket_id to the new bucket id, also move pending queries from the @@ -302,9 +302,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu { PGSM_HASH_SEQ_STATUS hstat; pgssEntry *entry = NULL; - /* Store pending query ids from the previous bucket. */ - List *pending_entries = NIL; - ListCell *pending_entry; if (!pgsmStateLocal.shared_hash) return; @@ -320,9 +317,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu * Remove all entries if new_bucket_id == -1. 
Otherwise remove entry * in new_bucket_id if it has finished already. */ - if (new_bucket_id < 0 || - (entry->key.bucket_id == new_bucket_id && - (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) + if (new_bucket_id < 0 || entry->key.bucket_id == new_bucket_id) { dsa_pointer parent_qdsa = entry->counters.info.parent_query; pdsa = entry->query_pos; @@ -336,145 +331,9 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu dsa_free(pgsmStateLocal.dsa, parent_qdsa); continue; } - - /* - * If we detect a pending query residing in the previous bucket id, we - * add it to a list of pending elements to be moved to the new bucket - * id. Can't update the hash table while iterating it inside this - * loop, as this may introduce all sort of problems. - */ - if (old_bucket_id != -1 && entry->key.bucket_id == old_bucket_id) - { - if (entry->counters.state == PGSS_PARSE || - entry->counters.state == PGSS_PLAN || - entry->counters.state == PGSS_EXEC) - { - pgssEntry *bkp_entry = malloc(sizeof(pgssEntry)); - - if (!bkp_entry) - { - elog(DEBUG1, "hash_entry_dealloc: out of memory"); - - /* - * No memory, If the entry has calls > 1 then we change - * the state to finished, as the pending query will likely - * finish execution during the new bucket time window. The - * pending query will vanish in this case, can't list it - * until it completes. - * - * If there is only one call to the query and it's - * pending, remove the entry from the previous bucket and - * allow it to finish in the new bucket, in order to avoid - * the query living in the old bucket forever. - */ - if (entry->counters.calls.calls > 1) - entry->counters.state = PGSS_FINISHED; - else - { - pdsa = entry->query_pos; - pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); - if (DsaPointerIsValid(pdsa)) - dsa_free(pgsmStateLocal.dsa, pdsa); - } - continue; - } - - /* Save key/data from the previous entry. 
*/ - memcpy(bkp_entry, entry, sizeof(pgssEntry)); - - /* Update key to use the new bucket id. */ - bkp_entry->key.bucket_id = new_bucket_id; - - /* Add the entry to a list of nodes to be processed later. */ - pending_entries = lappend(pending_entries, bkp_entry); - - /* - * If the entry has calls > 1 then we change the state to - * finished in the previous bucket, as the pending query will - * likely finish execution during the new bucket time window. - * Can't remove it from the previous bucket as it may have - * many calls and we would lose the query statistics. - * - * If there is only one call to the query and it's pending, - * remove the entry from the previous bucket and allow it to - * finish in the new bucket, in order to avoid the query - * living in the old bucket forever. - */ - if (entry->counters.calls.calls > 1) - entry->counters.state = PGSS_FINISHED; - else - { - pdsa = entry->query_pos; - pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); - /* We should not delete the Query in DSA here - * as the same will get reused when the entry gets inserted into new bucket - */ - } - } - } - } - pgsm_hash_seq_term(&hstat); - /* - * Iterate over the list of pending queries in order to add them back to - * the hash table with the updated bucket id. - */ - foreach(pending_entry, pending_entries) - { - bool found = false; - pgssEntry *new_entry; - pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry); - - - PGSM_DISABLE_ERROR_CAPUTRE(); - { - new_entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, &old_entry->key, &found); - }PGSM_END_DISABLE_ERROR_CAPTURE(); - - if (new_entry == NULL) - elog(DEBUG1, "%s", "pg_stat_monitor: out of memory"); - else if (!found) - { - /* Restore counters and other data. 
*/ - new_entry->counters = old_entry->counters; - SpinLockInit(&new_entry->mutex); - new_entry->encoding = old_entry->encoding; - new_entry->query_pos = old_entry->query_pos; - } - #if USE_DYNAMIC_HASH - if(new_entry) - dshash_release_lock(pgsmStateLocal.shared_hash, new_entry); - #endif - free(old_entry); - } - list_free(pending_entries); -} - -/* - * Release all entries. - */ -void -hash_entry_reset() -{ - pgssSharedState *pgss = pgsm_get_ss(); - PGSM_HASH_SEQ_STATUS hstat; - pgssEntry *entry; - - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - - pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true); - - while ((entry = pgsm_hash_seq_next(&hstat)) != NULL) - { - dsa_pointer pdsa = entry->query_pos; - pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); - if (DsaPointerIsValid(pdsa)) - dsa_free(pgsmStateLocal.dsa, pdsa); } pgsm_hash_seq_term(&hstat); - - pg_atomic_write_u64(&pgss->current_wbucket, 0); - LWLockRelease(pgss->lock); } bool diff --git a/pg_stat_monitor--2.0.sql b/pg_stat_monitor--2.0.sql index 7bc2836..c0c137d 100644 --- a/pg_stat_monitor--2.0.sql +++ b/pg_stat_monitor--2.0.sql @@ -85,7 +85,7 @@ CREATE FUNCTION pg_stat_monitor_internal( OUT bucket int8, -- 0 OUT userid oid, OUT dbid oid, - OUT client_ip int8, + OUT client_ip int4, OUT queryid int8, -- 4 OUT planid int8, diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 5cd43f4..36d7e41 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -17,6 +17,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "nodes/pg_list.h" #include "utils/guc.h" #include #include "pgstat.h" @@ -63,8 +64,6 @@ do \ /*---- Initicalization Function Declarations ----*/ void _PG_init(void); -void _PG_fini(void); - /*---- Local variables ----*/ @@ -85,6 +84,8 @@ static int hist_bucket_count_total; /* The array to store outer layer query id*/ uint64 *nested_queryids; char **nested_query_txts; +List *lentries = NULL; +List *lquery_text = NULL; /* Regex object used to extract query 
comments. */ static regex_t preg_query_comments; @@ -137,24 +138,24 @@ PG_FUNCTION_INFO_V1(get_histogram_timings); PG_FUNCTION_INFO_V1(pg_stat_monitor_hook_stats); static uint pg_get_client_addr(bool *ok); -static int pg_get_application_name(char *application_name, bool *ok); +static int pg_get_application_name(char *name, int buff_size); static PgBackendStatus *pg_get_backend_status(void); static Datum intarray_get_datum(int32 arr[], int len); #if PG_VERSION_NUM < 140000 -DECLARE_HOOK(void pgss_post_parse_analyze, ParseState *pstate, Query *query); +DECLARE_HOOK(void pgsm_post_parse_analyze, ParseState *pstate, Query *query); #else -DECLARE_HOOK(void pgss_post_parse_analyze, ParseState *pstate, Query *query, JumbleState *jstate); +DECLARE_HOOK(void pgsm_post_parse_analyze, ParseState *pstate, Query *query, JumbleState *jstate); #endif DECLARE_HOOK(void pgss_ExecutorStart, QueryDesc *queryDesc, int eflags); DECLARE_HOOK(void pgss_ExecutorRun, QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); DECLARE_HOOK(void pgss_ExecutorFinish, QueryDesc *queryDesc); -DECLARE_HOOK(void pgss_ExecutorEnd, QueryDesc *queryDesc); +DECLARE_HOOK(void pgsm_ExecutorEnd, QueryDesc *queryDesc); DECLARE_HOOK(bool pgss_ExecutorCheckPerms, List *rt, bool abort); #if PG_VERSION_NUM >= 140000 -DECLARE_HOOK(PlannedStmt *pgss_planner_hook, Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); +DECLARE_HOOK(PlannedStmt *pgsm_planner_hook, Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); DECLARE_HOOK(void pgss_ProcessUtility, PlannedStmt *pstmt, const char *queryString, bool readOnlyTree, ProcessUtilityContext context, @@ -162,7 +163,7 @@ DECLARE_HOOK(void pgss_ProcessUtility, PlannedStmt *pstmt, const char *queryStri DestReceiver *dest, QueryCompletion *qc); #elif PG_VERSION_NUM >= 130000 -DECLARE_HOOK(PlannedStmt *pgss_planner_hook, Query *parse, const char *query_string, int cursorOptions, 
ParamListInfo boundParams); +DECLARE_HOOK(PlannedStmt *pgsm_planner_hook, Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); DECLARE_HOOK(void pgss_ProcessUtility, PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, @@ -184,23 +185,24 @@ char *unpack_sql_state(int sql_state); !IsA(n, PrepareStmt) && \ !IsA(n, DeallocateStmt)) -static void pgss_store_error(uint64 queryid, const char *query, ErrorData *edata); -static void pgss_store(uint64 queryid, - const char *query, - int query_location, - int query_len, - PlanInfo * plan_info, - CmdType cmd_type, - SysInfo * sys_info, - ErrorInfo * error_info, - double total_time, - uint64 rows, - BufferUsage *bufusage, - WalUsage *walusage, - const struct JitInstrumentation *jitusage, - JumbleState *jstate, - pgssStoreKind kind); +static pgssEntry *pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info); +static void pgsm_add_to_list(pgssEntry *entry, char *query_text, int query_len, bool should_dup); +static void pgsm_store_error(uint64 queryid, const char *query, ErrorData *edata); + +static void pgsm_update_entry(pgssEntry *entry, + const char *query, + PlanInfo * plan_info, + SysInfo * sys_info, + ErrorInfo * error_info, + double total_time, + uint64 rows, + BufferUsage *bufusage, + WalUsage *walusage, + const struct JitInstrumentation *jitusage, + bool reset, + pgsmStoreKind kind); +static void pgsm_store(pgssEntry *entry); static void pg_stat_monitor_internal(FunctionCallInfo fcinfo, pgsmVersion api_version, @@ -232,11 +234,6 @@ static uint64 get_next_wbucket(pgssSharedState *pgss); static uint64 get_query_id(JumbleState *jstate, Query *query); #endif -/* Daniel J. 
Bernstein's hash algorithm: see http://www.cse.yorku.ca/~oz/hash.html */ -static uint64 djb2_hash(unsigned char *str, size_t len); - -/* Same as above, but stores the calculated string length into *out_len (small optimization) */ -static uint64 djb2_hash_str(unsigned char *str, int *out_len); /* * Module load callback */ @@ -338,7 +335,7 @@ _PG_init(void) prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = pgss_shmem_startup; prev_post_parse_analyze_hook = post_parse_analyze_hook; - post_parse_analyze_hook = HOOK(pgss_post_parse_analyze); + post_parse_analyze_hook = HOOK(pgsm_post_parse_analyze); prev_ExecutorStart = ExecutorStart_hook; ExecutorStart_hook = HOOK(pgss_ExecutorStart); prev_ExecutorRun = ExecutorRun_hook; @@ -346,12 +343,12 @@ _PG_init(void) prev_ExecutorFinish = ExecutorFinish_hook; ExecutorFinish_hook = HOOK(pgss_ExecutorFinish); prev_ExecutorEnd = ExecutorEnd_hook; - ExecutorEnd_hook = HOOK(pgss_ExecutorEnd); + ExecutorEnd_hook = HOOK(pgsm_ExecutorEnd); prev_ProcessUtility = ProcessUtility_hook; ProcessUtility_hook = HOOK(pgss_ProcessUtility); #if PG_VERSION_NUM >= 130000 planner_hook_next = planner_hook; - planner_hook = HOOK(pgss_planner_hook); + planner_hook = HOOK(pgsm_planner_hook); #endif prev_emit_log_hook = emit_log_hook; emit_log_hook = HOOK(pgsm_emit_log_hook); @@ -364,30 +361,6 @@ _PG_init(void) system_init = true; } -/* - * Module unload callback - */ -/* cppcheck-suppress unusedFunction */ -void -_PG_fini(void) -{ - system_init = false; - shmem_startup_hook = prev_shmem_startup_hook; - post_parse_analyze_hook = prev_post_parse_analyze_hook; - ExecutorStart_hook = prev_ExecutorStart; - ExecutorRun_hook = prev_ExecutorRun; - ExecutorFinish_hook = prev_ExecutorFinish; - ExecutorEnd_hook = prev_ExecutorEnd; - ProcessUtility_hook = prev_ProcessUtility; - emit_log_hook = prev_emit_log_hook; - - free(nested_queryids); - free(nested_query_txts); - regfree(&preg_query_comments); - - hash_entry_reset(); -} - /* * shmem_startup 
hook: allocate or attach to shared memory, * then load any pre-existing statistics from file. @@ -437,15 +410,15 @@ pgss_shmem_request(void) } #endif -#if PG_VERSION_NUM >= 140000 -/* - * Post-parse-analysis hook: mark query with a queryId - */ static void -pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate) +pgsm_post_parse_analyze_internal(ParseState *pstate, Query *query, JumbleState *jstate) { - if (prev_post_parse_analyze_hook) - prev_post_parse_analyze_hook(pstate, query, jstate); + pgssEntry *entry; + const char *query_text; + char *norm_query = NULL; + int norm_query_len; + int location; + int query_len; /* Safety check... */ if (!IsSystemInitialized()) @@ -463,67 +436,17 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate) { if (PGSM_TRACK_UTILITY && !PGSM_HANDLED_UTILITY(query->utilityStmt)) query->queryId = UINT64CONST(0); + return; } /* - * If query jumbling were able to identify any ignorable constants, we - * immediately create a hash table entry for the query, so that we can - * record the normalized form of the query string. If there were no such - * constants, the normalized string would be the same as the query text - * anyway, so there's no need for an early entry. + * Let's calculate queryid for versions 13 and below. We don't have to check + * that jstate is valid, it always will be for these versions. 
*/ - if (jstate && jstate->clocations_count > 0) - pgss_store(query->queryId, /* query id */ - pstate->p_sourcetext, /* query */ - query->stmt_location, /* query location */ - query->stmt_len, /* query length */ - NULL, /* PlanInfo */ - query->commandType, /* CmdType */ - NULL, /* SysInfo */ - NULL, /* ErrorInfo */ - 0, /* totaltime */ - 0, /* rows */ - NULL, /* bufusage */ - NULL, /* walusage */ - NULL, /* jitusage */ - jstate, /* JumbleState */ - PGSS_PARSE); /* pgssStoreKind */ -} -#else - -/* - * Post-parse-analysis hook: mark query with a queryId - */ -static void -pgss_post_parse_analyze(ParseState *pstate, Query *query) -{ - JumbleState jstate; - - if (prev_post_parse_analyze_hook) - prev_post_parse_analyze_hook(pstate, query); - - /* Safety check... */ - if (!IsSystemInitialized()) - return; - if (!pgsm_enabled(exec_nested_level)) - return; - - /* - * Utility statements get queryId zero. We do this even in cases where - * the statement contains an optimizable statement for which a queryId - * could be derived (such as EXPLAIN or DECLARE CURSOR). For such cases, - * runtime control will first go through ProcessUtility and then the - * executor, and we don't want the executor hooks to do anything, since we - * are already measuring the statement's costs at the utility level. 
- */ - if (query->utilityStmt) - { - query->queryId = UINT64CONST(0); - return; - } - +#if PG_VERSION_NUM < 140000 query->queryId = get_query_id(&jstate, query); +#endif /* * If we are unlucky enough to get a hash of zero, use 1 instead, to @@ -532,22 +455,93 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) if (query->queryId == UINT64CONST(0)) query->queryId = UINT64CONST(1); - if (jstate.clocations_count > 0) - pgss_store(query->queryId, /* query id */ - pstate->p_sourcetext, /* query */ - query->stmt_location, /* query location */ - query->stmt_len, /* query length */ - NULL, /* PlanInfo */ - query->commandType, /* CmdType */ - NULL, /* SysInfo */ - NULL, /* ErrorInfo */ - 0, /* totaltime */ - 0, /* rows */ - NULL, /* bufusage */ - NULL, /* walusage */ - NULL, /* jitusage */ - &jstate, /* JumbleState */ - PGSS_PARSE); /* pgssStoreKind */ + /* + * Let's save the normalized query so that we can save the data without in + * hash later on without the need of jstate which wouldn't be available. + */ + query_text = pstate->p_sourcetext; + location = query->stmt_location; + query_len = query->stmt_len; + + /* We should always have a valid query. */ + Assert(query_text); + query_text = CleanQuerytext(query_text, &location, &query_len); + + norm_query_len = query_len; + + /* Generate a normalized query */ + if (jstate && jstate->clocations_count > 0) + { + norm_query = generate_normalized_query(jstate, + query_text, /* query */ + location, /* query location */ + &norm_query_len, + GetDatabaseEncoding()); + + Assert(norm_query); + } + + /* + * At this point, we don't know which bucket this query will land in, so passing + * 0. The store function MUST later update it based on the current bucket value. + * The correct bucket value will be needed then to search the hash table, or create + * the appropriate entry. 
+ */ + entry = pgsm_create_hash_entry(0, query->queryId, NULL); + + /* Update other member that are not counters, so that we don't have to worry about these. */ + entry->pgsm_query_id = pgss_hash_string(norm_query ? norm_query : query_text, norm_query_len); + entry->counters.info.cmd_type = query->commandType; + + /* + * Add the query text and entry to the local list. + * + * Preserve the normalized query if needed and we got a valid one. + * Otherwise, store the actual query so that we don't have to check + * what query to store when saving into the hash. + * + * In case of query_text, request the function to duplicate it so that + * it is put in the relevant memory context. + */ + if (PGSM_NORMALIZED_QUERY && norm_query) + pgsm_add_to_list(entry, norm_query, norm_query_len, true); + else + { + pgsm_add_to_list(entry, (char *)query_text, query_len, true); + } + + /* Check that we've not exceeded max_stack_depth */ + Assert(list_length(lentries) <= max_stack_depth); + + if (norm_query) + pfree(norm_query); +} + +#if PG_VERSION_NUM >= 140000 +/* + * Post-parse-analysis hook: mark query with a queryId + */ +static void +pgsm_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate) +{ + if (prev_post_parse_analyze_hook) + prev_post_parse_analyze_hook(pstate, query, jstate); + + pgsm_post_parse_analyze_internal(pstate, query, jstate); +} +#else +/* + * Post-parse-analysis hook: mark query with a queryId + */ +static void +pgsm_post_parse_analyze(ParseState *pstate, Query *query) +{ + JumbleState jstate; + + if (prev_post_parse_analyze_hook) + prev_post_parse_analyze_hook(pstate, query); + + pgsm_post_parse_analyze_internal(pstate, query, &jstate); } #endif @@ -685,12 +679,13 @@ pgss_explain(QueryDesc *queryDesc) * ExecutorEnd hook: store results if needed */ static void -pgss_ExecutorEnd(QueryDesc *queryDesc) +pgsm_ExecutorEnd(QueryDesc *queryDesc) { uint64 queryId = queryDesc->plannedstmt->queryId; SysInfo sys_info; PlanInfo plan_info; PlanInfo 
*plan_ptr = NULL; + pgssEntry *entry = NULL; /* Extract the plan information in case of SELECT statement */ if (queryDesc->operation == CMD_SELECT && PGSM_QUERY_PLAN) @@ -700,11 +695,14 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) plan_info.plan_len = snprintf(plan_info.plan_text, PLAN_TEXT_LEN, "%s", pgss_explain(queryDesc)); plan_info.planid = pgss_hash_string(plan_info.plan_text, plan_info.plan_len); plan_ptr = &plan_info; + MemoryContextSwitchTo(mct); } if (queryId != UINT64CONST(0) && queryDesc->totaltime && pgsm_enabled(exec_nested_level)) { + entry = (pgssEntry *)llast(lentries); + /* * Make sure stats accumulation is done. (Note: it's okay if several * levels of hook all do this.) @@ -722,34 +720,35 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) sys_info.stime = time_diff(rusage_end.ru_stime, rusage_start.ru_stime); } - pgss_store(queryId, /* query id */ - queryDesc->sourceText, /* query text */ - queryDesc->plannedstmt->stmt_location, /* query location */ - queryDesc->plannedstmt->stmt_len, /* query length */ - plan_ptr, /* PlanInfo */ - queryDesc->operation, /* CmdType */ - &sys_info, /* SysInfo */ - NULL, /* ErrorInfo */ - queryDesc->totaltime->total * 1000.0, /* totaltime */ - queryDesc->estate->es_processed, /* rows */ - &queryDesc->totaltime->bufusage, /* bufusage */ + pgsm_update_entry(entry, /* entry */ + NULL, /* query */ + plan_ptr, /* PlanInfo */ + &sys_info, /* SysInfo */ + NULL, /* ErrorInfo */ + queryDesc->totaltime->total * 1000.0, /* total_time */ + queryDesc->estate->es_processed, /* rows */ + &queryDesc->totaltime->bufusage, /* bufusage */ #if PG_VERSION_NUM >= 130000 - &queryDesc->totaltime->walusage, /* walusage */ + &queryDesc->totaltime->walusage, /* walusage */ #else - NULL, + NULL, #endif #if PG_VERSION_NUM >= 150000 - queryDesc->estate->es_jit ? &queryDesc->estate->es_jit->instr : NULL, + queryDesc->estate->es_jit ? 
&queryDesc->estate->es_jit->instr : NULL, /* jitusage */ #else - NULL, + NULL, #endif - NULL, - PGSS_FINISHED); /* pgssStoreKind */ + false, /* reset */ + PGSM_EXEC); /* kind */ + + pgsm_store(entry); } + if (prev_ExecutorEnd) prev_ExecutorEnd(queryDesc); else standard_ExecutorEnd(queryDesc); + num_relations = 0; } @@ -805,14 +804,14 @@ pgss_ExecutorCheckPerms(List *rt, bool abort) #if PG_VERSION_NUM >= 130000 static PlannedStmt * -pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) +pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result; /* * We can't process the query if no query_string is provided, as - * pgss_store needs it. We also ignore query without queryid, as it would - * be treated as a utility statement, which may not be the case. + * pgsm_store needs it. We also ignore query without queryid, + * as it would be treated as a utility statement, which may not be the case. * * Note that planner_hook can be called from the planner itself, so we * have a specific nesting level for the planner. However, utility @@ -824,6 +823,7 @@ pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par if (pgsm_enabled(plan_nested_level + exec_nested_level) && PGSM_TRACK_PLANNING && query_string && parse->queryId != UINT64CONST(0)) { + pgssEntry *entry; instr_time start; instr_time duration; BufferUsage bufusage_start; @@ -872,21 +872,22 @@ pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par /* calc differences of WAL counters. 
*/ memset(&walusage, 0, sizeof(WalUsage)); WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start); - pgss_store(parse->queryId, /* query id */ - query_string, /* query */ - parse->stmt_location, /* query location */ - parse->stmt_len, /* query length */ - NULL, /* PlanInfo */ - parse->commandType, /* CmdType */ - NULL, /* SysInfo */ - NULL, /* ErrorInfo */ - INSTR_TIME_GET_MILLISEC(duration), /* totaltime */ - 0, /* rows */ - &bufusage, /* bufusage */ - &walusage, /* walusage */ - NULL, /* JumbleState */ - NULL, - PGSS_PLAN); /* pgssStoreKind */ + + entry = (pgssEntry *)llast(lentries); + + /* The plan details are captured when the query finishes */ + pgsm_update_entry(entry, /* entry */ + NULL, /* query */ + NULL, /* PlanInfo */ + NULL, /* SysInfo */ + NULL, /* ErrorInfo */ + INSTR_TIME_GET_MILLISEC(duration), /* total_time */ + 0, /* rows */ + &bufusage, /* bufusage */ + &walusage, /* walusage */ + NULL, /* jitusage */ + false, /* reset */ + PGSM_PLAN); /* kind */ } else { @@ -906,7 +907,6 @@ pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par } #endif - /* * ProcessUtility hook */ @@ -938,7 +938,6 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, { Node *parsetree = pstmt->utilityStmt; uint64 queryId = 0; - SysInfo sys_info; #if PG_VERSION_NUM >= 140000 queryId = pstmt->queryId; @@ -973,9 +972,14 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, if (PGSM_TRACK_UTILITY && pgsm_enabled(exec_nested_level) && PGSM_HANDLED_UTILITY(parsetree)) { + pgssEntry *entry; + char *query_text; + int location; + int query_len; instr_time start; instr_time duration; uint64 rows; + SysInfo sys_info; BufferUsage bufusage; BufferUsage bufusage_start = pgBufferUsage; #if PG_VERSION_NUM >= 130000 @@ -1076,26 +1080,41 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, /* calc differences of buffer counters. 
*/ memset(&bufusage, 0, sizeof(BufferUsage)); BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); - pgss_store( - queryId, /* query ID */ - queryString, /* query text */ - pstmt->stmt_location, /* query location */ - pstmt->stmt_len, /* query length */ - NULL, /* PlanInfo */ - 0, /* CmdType */ - &sys_info, /* SysInfo */ - NULL, /* ErrorInfo */ - INSTR_TIME_GET_MILLISEC(duration), /* total_time */ - rows, /* rows */ - &bufusage, /* bufusage */ + + /* Create an entry for this query */ + entry = pgsm_create_hash_entry(0, queryId, NULL); + + location = pstmt->stmt_location; + query_len = pstmt->stmt_len; + query_text = (char *)CleanQuerytext(queryString, &location, &query_len); + + entry->pgsm_query_id = pgss_hash_string(query_text, query_len); + entry->counters.info.cmd_type = 0; + + pgsm_add_to_list(entry, query_text, query_len, true); + + /* Check that we've not exceeded max_stack_depth */ + Assert(list_length(lentries) <= max_stack_depth); + + /* The plan details are captured when the query finishes */ + pgsm_update_entry(entry, /* entry */ + (char *)llast(lquery_text), /* query */ + NULL, /* PlanInfo */ + &sys_info, /* SysInfo */ + NULL, /* ErrorInfo */ + INSTR_TIME_GET_MILLISEC(duration), /* total_time */ + rows, /* rows */ + &bufusage, /* bufusage */ #if PG_VERSION_NUM >= 130000 - &walusage, /* walusage */ + &walusage, /* walusage */ #else - NULL, /* walusage, NULL for PG <= 12 */ + NULL, #endif - NULL, - NULL, /* JumbleState */ - PGSS_FINISHED); /* pgssStoreKind */ + NULL, /* jitusage */ + false, /* reset */ + PGSM_EXEC); /* kind */ + + pgsm_store(entry); } else { @@ -1195,17 +1214,30 @@ pg_get_backend_status(void) return NULL; } +/* + * The caller should allocate max_len memory to name including terminating null. + * The function returns the length of the string. 
+ */ static int -pg_get_application_name(char *application_name, bool *ok) +pg_get_application_name(char *name, int buff_size) { - PgBackendStatus *beentry = pg_get_backend_status(); + PgBackendStatus *beentry; - if (!beentry) - return snprintf(application_name, APPLICATIONNAME_LEN, "%s", "unknown"); + /* Try to read application name from GUC directly */ + if (application_name && *application_name) + snprintf(name, buff_size, "%s", application_name); + else + { + beentry = pg_get_backend_status(); - *ok = true; + if (!beentry) + snprintf(name, buff_size, "%s", "unknown"); + else + snprintf(name, buff_size, "%s", beentry->st_appname); + } - return snprintf(application_name, APPLICATIONNAME_LEN, "%s", beentry->st_appname); + /* Return length so that others don't have to calculate */ + return strlen(name); } static uint @@ -1237,13 +1269,9 @@ pg_get_client_addr(bool *ok) } static void -pgss_update_entry(pgssEntry *entry, - uint64 bucketid, - uint64 queryid, +pgsm_update_entry(pgssEntry *entry, const char *query, - const char *comments, PlanInfo * plan_info, - CmdType cmd_type, SysInfo * sys_info, ErrorInfo * error_info, double total_time, @@ -1252,34 +1280,45 @@ pgss_update_entry(pgssEntry *entry, WalUsage *walusage, const struct JitInstrumentation *jitusage, bool reset, - pgssStoreKind kind, - const char *app_name, - size_t app_name_len) + pgsmStoreKind kind) { int index; double old_mean; int message_len = error_info ? strlen(error_info->message) : 0; - int comments_len = comments ? strlen(comments) : 0; int sqlcode_len = error_info ? strlen(error_info->sqlcode) : 0; int plan_text_len = plan_info ? 
plan_info->plan_len : 0; + char app_name[APPLICATIONNAME_LEN] = ""; + int app_name_len = 0; + /* Start collecting data for next bucket and reset all counters */ + if (reset) + memset(&entry->counters, 0, sizeof(Counters)); /* volatile block */ { - volatile pgssEntry *e = (volatile pgssEntry *) entry; + volatile pgssEntry *e = (volatile pgssEntry *)entry; - SpinLockAcquire(&e->mutex); - /* Start collecting data for next bucket and reset all counters */ - if (reset) - memset(&entry->counters, 0, sizeof(Counters)); + if (kind == PGSM_STORE) + SpinLockAcquire(&e->mutex); - if (comments_len > 0) - _snprintf(e->counters.info.comments, comments, comments_len + 1, COMMENTS_LEN); - e->counters.state = kind; - if (kind == PGSS_PLAN) + /* Extract comments if enabled and only when the query has completed with or without error */ + if (PGSM_EXTRACT_COMMENTS && query && kind == PGSM_STORE) + { + char comments[512] = {0}; + int comments_len; + + extract_query_comments(query, comments, sizeof(comments)); + comments_len = strlen(comments); + + if (comments_len > 0) + _snprintf(e->counters.info.comments, comments, comments_len + 1, COMMENTS_LEN); + } + + if (kind == PGSM_PLAN || kind == PGSM_STORE) { if (e->counters.plancalls.calls == 0) e->counters.plancalls.usage = USAGE_INIT; + e->counters.plancalls.calls += 1; e->counters.plantime.total_time += total_time; @@ -1289,22 +1328,28 @@ pgss_update_entry(pgssEntry *entry, e->counters.plantime.max_time = total_time; e->counters.plantime.mean_time = total_time; } + else + { + /* Increment the counts, except when jstate is not NULL */ + old_mean = e->counters.plantime.mean_time; - /* Increment the counts, except when jstate is not NULL */ - old_mean = e->counters.plantime.mean_time; - e->counters.plantime.mean_time += (total_time - old_mean) / e->counters.plancalls.calls; - e->counters.plantime.sum_var_time += (total_time - old_mean) * (total_time - e->counters.plantime.mean_time); + e->counters.plantime.mean_time += (total_time - 
old_mean) / e->counters.plancalls.calls; + e->counters.plantime.sum_var_time += (total_time - old_mean) * (total_time - e->counters.plantime.mean_time); - /* calculate min and max time */ - if (e->counters.plantime.min_time > total_time) - e->counters.plantime.min_time = total_time; - if (e->counters.plantime.max_time < total_time) - e->counters.plantime.max_time = total_time; + /* calculate min and max time */ + if (e->counters.plantime.min_time > total_time) + e->counters.plantime.min_time = total_time; + + if (e->counters.plantime.max_time < total_time) + e->counters.plantime.max_time = total_time; + } } - else if (kind == PGSS_FINISHED) + + if (kind == PGSM_EXEC || kind == PGSM_STORE) { if (e->counters.calls.calls == 0) e->counters.calls.usage = USAGE_INIT; + e->counters.calls.calls += 1; e->counters.time.total_time += total_time; @@ -1314,64 +1359,71 @@ pgss_update_entry(pgssEntry *entry, e->counters.time.max_time = total_time; e->counters.time.mean_time = total_time; } + else + { + /* Increment the counts, except when jstate is not NULL */ + old_mean = e->counters.time.mean_time; + e->counters.time.mean_time += (total_time - old_mean) / e->counters.calls.calls; + e->counters.time.sum_var_time += (total_time - old_mean) * (total_time - e->counters.time.mean_time); - /* Increment the counts, except when jstate is not NULL */ - old_mean = e->counters.time.mean_time; - e->counters.time.mean_time += (total_time - old_mean) / e->counters.calls.calls; - e->counters.time.sum_var_time += (total_time - old_mean) * (total_time - e->counters.time.mean_time); + /* calculate min and max time */ + if (e->counters.time.min_time > total_time) + e->counters.time.min_time = total_time; - /* calculate min and max time */ - if (e->counters.time.min_time > total_time) - e->counters.time.min_time = total_time; - if (e->counters.time.max_time < total_time) - e->counters.time.max_time = total_time; + if (e->counters.time.max_time < total_time) + e->counters.time.max_time = 
total_time; - index = get_histogram_bucket(total_time); - e->counters.resp_calls[index]++; + index = get_histogram_bucket(total_time); + e->counters.resp_calls[index]++; + } } - if (plan_text_len > 0 && !e->counters.planinfo.plan_text[0]) - _snprintf(e->counters.planinfo.plan_text, plan_info->plan_text, plan_text_len + 1, PLAN_TEXT_LEN); - - if (app_name_len > 0 && !e->counters.info.application_name[0]) - _snprintf(e->counters.info.application_name, app_name, app_name_len + 1, APPLICATIONNAME_LEN); - - e->counters.info.num_relations = num_relations; - _snprintf2(e->counters.info.relations, relations, num_relations, REL_LEN); - - e->counters.info.cmd_type = cmd_type; - - if (exec_nested_level > 0) + /* Only should process this once when storing the data */ + if (kind == PGSM_STORE) { - if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) + if (plan_text_len > 0 && !e->counters.planinfo.plan_text[0]) + _snprintf(e->counters.planinfo.plan_text, plan_info->plan_text, plan_text_len + 1, PLAN_TEXT_LEN); + + app_name_len = pg_get_application_name(app_name, APPLICATIONNAME_LEN); + + if (app_name_len > 0 && !e->counters.info.application_name[0]) + _snprintf(e->counters.info.application_name, app_name, app_name_len + 1, APPLICATIONNAME_LEN); + + e->counters.info.num_relations = num_relations; + _snprintf2(e->counters.info.relations, relations, num_relations, REL_LEN); + + if (exec_nested_level > 0) { - int parent_query_len = nested_query_txts[exec_nested_level - 1]? 
- strlen(nested_query_txts[exec_nested_level - 1]): 0; - e->counters.info.parentid = nested_queryids[exec_nested_level - 1]; - e->counters.info.parent_query = InvalidDsaPointer; - /* If we have a parent query, store it in the raw dsa area */ - if (parent_query_len > 0) + if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) { - char *qry_buff; - dsa_area *query_dsa_area = get_dsa_area_for_query_text(); - /* Use dsa_allocate_extended with DSA_ALLOC_NO_OOM flag, as we don't want to get an - * error if memory allocation fails.*/ - dsa_pointer qry = dsa_allocate_extended(query_dsa_area, parent_query_len+1, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO); - if (DsaPointerIsValid(qry)) + int parent_query_len = nested_query_txts[exec_nested_level - 1]? + strlen(nested_query_txts[exec_nested_level - 1]): 0; + e->counters.info.parentid = nested_queryids[exec_nested_level - 1]; + e->counters.info.parent_query = InvalidDsaPointer; + /* If we have a parent query, store it in the raw dsa area */ + if (parent_query_len > 0) { - qry_buff = dsa_get_address(query_dsa_area, qry); - memcpy(qry_buff, nested_query_txts[exec_nested_level - 1], parent_query_len); - qry_buff[parent_query_len] = 0; - /* store the dsa pointer for parent query text */ - e->counters.info.parent_query = qry; + char *qry_buff; + dsa_area *query_dsa_area = get_dsa_area_for_query_text(); + /* Use dsa_allocate_extended with DSA_ALLOC_NO_OOM flag, as we don't want to get an + * error if memory allocation fails.*/ + dsa_pointer qry = dsa_allocate_extended(query_dsa_area, parent_query_len+1, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO); + if (DsaPointerIsValid(qry)) + { + qry_buff = dsa_get_address(query_dsa_area, qry); + memcpy(qry_buff, nested_query_txts[exec_nested_level - 1], parent_query_len); + qry_buff[parent_query_len] = 0; + /* store the dsa pointer for parent query text */ + e->counters.info.parent_query = qry; + } } } } - } - else - { - e->counters.info.parentid = UINT64CONST(0); - e->counters.info.parent_query = 
InvalidDsaPointer; + else + { + e->counters.info.parentid = UINT64CONST(0); + e->counters.info.parent_query = InvalidDsaPointer; + } } if (error_info) @@ -1380,7 +1432,9 @@ pgss_update_entry(pgssEntry *entry, _snprintf(e->counters.error.sqlcode, error_info->sqlcode, sqlcode_len, SQLCODE_LEN); _snprintf(e->counters.error.message, error_info->message, message_len, ERROR_MESSAGE_LEN); } + e->counters.calls.rows += rows; + if (bufusage) { e->counters.blocks.shared_blks_hit += bufusage->shared_blks_hit; @@ -1400,7 +1454,9 @@ pgss_update_entry(pgssEntry *entry, e->counters.blocks.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time); #endif } + e->counters.calls.usage += USAGE_EXEC(total_time); + if (sys_info) { e->counters.sysinfo.utime += sys_info->utime; @@ -1429,38 +1485,116 @@ pgss_update_entry(pgssEntry *entry, e->counters.jitinfo.jit_emission_count++; e->counters.jitinfo.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter); } - SpinLockRelease(&e->mutex); + + if (kind == PGSM_STORE) + SpinLockRelease(&e->mutex); } } static void -pgss_store_error(uint64 queryid, +pgsm_store_error(uint64 queryid, const char *query, ErrorData *edata) { - ErrorInfo error_info; + pgssEntry *entry; + ErrorInfo error_info; error_info.elevel = edata->elevel; snprintf(error_info.message, ERROR_MESSAGE_LEN, "%s", edata->message); snprintf(error_info.sqlcode, SQLCODE_LEN, "%s", unpack_sql_state(edata->sqlerrcode)); - pgss_store(queryid, /* query id */ - query, /* query text */ - 0, /* query location */ - strlen(query), /* query length */ - NULL, /* PlanInfo */ - 0, /* CmdType */ - NULL, /* SysInfo */ - &error_info, /* ErrorInfo */ - 0, /* total_time */ - 0, /* rows */ - NULL, /* bufusage */ - NULL, /* walusage */ - NULL, /* JumbleState */ - NULL, - PGSS_ERROR); /* pgssStoreKind */ + entry = pgsm_create_hash_entry(0, queryid, NULL); + + pgsm_add_to_list(entry, (char *)query, strlen(query), true); + + pgsm_update_entry(entry, /* entry */ + query, /* 
query */ + NULL, /* PlanInfo */ + NULL, /* SysInfo */ + &error_info, /* ErrorInfo */ + 0, /* total_time */ + 0, /* rows */ + NULL, /* bufusage */ + NULL, /* walusage */ + NULL, /* jitusage */ + false, /* reset */ + PGSM_ERROR); /* kind */ + + pgsm_store(entry); } +static void +pgsm_add_to_list(pgssEntry *entry, char *query_text, int query_len, bool should_dup) +{ + MemoryContext oldctx; + char *query; + + /* Switch to TopMemoryContext */ + oldctx = MemoryContextSwitchTo(TopMemoryContext); + + if (should_dup) + query = pnstrdup(query_text, query_len); + else + query = query_text; + + lentries = lappend(lentries, entry); + lquery_text = lappend(lquery_text, query); + + MemoryContextSwitchTo(oldctx); +} + + +/* + * Function encapsulating some external calls for filling up the hash key data structure. + * The bucket_id may not be known at this stage. So pass any value that you may wish. + */ +static pgssEntry * +pgsm_create_hash_entry(uint64 bucket_id, uint64 queryid, PlanInfo *plan_info) +{ + pgssEntry *entry; + int sec_ctx; + bool found_client_addr = false; + char app_name[APPLICATIONNAME_LEN] = ""; + char *app_name_ptr = app_name; + int app_name_len = 0; + MemoryContext oldctx; + + /* Create an entry in the TopMemoryContext */ + oldctx = MemoryContextSwitchTo(TopMemoryContext); + entry = palloc0(sizeof(pgssEntry)); + MemoryContextSwitchTo(oldctx); + + /* + * Get the user ID. Let's use this instead of GetUserID as this + * won't throw an assertion in case of an error. + */ + GetUserIdAndSecContext((Oid *) &entry->key.userid, &sec_ctx); + + /* Get the application name and set appid */ + app_name_len = pg_get_application_name(app_name, APPLICATIONNAME_LEN); + entry->key.appid = pgss_hash_string((const char *)app_name_ptr, app_name_len); + + /* client address */ + entry->key.ip = pg_get_client_addr(&found_client_addr); + + /* PlanID, if there is one */ + entry->key.planid = plan_info ? 
plan_info->planid : 0; + + /* Set remaining data */ + entry->key.dbid = MyDatabaseId; + entry->key.queryid = queryid; + entry->key.bucket_id = bucket_id; + +#if PG_VERSION_NUM < 140000 + entry->key.toplevel = 1; +#else + entry->key.toplevel = ((exec_nested_level + plan_nested_level) == 0); +#endif + + return entry; +} + + /* * Store some statistics for a statement. * @@ -1472,42 +1606,16 @@ pgss_store_error(uint64 queryid, * query string. total_time, rows, bufusage are ignored in this case. */ static void -pgss_store(uint64 queryid, - const char *query, - int query_location, - int query_len, - PlanInfo * plan_info, - CmdType cmd_type, - SysInfo * sys_info, - ErrorInfo * error_info, - double total_time, - uint64 rows, - BufferUsage *bufusage, - WalUsage *walusage, - const struct JitInstrumentation *jitusage, - JumbleState *jstate, - pgssStoreKind kind) +pgsm_store(pgssEntry *entry) { - pgssHashKey key; - pgssEntry *entry; + pgssEntry *shared_hash_entry; pgssSharedState *pgss; - char *app_name_ptr; - char app_name[APPLICATIONNAME_LEN] = ""; - int app_name_len = 0; - bool reset = false; - uint64 pgsm_query_id = 0; + bool found; uint64 bucketid; uint64 prev_bucket_id; - uint64 userid; - uint64 planid; - uint64 appid = 0; - int norm_query_len = 0; - char *norm_query = NULL; - char comments[512] = ""; - bool found_app_name = false; - bool found_client_addr = false; - uint client_addr = 0; - bool found; + bool reset = false; /* Only used in update function - HAMID */ + char *query; + int query_len; /* Safety check... */ if (!IsSystemInitialized()) @@ -1515,136 +1623,30 @@ pgss_store(uint64 queryid, pgss = pgsm_get_ss(); -#if PG_VERSION_NUM >= 140000 - - /* - * Nothing to do if compute_query_id isn't enabled and no other module - * computed a query identifier. 
- */ - if (queryid == UINT64CONST(0)) - return; -#endif - - query = CleanQuerytext(query, &query_location, &query_len); - -#if PG_VERSION_NUM < 140000 - - /* - * For utility statements, we just hash the query string to get an ID. - */ - if (queryid == UINT64CONST(0)) - { - queryid = pgss_hash_string(query, query_len); - - /* - * If we are unlucky enough to get a hash of zero(invalid), use - * queryID as 2 instead, queryID 1 is already in use for normal - * statements. - */ - if (queryid == UINT64CONST(0)) - queryid = UINT64CONST(2); - } -#endif - - Assert(query != NULL); - if (kind == PGSS_ERROR) - { - int sec_ctx; - - GetUserIdAndSecContext((Oid *) &userid, &sec_ctx); - } - else - userid = GetUserId(); - - /* Try to read application name from GUC directly */ - if (application_name && *application_name) - { - app_name_ptr = application_name; - appid = djb2_hash_str((unsigned char *) application_name, &app_name_len); - } - else - { - app_name_len = pg_get_application_name(app_name, &found_app_name); - if (found_app_name) - appid = djb2_hash((unsigned char *) app_name, app_name_len); - app_name_ptr = app_name; - } - - if (!found_client_addr) - client_addr = pg_get_client_addr(&found_client_addr); - - planid = plan_info ? plan_info->planid : 0; - - /* Extract comments if enabled. */ - if (PGSM_EXTRACT_COMMENTS) - extract_query_comments(query, comments, sizeof(comments)); - + /* We should lock the hash table here what if the bucket is removed; e.g. 
reset is called - HAMID */ prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket); bucketid = get_next_wbucket(pgss); if (bucketid != prev_bucket_id) reset = true; - key.bucket_id = bucketid; - key.userid = userid; - key.dbid = MyDatabaseId; - key.queryid = queryid; - key.ip = client_addr; - key.planid = planid; - key.appid = appid; -#if PG_VERSION_NUM < 140000 - key.toplevel = 1; -#else - key.toplevel = ((exec_nested_level + plan_nested_level) == 0); -#endif + entry->key.bucket_id = bucketid; + query = (char *)llast(lquery_text); + query_len = strlen(query); + /* + * Acquire a shared lock to start with. We'd have to acquire exclusive + * if we need to create the entry. + */ LWLockAcquire(pgss->lock, LW_SHARED); + shared_hash_entry = (pgssEntry *) pgsm_hash_find(get_pgssHash(), &entry->key, &found); - entry = (pgssEntry *) pgsm_hash_find(get_pgssHash(), &key, &found); - if (!entry) + if (!shared_hash_entry) { dsa_pointer dsa_query_pointer; dsa_area *query_dsa_area; char *query_buff; - /* - * Create a new, normalized query string if caller asked. We don't - * need to hold the lock while doing this work. (Note: in any case, - * it's possible that someone else creates a duplicate hashtable entry - * in the interval where we don't hold the lock below. That case is - * handled by entry_alloc. - */ - if (jstate) - { - norm_query_len = query_len; - - LWLockRelease(pgss->lock); - norm_query = generate_normalized_query(jstate, query, - query_location, - &norm_query_len, - GetDatabaseEncoding()); - LWLockAcquire(pgss->lock, LW_SHARED); - - pgsm_query_id = pgss_hash_string(norm_query, norm_query_len); - - /* Free up norm_query if we don't intend to show normalized version in the view */ - if (PGSM_NORMALIZED_QUERY) - { - query_len = norm_query_len; - } - else - { - if (norm_query) - pfree(norm_query); - - norm_query = NULL; - } - } - else - { - pgsm_query_id = pgss_hash_string(query, query_len); - } - /* New query, truncate length if necessary. 
*/ if (query_len > PGSM_QUERY_MAX_LEN) query_len = PGSM_QUERY_MAX_LEN; @@ -1655,26 +1657,27 @@ pgss_store(uint64 queryid, if (!DsaPointerIsValid(dsa_query_pointer)) { LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); return; } + /* Get the memory address from DSA pointer and copy the query text in local variable */ query_buff = dsa_get_address(query_dsa_area, dsa_query_pointer); - memcpy(query_buff, norm_query ? norm_query : query, query_len); - /* OK to create a new hashtable entry */ + memcpy(query_buff, query, query_len); + LWLockRelease(pgss->lock); + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + /* OK to create a new hashtable entry */ PGSM_DISABLE_ERROR_CAPUTRE(); { PG_TRY(); { - entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); + shared_hash_entry = hash_entry_alloc(pgss, &entry->key, GetDatabaseEncoding()); } PG_CATCH(); { LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); + if (DsaPointerIsValid(dsa_query_pointer)) dsa_free(query_dsa_area, dsa_query_pointer); PG_RE_THROW(); @@ -1685,47 +1688,41 @@ pgss_store(uint64 queryid, if (entry == NULL) { LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); + if (DsaPointerIsValid(dsa_query_pointer)) dsa_free(query_dsa_area, dsa_query_pointer); + return; } - entry->query_pos = dsa_query_pointer; - entry->pgsm_query_id = pgsm_query_id; - } - else - { - #if USE_DYNAMIC_HASH - if(entry) - dshash_release_lock(get_pgssHash(), entry); - #endif + /* If we already have the pointer set, free this one */ + if (DsaPointerIsValid(shared_hash_entry->query_pos)) + dsa_free(query_dsa_area, dsa_query_pointer); + else + shared_hash_entry->query_pos = dsa_query_pointer; + + shared_hash_entry->pgsm_query_id = entry->pgsm_query_id; + shared_hash_entry->encoding = entry->encoding; + shared_hash_entry->counters.info.cmd_type = entry->counters.info.cmd_type; } - if (jstate == NULL) - pgss_update_entry(entry, /* entry */ - bucketid, /* bucketid */ - queryid, /* queryid */ - query, /* 
query */ - comments, /* comments */ - plan_info, /* PlanInfo */ - cmd_type, /* CmdType */ - sys_info, /* SysInfo */ - error_info, /* ErrorInfo */ - total_time, /* total_time */ - rows, /* rows */ - bufusage, /* bufusage */ - walusage, /* walusage */ - jitusage, - reset, /* reset */ - kind, /* kind */ - app_name_ptr, - app_name_len); + pgsm_update_entry(shared_hash_entry, /* entry */ + query, /* query */ + &entry->counters.planinfo, /* PlanInfo */ + &entry->counters.sysinfo, /* SysInfo */ + &entry->counters.error, /* ErrorInfo */ + 0, /* total_time */ /* HAMID - need to pass a proper value here */ + entry->counters.calls.rows, /* rows */ + NULL, // HAMID &entry->counters.blocks, /* bufusage */ + NULL, // HAMID &entry->counters.walusage, /* walusage */ + NULL, // HAMID &entry->counters.jitinfo, /* jitusage */ + reset, /* reset */ + PGSM_STORE); + + lentries = list_delete_last(lentries); + lquery_text = list_delete_last(lquery_text); LWLockRelease(pgss->lock); - if (norm_query) - pfree(norm_query); } /* @@ -1863,19 +1860,18 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, double stddev; uint64 queryid = entry->key.queryid; int64 bucketid = entry->key.bucket_id; - uint64 dbid = entry->key.dbid; - uint64 userid = entry->key.userid; - int64 ip = entry->key.ip; + Oid dbid = entry->key.dbid; + Oid userid = entry->key.userid; + uint32 ip = entry->key.ip; uint64 planid = entry->key.planid; uint64 pgsm_query_id = entry->pgsm_query_id; dsa_area *query_dsa_area; char *query_ptr; + bool toplevel = entry->key.toplevel; #if PG_VERSION_NUM < 140000 - bool toplevel = 1; bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); #else bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); - bool toplevel = entry->key.toplevel; #endif /* Load the query text from dsa area */ if (DsaPointerIsValid(entry->query_pos)) @@ -1905,13 +1901,8 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (!IsBucketValid(bucketid)) { - if (tmp.state 
== PGSS_FINISHED) - continue; - } - - /* Skip queries such as, $1, $2 := $3, etc. */ - if (tmp.state == PGSS_PARSE || tmp.state == PGSS_PLAN) continue; + } /* read the parent query text if any */ if (tmp.info.parentid != UINT64CONST(0)) @@ -1939,7 +1930,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, * pg_read_all_stats members are allowed */ if (is_allowed_role || userid == GetUserId()) - values[i++] = Int64GetDatumFast(ip); + values[i++] = UInt32GetDatum(ip); else nulls[i++] = true; @@ -3281,7 +3272,7 @@ pgsm_emit_log_hook(ErrorData *edata) if (debug_query_string) queryid = pgss_hash_string(debug_query_string, strlen(debug_query_string)); - pgss_store_error(queryid, + pgsm_store_error(queryid, debug_query_string ? debug_query_string : "", edata); } @@ -3492,34 +3483,3 @@ get_query_id(JumbleState *jstate, Query *query) return queryid; } #endif - -static uint64 -djb2_hash(unsigned char *str, size_t len) -{ - uint64 hash = 5381LLU; - - while (len--) - hash = ((hash << 5) + hash) ^ *str++; - /* hash(i - 1) * 33 ^ str[i] */ - - return hash; -} - -static uint64 -djb2_hash_str(unsigned char *str, int *out_len) -{ - uint64 hash = 5381LLU; - unsigned char *start = str; - unsigned char c; - - while ((c = *str) != '\0') - { - hash = ((hash << 5) + hash) ^ c; - /* hash(i - 1) * 33 ^ str[i] */ - ++str; - } - - *out_len = str - start; - - return hash; -} diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 6bcf089..9c8bcd1 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -196,23 +196,23 @@ typedef enum OVERFLOW_TARGET OVERFLOW_TARGET_DISK } OVERFLOW_TARGET; -typedef enum pgssStoreKind +typedef enum pgsmStoreKind { - PGSS_INVALID = -1, + PGSM_INVALID = -1, /* - * PGSS_PLAN and PGSS_EXEC must be respectively 0 and 1 as they're used to + * PGSM_PLAN and PGSM_EXEC must be respectively 0 and 1 as they're used to * reference the underlying values in the arrays in the Counters struct, - * and this order is required in pg_stat_statements_internal(). 
+ * and this order is required in pg_stat_monitor_internal(). */ - PGSS_PARSE = 0, - PGSS_PLAN, - PGSS_EXEC, - PGSS_FINISHED, - PGSS_ERROR, + PGSM_PARSE = 0, + PGSM_PLAN, + PGSM_EXEC, + PGSM_STORE, + PGSM_ERROR, PGSS_NUMKIND /* Must be last value of this enum */ -} pgssStoreKind; +} pgsmStoreKind; /* the assumption of query max nested level */ #define DEFAULT_MAX_NESTED_LEVEL 10 @@ -251,12 +251,12 @@ typedef struct pgssHashKey { uint64 bucket_id; /* bucket number */ uint64 queryid; /* query identifier */ - uint64 userid; /* user OID */ - uint64 dbid; /* database OID */ - uint64 ip; /* client ip address */ uint64 planid; /* plan identifier */ uint64 appid; /* hash of application name */ - uint64 toplevel; /* query executed at top level */ + Oid userid; /* user OID */ + Oid dbid; /* database OID */ + uint32 ip; /* client ip address */ + bool toplevel; /* query executed at top level */ } pgssHashKey; typedef struct QueryInfo @@ -339,7 +339,6 @@ typedef struct Wal_Usage typedef struct Counters { - uint64 bucket_id; /* bucket id */ Calls calls; QueryInfo info; CallTime time; @@ -479,8 +478,6 @@ void pgss_shmem_startup(void); void pgss_shmem_shutdown(int code, Datum arg); int pgsm_get_bucket_size(void); pgssSharedState *pgsm_get_ss(void); -void hash_entry_reset(void); -void hash_query_entryies_reset(void); void hash_query_entries(); void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer);