diff --git a/pg_stat_monitor--1.0.sql b/pg_stat_monitor--1.0.sql
index 54693c3..0700103 100644
--- a/pg_stat_monitor--1.0.sql
+++ b/pg_stat_monitor--1.0.sql
@@ -46,6 +46,16 @@ RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'pg_stat_monitor'
 LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
 
+CREATE FUNCTION pg_stat_wait_events(
+    OUT queryid text,
+    OUT pid bigint,
+    OUT wait_event text,
+    OUT wait_event_type text
+    )
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'pg_stat_wait_events'
+LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
+
 CREATE FUNCTION pg_stat_agg(
     OUT queryid text,
     OUT id bigint,
@@ -61,7 +71,7 @@ CREATE VIEW pg_stat_monitor AS SELECT
     bucket_start_time,
     userid,
     dbid,
-    queryid,
+    m.queryid,
     query,
     calls,
     total_time,
@@ -87,9 +97,25 @@ CREATE VIEW pg_stat_monitor AS SELECT
     (string_to_array(resp_calls, ',')) resp_calls,
     cpu_user_time,
     cpu_sys_time,
-    (string_to_array(tables_names, ',')) tables_names
-FROM pg_stat_monitor(true);
+    (string_to_array(tables_names, ',')) tables_names,
+    wait_event,
+    wait_event_type
+-- LEFT JOIN keeps every statement row; wait_event and wait_event_type are
+-- NULL for statements with no matching row in pg_stat_wait_events().
+FROM pg_stat_monitor(true) m
+LEFT JOIN pg_stat_wait_events() w ON w.queryid = m.queryid;
+
+-- Register a view on the function for ease of use.
+CREATE VIEW pg_stat_wait_events AS SELECT
+    m.queryid,
+    query,
+    wait_event,
+    wait_event_type
+FROM pg_stat_monitor(true) m
+INNER JOIN pg_stat_wait_events() w ON w.queryid = m.queryid;
+
 
+GRANT SELECT ON pg_stat_wait_events TO PUBLIC;
 GRANT SELECT ON pg_stat_monitor TO PUBLIC;
 
 CREATE VIEW pg_stat_agg_database AS
@@ -154,6 +180,8 @@ FROM pg_stat_agg() agg
 INNER JOIN (SELECT DISTINCT bucket, queryid, userid, query, client_ip, host, min_time, max_time, mean_time, resp_calls, tables_names, cpu_user_time,cpu_sys_time FROM pg_stat_monitor) ss
 ON agg.queryid = ss.queryid AND agg.type = 2 AND id = host;
 
+
+
 GRANT SELECT ON pg_stat_agg_user TO PUBLIC;
 GRANT SELECT ON pg_stat_agg_ip TO PUBLIC;
 GRANT SELECT ON pg_stat_agg_database TO PUBLIC;
diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c
index 5c9c028..5c8e427 100644
--- a/pg_stat_monitor.c
+++ b/pg_stat_monitor.c
@@ -12,263 +12,11 @@
  */
 #include "postgres.h"
 
-#include
-#include
-#include
-#include
-#include
-#include
+#include "pg_stat_monitor.h"
 
-#include "access/hash.h"
-#include "catalog/pg_authid.h"
-#include "executor/instrument.h"
-#include "common/ip.h"
-#include "funcapi.h"
-#include "mb/pg_wchar.h"
-#include "miscadmin.h"
-#include "parser/analyze.h"
-#include "parser/parsetree.h"
-#include "parser/scanner.h"
-#include "parser/scansup.h"
-#include "pgstat.h"
-#include "storage/fd.h"
-#include "storage/ipc.h"
-#include "storage/spin.h"
-#include "tcop/utility.h"
-#include "utils/acl.h"
-#include "utils/builtins.h"
-#include "utils/memutils.h"
-#include "utils/timestamp.h"
-#include "utils/lsyscache.h"
 
 PG_MODULE_MAGIC;
 
-/* Time difference in miliseconds */
-#define TIMEVAL_DIFF(start, end) (((double) end.tv_sec + (double) end.tv_usec / 1000000.0) \
-	- ((double) start.tv_sec + (double) start.tv_usec / 1000000.0)) * 1000
-
-#define ArrayGetTextDatum(x) array_get_datum(x)
-
-/* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? 
*/ -#define USAGE_EXEC(duration) (1.0) -#define USAGE_INIT (1.0) /* including initial planning */ -#define ASSUMED_MEDIAN_INIT (10.0) /* initial assumed median usage */ -#define ASSUMED_LENGTH_INIT 1024 /* initial assumed mean query length */ -#define USAGE_DECREASE_FACTOR (0.99) /* decreased every entry_dealloc */ -#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */ -#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */ - -#define JUMBLE_SIZE 1024 /* query serialization buffer size */ - -#define MAX_RESPONSE_BUCKET 10 -#define MAX_REL_LEN 2 -#define MAX_BUCKETS 10 -#define MAX_OBJECT_CACHE 100 - -/* - * Type of aggregate keys - */ -typedef enum AGG_KEY -{ - AGG_KEY_DATABASE = 0, - AGG_KEY_USER, - AGG_KEY_HOST -} AGG_KEY; - -/* Bucket shared_memory storage */ -typedef struct pgssBucketHashKey -{ - uint64 bucket_id; /* bucket number */ -} pgssBucketHashKey; - -typedef struct pgssBucketCounters -{ - Timestamp current_time; /* start time of the bucket */ - int resp_calls[MAX_RESPONSE_BUCKET]; /* execution time's in msec */ -}pgssBucketCounters; - -typedef struct pgssBucketEntry -{ - pgssBucketHashKey key; /* hash key of entry - MUST BE FIRST */ - pgssBucketCounters counters; - slock_t mutex; /* protects the counters only */ -}pgssBucketEntry; - -/* Objects shared memory storage */ -typedef struct pgssObjectHashKey -{ - uint64 queryid; /* query id */ -} pgssObjectHashKey; - -typedef struct pgssObjectEntry -{ - pgssObjectHashKey key; /* hash key of entry - MUST BE FIRST */ - char tables_name[MAX_REL_LEN]; /* table names involved in the query */ - slock_t mutex; /* protects the counters only */ -} pgssObjectEntry; - -/* Aggregate shared memory storage */ -typedef struct pgssAggHashKey -{ - uint64 id; /* dbid, userid or ip depend upon the type */ - uint64 type; /* type of id dbid, userid or ip */ - uint64 queryid; /* query identifier, foreign key to the query */ - uint64 bucket_id; /* bucket_id is the foreign key to pgssBucketHashKey */ -} 
pgssAggHashKey; - -typedef struct pgssAggCounters -{ - uint64 total_calls; /* number of quries per database/user/ip */ -} pgssAggCounters; - -typedef struct pgssAggEntry -{ - pgssAggHashKey key; /* hash key of entry - MUST BE FIRST */ - pgssAggCounters counters; /* the statistics aggregates */ - slock_t mutex; /* protects the counters only */ -} pgssAggEntry; - -/* shared nenory storage for the query */ -typedef struct pgssHashKey -{ - uint64 bucket_id; /* bucket number */ - uint64 queryid; /* query identifier */ - Oid userid; /* user OID */ - Oid dbid; /* database OID */ -} pgssHashKey; - -typedef struct QueryInfo -{ - uint64 queryid; /* query identifier */ - Oid userid; /* user OID */ - Oid dbid; /* database OID */ - uint host; /* client IP */ - char tables_name[MAX_REL_LEN]; /* table names involved in the query */ -} QueryInfo; - -typedef struct Calls -{ - int64 calls; /* # of times executed */ - int64 rows; /* total # of retrieved or affected rows */ - double usage; /* usage factor */ -} Calls; - -typedef struct CallTime -{ - double total_time; /* total execution time, in msec */ - double min_time; /* minimum execution time in msec */ - double max_time; /* maximum execution time in msec */ - double mean_time; /* mean execution time in msec */ - double sum_var_time; /* sum of variances in execution time in msec */ -} CallTime; - -typedef struct Blocks -{ - int64 shared_blks_hit; /* # of shared buffer hits */ - int64 shared_blks_read; /* # of shared disk blocks read */ - int64 shared_blks_dirtied; /* # of shared disk blocks dirtied */ - int64 shared_blks_written; /* # of shared disk blocks written */ - int64 local_blks_hit; /* # of local buffer hits */ - int64 local_blks_read; /* # of local disk blocks read */ - int64 local_blks_dirtied; /* # of local disk blocks dirtied */ - int64 local_blks_written; /* # of local disk blocks written */ - int64 temp_blks_read; /* # of temp blocks read */ - int64 temp_blks_written; /* # of temp blocks written */ - double 
blk_read_time; /* time spent reading, in msec */ - double blk_write_time; /* time spent writing, in msec */ -} Blocks; - -typedef struct SysInfo -{ - float utime; /* user cpu time */ - float stime; /* system cpu time */ -} SysInfo; - -/* - * The actual stats counters kept within pgssEntry. - */ -typedef struct Counters -{ - uint64 bucket_id; /* bucket id */ - Calls calls; - QueryInfo info; - CallTime time; - Blocks blocks; - SysInfo sysinfo; -} Counters; - -/* Some global structure to get the cpu usage, really don't like the idea of global variable */ - -/* - * Statistics per statement - */ -typedef struct pgssEntry -{ - pgssHashKey key; /* hash key of entry - MUST BE FIRST */ - Counters counters; /* the statistics for this query */ - int encoding; /* query text encoding */ - slock_t mutex; /* protects the counters only */ -} pgssEntry; - -typedef struct QueryFifo -{ - int head; - int tail; -} QueryFifo; - -/* - * Global shared state - */ -typedef struct pgssSharedState -{ - LWLock *lock; /* protects hashtable search/modification */ - double cur_median_usage; /* current median usage in hashtable */ - slock_t mutex; /* protects following fields only: */ - Size extent; /* current extent of query file */ - int n_writers; /* number of active writers to query file */ - uint64 current_wbucket; - unsigned long prev_bucket_usec; - unsigned long bucket_overflow[MAX_BUCKETS]; - unsigned long bucket_entry[MAX_BUCKETS]; - QueryFifo query_fifo[MAX_BUCKETS]; -} pgssSharedState; - -unsigned char *pgss_qbuf[MAX_BUCKETS]; - -/* - * Struct for tracking locations/lengths of constants during normalization - */ -typedef struct pgssLocationLen -{ - int location; /* start offset in query text */ - int length; /* length in bytes, or -1 to ignore */ -} pgssLocationLen; - -/* - * Working state for computing a query jumble and producing a normalized - * query string - */ -typedef struct pgssJumbleState -{ - /* Jumble of current query tree */ - unsigned char *jumble; - - /* Number of bytes 
used in jumble[] */ - Size jumble_len; - - /* Array of locations of constants that should be removed */ - pgssLocationLen *clocations; - - /* Allocated length of clocations array */ - int clocations_buf_size; - - /* Current number of valid entries in clocations array */ - int clocations_count; - - /* highest Param id we've seen, in order to start normalization correctly */ - int highest_extern_param_id; -} pgssJumbleState; - /*---- Local variables ----*/ /* Current nesting depth of ExecutorRun+ProcessUtility calls */ @@ -277,9 +25,11 @@ static double respose_time_lower_bound = .01; static double respose_time_step = .1; static struct rusage rusage_start; static struct rusage rusage_end; - +static volatile sig_atomic_t sigterm = false; +static void handle_sigterm(SIGNAL_ARGS); /* Saved hook values in case of unload */ +static planner_hook_type planner_hook_next = NULL; static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL; static ExecutorStart_hook_type prev_ExecutorStart = NULL; @@ -299,25 +49,14 @@ static HTAB *pgss_agghash = NULL; /* Hash table for aggegates */ static HTAB *pgss_buckethash = NULL; +/* Hash table for wait events */ +static HTAB *pgss_waiteventshash = NULL; + static pgssBucketEntry **pgssBucketEntries = NULL; +static pgssWaitEventEntry **pgssWaitEventEntries = NULL; /*---- GUC variables ----*/ -typedef enum -{ - PGSS_TRACK_NONE, /* track no statements */ - PGSS_TRACK_TOP, /* only top level statements */ - PGSS_TRACK_ALL /* all statements, including nested ones */ -} PGSSTrackLevel; - -static const struct config_enum_entry track_options[] = -{ - {"none", PGSS_TRACK_NONE, false}, - {"top", PGSS_TRACK_TOP, false}, - {"all", PGSS_TRACK_ALL, false}, - {NULL, 0, false} -}; - static int pgss_max; /* max # statements to track */ static int pgss_track; /* tracking level */ static bool pgss_track_utility; /* whether to track utility commands */ @@ -331,12 +70,8 @@ static int 
pgss_query_max_len; static int pgss_query_buf_size; static int pgss_query_buf_size_bucket; -#define pgss_enabled() \ - (pgss_track == PGSS_TRACK_ALL || \ - (pgss_track == PGSS_TRACK_TOP && nested_level == 0)) /*---- Function declarations ----*/ - void _PG_init(void); void _PG_fini(void); @@ -344,6 +79,7 @@ PG_FUNCTION_INFO_V1(pg_stat_monitor_reset); PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2); PG_FUNCTION_INFO_V1(pg_stat_monitor_1_3); PG_FUNCTION_INFO_V1(pg_stat_monitor); +PG_FUNCTION_INFO_V1(pg_stat_wait_events); /* Extended version function prototypes */ PG_FUNCTION_INFO_V1(pg_stat_agg); @@ -354,6 +90,7 @@ static void update_agg_counters(uint64 bucket_id, uint64 queryid, uint64 id, AGG static pgssAggEntry *agg_entry_alloc(pgssAggHashKey *key); void add_object_entry(uint64 queryid, char *objects); +static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param); static void pgss_shmem_startup(void); static void pgss_shmem_shutdown(int code, Datum arg); static void pgss_post_parse_analyze(ParseState *pstate, Query *query); @@ -377,7 +114,7 @@ static Size pgss_memsize(void); static pgssEntry *entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int query_len, int encoding, bool sticky); static void entry_dealloc(int bucket_id); -static void entry_reset(int bucket_id); +static void entry_reset(void); static void AppendJumble(pgssJumbleState *jstate, const unsigned char *item, Size size); static void JumbleQuery(pgssJumbleState *jstate, Query *query); @@ -394,6 +131,13 @@ static uint64 get_next_wbucket(pgssSharedState *pgss); static void store_query(unsigned long queryid, const char *query, unsigned long query_len); static unsigned long locate_query(unsigned long bucket_id, unsigned long queryid, char * query); + +/* Wait Event Local Functions */ +static void register_wait_event(void); +void wait_event_main(Datum main_arg); +static void update_wait_event(void); +static uint64 get_query_id(pgssJumbleState *jstate, Query *query); + /* * 
Module load callback
  */
@@ -575,6 +319,7 @@ _PG_init(void)
 	 */
 	RequestAddinShmemSpace(pgss_memsize());
 	RequestNamedLWLockTranche("pg_stat_monitor", 1);
+	register_wait_event();
 
 	/*
 	 * Install hooks.
@@ -594,6 +339,8 @@ _PG_init(void)
 	ExecutorEnd_hook = pgss_ExecutorEnd;
 	prev_ProcessUtility = ProcessUtility_hook;
 	ProcessUtility_hook = pgss_ProcessUtility;
+	planner_hook_next = planner_hook;
+	planner_hook = pgss_planner_hook;
 }
 
 /*
@@ -610,6 +357,8 @@ _PG_fini(void)
 	ExecutorFinish_hook = prev_ExecutorFinish;
 	ExecutorEnd_hook = prev_ExecutorEnd;
 	ProcessUtility_hook = prev_ProcessUtility;
+	planner_hook = planner_hook_next;
+	entry_reset();
 }
 
 /*
@@ -634,6 +383,8 @@ pgss_shmem_startup(void)
 	pgss_object_hash = NULL;
 	pgss_agghash = NULL;
 	pgss_buckethash = NULL;
+	pgss_waiteventshash = NULL;
+
 	/*
 	 * Create or attach to the shared memory state, including hash table
 	 */
@@ -669,8 +420,45 @@ pgss_shmem_startup(void)
 	pgss_buckethash = ShmemInitHash("pg_stat_monitor: Buckets hashtable",
 							  max_buckets, max_buckets,
 							  &info,
+							  HASH_ELEM | HASH_BLOBS);
+
+	memset(&info, 0, sizeof(info));
+	info.keysize = sizeof(pgssWaitEventKey);
+	info.entrysize = sizeof(pgssWaitEventEntry);
+
+	pgss_waiteventshash = ShmemInitHash("pg_stat_monitor: Wait Event hashtable",
+							  100, 100,
+							  &info,
 							  HASH_ELEM | HASH_BLOBS);
 
+#define MAX_BACKEND_PROCESES (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts)
+
+	/* Process-local array of pointers to the shared per-process entries. */
+	pgssWaitEventEntries = malloc(sizeof(pgssWaitEventEntry *) * MAX_BACKEND_PROCESES);
+	if (pgssWaitEventEntries == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory")));
+	for (i = 0; i < MAX_BACKEND_PROCESES; i++)
+	{
+		pgssWaitEventKey key;
+		pgssWaitEventEntry *entry = NULL;
+		bool		found = false;
+
+		key.processid = i;
+		entry = (pgssWaitEventEntry *) hash_search(pgss_waiteventshash, &key, HASH_ENTER, &found);
+		if (!found)
+		{
+			/* hash_search() fills in the key only; clear the payload ourselves. */
+			entry->key.queryid = 0;
+			entry->pid = 0;
+			entry->wait_event_info = 0;
+			SpinLockInit(&entry->mutex);
+		}
+		/* Record the pointer even when the entry already existed. */
+		pgssWaitEventEntries[i] = entry;
+	}
+
 	memset(&info, 0, sizeof(info));
 	info.keysize = sizeof(pgssObjectHashKey);
 	info.entrysize = sizeof(pgssObjectEntry);
@@ -772,20 +560,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
 		return;
 	}
 
-	/* Set up workspace for query jumbling */
-	jstate.jumble = (unsigned char *) palloc(JUMBLE_SIZE);
-	jstate.jumble_len = 0;
-	jstate.clocations_buf_size = 32;
-	jstate.clocations = (pgssLocationLen *)
-		palloc(jstate.clocations_buf_size * sizeof(pgssLocationLen));
-	jstate.clocations_count = 0;
-	jstate.highest_extern_param_id = 0;
-
-	/* Compute query ID and mark the Query node with it */
-	JumbleQuery(&jstate, query);
-	query->queryId =
-		DatumGetUInt64(hash_any_extended(jstate.jumble, jstate.jumble_len, 0));
-
+	query->queryId = get_query_id(&jstate, query);
 	if (query->rtable)
 	{
 		ListCell *lc;
@@ -1368,11 +1143,11 @@ exit:
 Datum
 pg_stat_monitor_reset(PG_FUNCTION_ARGS)
 {
-	if (!pgss || !pgss_hash || !pgss_agghash || !pgss_buckethash)
+	if (!pgss || !pgss_hash || !pgss_agghash || !pgss_buckethash || !pgss_waiteventshash)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
-	entry_reset(-1);
+	entry_dealloc(-1);
 	PG_RETURN_VOID();
 }
 
@@ -1386,6 +1161,101 @@ pg_stat_monitor(PG_FUNCTION_ARGS)
 
 	return (Datum) 0;
 }
+Datum
+pg_stat_wait_events(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo	   *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc			tupdesc;
+	Tuplestorestate	   *tupstore;
+	MemoryContext		per_query_ctx;
+	MemoryContext		oldcontext;
+	HASH_SEQ_STATUS		hash_seq;
+	pgssWaitEventEntry *entry;
+	char				queryid_txt[64];
+
+	/* hash tables must exist already */
+	if (!pgss || !pgss_hash || !pgss_object_hash || !pgss_waiteventshash)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("pg_stat_monitor: set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("pg_stat_monitor: materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	/* Switch into long-lived context to construct returned data structures */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "pg_stat_monitor: return type must be a row type");
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	LWLockAcquire(pgss->lock, LW_SHARED);
+
+	hash_seq_init(&hash_seq, pgss_waiteventshash);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		Datum		values[4];
+		bool		nulls[4];
+		uint64		queryid = entry->key.queryid;
+		const char *event = NULL;
+		const char *event_type = NULL;
+
+		/* Skip slots that no backend has tagged with a query yet. */
+		if (queryid == 0)
+			continue;
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		if (entry->wait_event_info != 0)
+		{
+			event = pgstat_get_wait_event(entry->wait_event_info);
+			event_type = pgstat_get_wait_event_type(entry->wait_event_info);
+		}
+
+		snprintf(queryid_txt, sizeof(queryid_txt), "%08lX", (unsigned long) queryid);
+
+		/*
+		 * Column order and Datum types must match the OUT parameters of the
+		 * SQL declaration: queryid text, pid bigint, wait_event text,
+		 * wait_event_type text.
+		 */
+		values[0] = PointerGetDatum(cstring_to_text(queryid_txt));
+		values[1] = Int64GetDatum((int64) entry->pid);
+		if (event)
+			values[2] = PointerGetDatum(cstring_to_text(event));
+		else
+			nulls[2] = true;
+		if (event_type)
+			values[3] = PointerGetDatum(cstring_to_text(event_type));
+		else
+			nulls[3] = true;
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	/* clean up and return the tuplestore */
+	LWLockRelease(pgss->lock);
+
+	tuplestore_donestoring(tupstore);
+	return (Datum) 0;
+}
 
 /* Common code for all versions of pg_stat_statements() */
 static void
@@ -1403,7 +1273,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
 	pgssEntry		*entry;
 	char			*query_txt;
 	char			queryid_txt[64];
-
 	query_txt = (char*) malloc(pgss_query_max_len);
 
 	/* Superusers or members of pg_read_all_stats members are allowed */
@@ -1700,27 +1569,46 @@ entry_dealloc(int bucket)
  * Release all entries.
  */
 static void
-entry_reset(int bucket)
+entry_reset(void)
 {
 	HASH_SEQ_STATUS hash_seq;
 	pgssEntry  *entry;
 	pgssAggEntry *dbentry;
+	pgssBucketEntry *bucketentry;
+	pgssWaitEventEntry *weentry;
 
 	LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
 
 	hash_seq_init(&hash_seq, pgss_hash);
 	while ((entry = hash_seq_search(&hash_seq)) != NULL)
 	{
-		if (entry->key.bucket_id == bucket || bucket == -1)
-			hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
+		hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
 	}
 
 	hash_seq_init(&hash_seq, pgss_agghash);
 	while ((dbentry = hash_seq_search(&hash_seq)) != NULL)
 	{
-		if (dbentry->key.bucket_id == bucket || bucket == -1)
-			hash_search(pgss_agghash, &dbentry->key, HASH_REMOVE, NULL);
+		hash_search(pgss_agghash, &dbentry->key, HASH_REMOVE, NULL);
 	}
+
+	/* Each table must be purged with the key of its own current entry. */
+	hash_seq_init(&hash_seq, pgss_buckethash);
+	while ((bucketentry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		hash_search(pgss_buckethash, &bucketentry->key, HASH_REMOVE, NULL);
+	}
+
+	hash_seq_init(&hash_seq, pgss_waiteventshash);
+	while ((weentry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		hash_search(pgss_waiteventshash, &weentry->key, HASH_REMOVE, NULL);
+	}
+
+	/* The pointer arrays referenced the entries removed above. */
+	free(pgssWaitEventEntries);
+	pgssWaitEventEntries = NULL;
+	free(pgssBucketEntries);
+	pgssBucketEntries = NULL;
 
 	LWLockRelease(pgss->lock);
 }
@@ -2873,3 +2761,126 @@ store_query(unsigned long queryid, const char *query, unsigned long query_len)
 
 	pgss->query_fifo[pgss->current_wbucket].head = next;
 }
+
+static PlannedStmt *
+pgss_planner_hook(Query *parse, int opt, ParamListInfo param)
+{
+	/* Tag this backend's wait-event slot with the query being planned. */
+	if (MyProc && pgssWaitEventEntries)
+	{
+		int			i = MyProc - ProcGlobal->allProcs;
+
+		if (pgssWaitEventEntries[i])
+			pgssWaitEventEntries[i]->key.queryid = parse->queryId;
+	}
+	if (planner_hook_next)
+		return planner_hook_next(parse, opt, param);
+
+	return standard_planner(parse, opt, param);
+}
+
+static void
+update_wait_event(void)
+{
+	int			i;
+
+	/* Nothing to sample into until the slot array has been set up. */
+	if (pgssWaitEventEntries == NULL)
+		return;
+
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	for (i = 0; i < ProcGlobal->allProcCount; i++)
+	{
+		PGPROC	   *proc = &ProcGlobal->allProcs[i];
+		pgssWaitEventEntry *entry = pgssWaitEventEntries[i];
+
+		if (entry == NULL)
+			continue;
+		if (proc->pid == 0)
+		{
+			/* Unused PGPROC slot: forget what its previous owner left behind. */
+			entry->key.queryid = 0;
+			entry->pid = 0;
+			entry->wait_event_info = 0;
+			continue;
+		}
+
+		entry->wait_event_info = proc->wait_event_info;
+		entry->pid = proc->pid;
+	}
+	LWLockRelease(ProcArrayLock);
+}
+
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	sigterm = true;
+	/* Wake the collector so it sees the flag without waiting for the timeout. */
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+	errno = save_errno;
+}
+
+static void
+register_wait_event(void)
+{
+	BackgroundWorker worker;
+
+	memset(&worker, 0, sizeof(worker));
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = 0;
+	worker.bgw_notify_pid = 0;
+	snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_stat_monitor");
+	snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(wait_event_main));
+	snprintf(worker.bgw_name, BGW_MAXLEN, "pg_stat_monitor collector");
+	worker.bgw_main_arg = (Datum) 0;
+	RegisterBackgroundWorker(&worker);
+}
+
+void
+wait_event_main(Datum main_arg)
+{
+	int			rc;
+
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL, false);
+	SetProcessingMode(NormalProcessing);
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+	while (1)
+	{
+		if (sigterm)
+			break;
+		rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1, PG_WAIT_EXTENSION);
+
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		ResetLatch(&MyProc->procLatch);
+
+		update_wait_event();
+	}
+	proc_exit(0);
+}
+
+static uint64
+get_query_id(pgssJumbleState *jstate, Query *query)
+{
+	uint64		queryid;
+
+	/* Set up workspace for query jumbling */
+	jstate->jumble = (unsigned char *) palloc(JUMBLE_SIZE);
+	jstate->jumble_len = 0;
+	jstate->clocations_buf_size = 32;
+	jstate->clocations = (pgssLocationLen *) palloc(jstate->clocations_buf_size * sizeof(pgssLocationLen));
+	jstate->clocations_count = 0;
+	jstate->highest_extern_param_id = 0;
+
+	/* Compute query ID and mark the Query node with it */
+	JumbleQuery(jstate, query);
+	queryid = DatumGetUInt64(hash_any_extended(jstate->jumble, jstate->jumble_len, 0));
+	return queryid;
+}
+
diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h
new file mode 100644
index 0000000..a76f10f
--- /dev/null
+++ b/pg_stat_monitor.h
@@ -0,0 +1,303 @@
+#ifndef __PG_STAT_MONITOR_H__
+#define __PG_STAT_MONITOR_H__
+
+#include "postgres.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "access/hash.h"
+#include "catalog/pg_authid.h"
+#include "executor/instrument.h"
+#include "common/ip.h"
+#include "funcapi.h"
+#include "access/twophase.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "optimizer/planner.h"
+#include "postmaster/bgworker.h"
+#include "parser/analyze.h"
+#include "parser/parsetree.h"
+#include "parser/scanner.h"
+#include "parser/scansup.h"
+#include "pgstat.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/spin.h"
+#include "tcop/utility.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/timestamp.h"
+#include "utils/lsyscache.h"
+#include "utils/guc.h"
+/* Time difference in milliseconds */
+#define TIMEVAL_DIFF(start, end) (((double) end.tv_sec + (double) end.tv_usec / 1000000.0) \
+	- ((double) start.tv_sec + (double) start.tv_usec / 1000000.0)) * 1000
+
+#define ArrayGetTextDatum(x) array_get_datum(x)
+
+/* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? 
*/ +#define USAGE_EXEC(duration) (1.0) +#define USAGE_INIT (1.0) /* including initial planning */ +#define ASSUMED_MEDIAN_INIT (10.0) /* initial assumed median usage */ +#define ASSUMED_LENGTH_INIT 1024 /* initial assumed mean query length */ +#define USAGE_DECREASE_FACTOR (0.99) /* decreased every entry_dealloc */ +#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */ +#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */ + +#define JUMBLE_SIZE 1024 /* query serialization buffer size */ + +#define MAX_RESPONSE_BUCKET 10 /* length of pgssBucketCounters.resp_calls[] */ +#define MAX_REL_LEN 2 /* length of the tables_name[] buffers */ +#define MAX_BUCKETS 10 /* length of the per-bucket arrays in pgssSharedState and of pgss_qbuf[] */ +#define MAX_OBJECT_CACHE 100 + +/* + * Type of aggregate keys + */ +typedef enum AGG_KEY +{ + AGG_KEY_DATABASE = 0, + AGG_KEY_USER, + AGG_KEY_HOST +} AGG_KEY; + +/* Bucket shared_memory storage */ +typedef struct pgssBucketHashKey +{ + uint64 bucket_id; /* bucket number */ +} pgssBucketHashKey; + +typedef struct pgssBucketCounters +{ + Timestamp current_time; /* start time of the bucket */ + int resp_calls[MAX_RESPONSE_BUCKET]; /* execution time's in msec */ +}pgssBucketCounters; + +typedef struct pgssBucketEntry +{ + pgssBucketHashKey key; /* hash key of entry - MUST BE FIRST */ + pgssBucketCounters counters; + slock_t mutex; /* protects the counters only */ +}pgssBucketEntry; + +/* Objects shared memory storage */ +typedef struct pgssObjectHashKey +{ + uint64 queryid; /* query id */ +} pgssObjectHashKey; + +typedef struct pgssObjectEntry +{ + pgssObjectHashKey key; /* hash key of entry - MUST BE FIRST */ + char tables_name[MAX_REL_LEN]; /* table names involved in the query; NOTE(review): MAX_REL_LEN is 2 - confirm this buffer is large enough */ + slock_t mutex; /* protects the counters only */ +} pgssObjectEntry; + +/* Aggregate shared memory storage */ +typedef struct pgssAggHashKey +{ + uint64 id; /* dbid, userid or ip depend upon the type */ + uint64 type; /* type of id dbid, userid or ip */ + uint64 queryid; /* query identifier, foreign key to the query */ + uint64 bucket_id; /* bucket_id is the foreign key to pgssBucketHashKey */ +} 
pgssAggHashKey; + +typedef struct pgssAggCounters +{ + uint64 total_calls; /* number of queries per database/user/ip */ +} pgssAggCounters; + +typedef struct pgssAggEntry +{ + pgssAggHashKey key; /* hash key of entry - MUST BE FIRST */ + pgssAggCounters counters; /* the statistics aggregates */ + slock_t mutex; /* protects the counters only */ +} pgssAggEntry; + + +typedef struct pgssWaitEventKey +{ + uint64 processid; +} pgssWaitEventKey; + +#define MAX_QUERY_LEN 1024 +typedef struct pgssWaitEventEntry +{ + pgssAggHashKey key; /* hash key of entry - MUST BE FIRST; NOTE(review): declared as pgssAggHashKey although pgssWaitEventKey is defined just above - confirm which key type the wait-event hash table is meant to use */ + uint64 queryid; + uint64 pid; + uint32 wait_event_info; + char query[MAX_QUERY_LEN]; + slock_t mutex; /* protects the counters only */ +} pgssWaitEventEntry; + + +/* shared memory storage for the query */ +typedef struct pgssHashKey +{ + uint64 bucket_id; /* bucket number */ + uint64 queryid; /* query identifier */ + Oid userid; /* user OID */ + Oid dbid; /* database OID */ +} pgssHashKey; + +typedef struct QueryInfo +{ + uint64 queryid; /* query identifier */ + Oid userid; /* user OID */ + Oid dbid; /* database OID */ + uint host; /* client IP */ + char tables_name[MAX_REL_LEN]; /* table names involved in the query */ +} QueryInfo; + +typedef struct Calls +{ + int64 calls; /* # of times executed */ + int64 rows; /* total # of retrieved or affected rows */ + double usage; /* usage factor */ +} Calls; + +typedef struct CallTime +{ + double total_time; /* total execution time, in msec */ + double min_time; /* minimum execution time in msec */ + double max_time; /* maximum execution time in msec */ + double mean_time; /* mean execution time in msec */ + double sum_var_time; /* sum of variances in execution time in msec */ +} CallTime; + +typedef struct Blocks +{ + int64 shared_blks_hit; /* # of shared buffer hits */ + int64 shared_blks_read; /* # of shared disk blocks read */ + int64 shared_blks_dirtied; /* # of shared disk blocks dirtied */ + int64 shared_blks_written; /* # of shared disk blocks written 
*/ + int64 local_blks_hit; /* # of local buffer hits */ + int64 local_blks_read; /* # of local disk blocks read */ + int64 local_blks_dirtied; /* # of local disk blocks dirtied */ + int64 local_blks_written; /* # of local disk blocks written */ + int64 temp_blks_read; /* # of temp blocks read */ + int64 temp_blks_written; /* # of temp blocks written */ + double blk_read_time; /* time spent reading, in msec */ + double blk_write_time; /* time spent writing, in msec */ +} Blocks; + +typedef struct SysInfo +{ + float utime; /* user cpu time */ + float stime; /* system cpu time */ +} SysInfo; + +/* + * The actual stats counters kept within pgssEntry. + */ +typedef struct Counters +{ + uint64 bucket_id; /* bucket id */ + Calls calls; + QueryInfo info; + CallTime time; + Blocks blocks; + SysInfo sysinfo; +} Counters; + +/* Some global structure to get the cpu usage, really don't like the idea of global variable */ + +/* + * Statistics per statement + */ +typedef struct pgssEntry +{ + pgssHashKey key; /* hash key of entry - MUST BE FIRST */ + Counters counters; /* the statistics for this query */ + int encoding; /* query text encoding */ + slock_t mutex; /* protects the counters only */ +} pgssEntry; + +typedef struct QueryFifo +{ + int head; + int tail; +} QueryFifo; + +/* + * Global shared state + */ +typedef struct pgssSharedState +{ + LWLock *lock; /* protects hashtable search/modification */ + double cur_median_usage; /* current median usage in hashtable */ + slock_t mutex; /* protects following fields only: */ + Size extent; /* current extent of query file */ + int n_writers; /* number of active writers to query file */ + uint64 current_wbucket; + unsigned long prev_bucket_usec; + unsigned long bucket_overflow[MAX_BUCKETS]; + unsigned long bucket_entry[MAX_BUCKETS]; + QueryFifo query_fifo[MAX_BUCKETS]; +} pgssSharedState; + +unsigned char *pgss_qbuf[MAX_BUCKETS]; /* NOTE(review): this is a definition, not an extern declaration, so every file that includes this header gets its own copy - confirm only pg_stat_monitor.c includes it */ + +/* + * Struct for tracking locations/lengths of constants during normalization + */ +typedef struct 
pgssLocationLen +{ + int location; /* start offset in query text */ + int length; /* length in bytes, or -1 to ignore */ +} pgssLocationLen; + +/* + * Working state for computing a query jumble and producing a normalized + * query string + */ +typedef struct pgssJumbleState +{ + /* Jumble of current query tree */ + unsigned char *jumble; + + /* Number of bytes used in jumble[] */ + Size jumble_len; + + /* Array of locations of constants that should be removed */ + pgssLocationLen *clocations; + + /* Allocated length of clocations array */ + int clocations_buf_size; + + /* Current number of valid entries in clocations array */ + int clocations_count; + + /* highest Param id we've seen, in order to start normalization correctly */ + int highest_extern_param_id; +} pgssJumbleState; + +/*---- GUC variables ----*/ + +typedef enum +{ + PGSS_TRACK_NONE, /* track no statements */ + PGSS_TRACK_TOP, /* only top level statements */ + PGSS_TRACK_ALL /* all statements, including nested ones */ +} PGSSTrackLevel; + +static const struct config_enum_entry track_options[] = +{ + {"none", PGSS_TRACK_NONE, false}, + {"top", PGSS_TRACK_TOP, false}, + {"all", PGSS_TRACK_ALL, false}, + {NULL, 0, false} +}; + +#define pgss_enabled() \ + (pgss_track == PGSS_TRACK_ALL || \ + (pgss_track == PGSS_TRACK_TOP && nested_level == 0)) + +#endif