diff --git a/Makefile b/Makefile
index a6e622c..28c6f03 100644
--- a/Makefile
+++ b/Makefile
@@ -4,12 +4,11 @@
 MODULE_big = pg_stat_monitor
 OBJS = pg_stat_monitor.o $(WIN32RES)
 
 EXTENSION = pg_stat_monitor
-
 DATA = pg_stat_monitor--1.0.sql
 PGFILEDESC = "pg_stat_monitor - execution statistics of SQL statements"
 
-LDFLAGS_SL += $(filter -lm, $(LIBS))
+LDFLAGS_SL += $(filter -lm, $(LIBS))
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/pg_stat_monitor/pg_stat_monitor.conf
 REGRESS = pg_stat_monitor
diff --git a/README.md b/README.md
index 655b8b4..0f6d328 100644
--- a/README.md
+++ b/README.md
@@ -18,11 +18,11 @@ There are two ways to install pg_stat_monitor. The first is by downloading the p
 ##### Download and compile
 
 The latest release of pg_stat_monitor can be downloaded from this GitHub page:
 
-    https://github.com/Percona-Lab/pg_stat_monitor/releases
+    https://github.com/Percona/pg_stat_monitor/releases
 
 or it can be downloaded using git:
 
-    git clone git://github.com/Percona-Lab/pg_stat_monitor.git
+    git clone git://github.com/Percona/pg_stat_monitor.git
 
 After downloading the code, set the path for the [PostgreSQL][1] binary:
@@ -57,46 +57,46 @@ There are four views, and complete statistics can be accessed using these views.
 ##### pg_stat_monitor
 This is the main view, which stores per-query statistics similar to pg_stat_statements, with some additional columns.
 
-    \d pg_stat_monitor;
-                 View "public.pg_stat_monitor"
-           Column        |       Type       | Collation | Nullable | Default
-    ---------------------+------------------+-----------+----------+---------
-     userid              | oid              |           |          |
-     dbid                | oid              |           |          |
-     queryid             | bigint           |           |          |
-     query               | text             |           |          |
-     calls               | bigint           |           |          |
-     total_time          | double precision |           |          |
-     min_time            | double precision |           |          |
-     max_time            | double precision |           |          |
-     mean_time           | double precision |           |          |
-     stddev_time         | double precision |           |          |
-     rows                | bigint           |           |          |
-     shared_blks_hit     | bigint           |           |          |
-     shared_blks_read    | bigint           |           |          |
-     shared_blks_dirtied | bigint           |           |          |
-     shared_blks_written | bigint           |           |          |
-     local_blks_hit      | bigint           |           |          |
-     local_blks_read     | bigint           |           |          |
-     local_blks_dirtied  | bigint           |           |          |
-     local_blks_written  | bigint           |           |          |
-     temp_blks_read      | bigint           |           |          |
-     temp_blks_written   | bigint           |           |          |
-     blk_read_time       | double precision |           |          |
-     blk_write_time      | double precision |           |          |
-     host                | integer          |           |          |
-     hist_calls          | text             |           |          |
-     hist_min_time       | text             |           |          |
-     hist_max_time       | text             |           |          |
-     hist_mean_time      | text             |           |          |
-     slow_query          | text             |           |          |
-     cpu_user_time       | double precision |           |          |
-     cpu_sys_time        | double precision |           |          |
+    postgres=# \d pg_stat_monitor;
+                     View "public.pg_stat_monitor"
+           Column        |           Type           | Collation | Nullable | Default
+    ---------------------+--------------------------+-----------+----------+---------
+     bucket              | oid                      |           |          |
+     bucket_start_time   | timestamp with time zone |           |          |
+     userid              | oid                      |           |          |
+     dbid                | oid                      |           |          |
+     queryid             | text                     |           |          |
+     query               | text                     |           |          |
+     calls               | bigint                   |           |          |
+     total_time          | double precision         |           |          |
+     min_time            | double precision         |           |          |
+     max_time            | double precision         |           |          |
+     mean_time           | double precision         |           |          |
+     stddev_time         | double precision         |           |          |
+     rows                | bigint                   |           |          |
+     shared_blks_hit     | bigint                   |           |          |
+     shared_blks_read    | bigint                   |           |          |
+     shared_blks_dirtied | bigint                   |           |          |
+     shared_blks_written | bigint                   |           |          |
+     local_blks_hit      | bigint                   |           |          |
+     local_blks_read     | bigint                   |           |          |
+     local_blks_dirtied  | bigint                   |           |          |
+     local_blks_written  | bigint                   |           |          |
+     temp_blks_read      | bigint                   |           |          |
+     temp_blks_written   | bigint                   |           |          |
+     blk_read_time       | double precision         |           |          |
+     blk_write_time      | double precision         |           |          |
+     host                | bigint                   |           |          |
+     client_ip           | inet                     |           |          |
+     resp_calls          | text[]                   |           |          |
+     cpu_user_time       | double precision         |           |          |
+     cpu_sys_time        | double precision         |           |          |
+     tables_names        | text[]                   |           |          |
 
 These are new columns added to provide more detail about the query.
 
-    host: Client IP or Hostname
+    client_ip: Client IP or Hostname
     hist_calls: Hourly based 24 hours calls of query histogram
     hist_min_time: Hourly based 24 hours min time of query histogram
     hist_max_time: Hourly based 24 hours max time of query histogram
@@ -106,84 +106,72 @@ These are new columns added to provide more detail about the query.
     cpu_sys_time: CPU System time for that query.
 
-    regression=# \d pg_stat_agg_database
-                    View "public.pg_stat_agg_database"
-         Column     |           Type           | Collation | Nullable | Default
-    ----------------+--------------------------+-----------+----------+---------
-     queryid        | bigint                   |           |          |
-     dbid            | bigint                   |           |          |
-     userid         | oid                      |           |          |
-     host           | inet                     |           |          |
-     total_calls    | integer                  |           |          |
-     min_time       | double precision         |           |          |
-     max_time       | double precision         |           |          |
-     mean_time      | double precision         |           |          |
-     hist_calls     | text[]                   |           |          |
-     hist_min_time  | text[]                   |           |          |
-     hist_max_time  | text[]                   |           |          |
-     hist_mean_time | text[]                   |           |          |
-     first_log_time | timestamp with time zone |           |          |
-     last_log_time  | timestamp with time zone |           |          |
-     cpu_user_time  | double precision         |           |          |
-     cpu_sys_time   | double precision         |           |          |
-     query          | text                     |           |          |
-     slow_query     | text                     |           |          |
+    postgres=# \d pg_stat_agg_database
+               View "public.pg_stat_agg_database"
+        Column     |       Type       | Collation | Nullable | Default
+    ---------------+------------------+-----------+----------+---------
+     bucket        | oid              |           |          |
+     queryid       | text             |           |          |
+     dbid          | bigint           |           |          |
+     userid        | oid              |           |          |
+     client_ip     | inet             |           |          |
+     total_calls   | integer          |           |          |
+     min_time      | double precision |           |          |
+     max_time      | double precision |           |          |
+     mean_time     | double precision |           |          |
+     resp_calls    | text[]           |           |          |
+     cpu_user_time | double precision |           |          |
+     cpu_sys_time  | double precision |           |          |
+     query         | text             |           |          |
+     tables_names  | text[]           |           |          |
 
-    # \d pg_stat_agg_user
-                      View "public.pg_stat_agg_user"
-         Column     |           Type           | Collation | Nullable | Default
-    ----------------+--------------------------+-----------+----------+---------
-     queryid        | bigint                   |           |          |
-     dbid           | bigint                   |           |          |
-     userid         | oid                      |           |          |
-     host           | inet                     |           |          |
-     total_calls    | integer                  |           |          |
-     min_time       | double precision         |           |          |
-     max_time       | double precision         |           |          |
-     mean_time      | double precision         |           |          |
-     hist_calls     | text[]                   |           |          |
-     hist_min_time  | text[]                   |           |          |
-     hist_max_time  | text[]                   |           |          |
-     hist_mean_time | text[]                   |           |          |
-     first_log_time | timestamp with time zone |           |          |
-     last_log_time  | timestamp with time zone |           |          |
-     cpu_user_time  | double precision         |           |          |
-     cpu_sys_time   | double precision         |           |          |
-     query          | text                     |           |          |
-     slow_query     | text                     |           |          |
+    postgres=# \d pg_stat_agg_user
+                 View "public.pg_stat_agg_user"
+        Column     |       Type       | Collation | Nullable | Default
+    ---------------+------------------+-----------+----------+---------
+     bucket        | oid              |           |          |
+     queryid       | text             |           |          |
+     dbid          | bigint           |           |          |
+     userid        | oid              |           |          |
+     client_ip     | inet             |           |          |
+     total_calls   | integer          |           |          |
+     min_time      | double precision |           |          |
+     max_time      | double precision |           |          |
+     mean_time     | double precision |           |          |
+     resp_calls    | text[]           |           |          |
+     cpu_user_time | double precision |           |          |
+     cpu_sys_time  | double precision |           |          |
+     query         | text             |           |          |
+     tables_names  | text[]           |           |          |
 
-    # \d pg_stat_agg_host
-                      View "public.pg_stat_agg_host"
-         Column     |           Type           | Collation | Nullable | Default
-    ----------------+--------------------------+-----------+----------+---------
-     queryid        | bigint                   |           |          |
-     dbid           | bigint                   |           |          |
-     userid         | oid                      |           |          |
-     host           | inet                     |           |          |
-     total_calls    | integer                  |           |          |
-     min_time       | double precision         |           |          |
-     max_time       | double precision         |           |          |
-     mean_time      | double precision         |           |          |
-     hist_calls     | text[]                   |           |          |
-     hist_min_time  | text[]                   |           |          |
-     hist_max_time  | text[]                   |           |          |
-     hist_mean_time | text[]                   |           |          |
-     first_log_time | timestamp with time zone |           |          |
-     last_log_time  | timestamp with time zone |           |          |
-     cpu_user_time  | double precision         |           |          |
-     cpu_sys_time   | double precision         |           |          |
-     query          | text                     |           |          |
-     slow_query     | text                     |           |          |
+    postgres=# \d pg_stat_agg_ip
+                  View "public.pg_stat_agg_ip"
+        Column     |       Type       | Collation | Nullable | Default
+    ---------------+------------------+-----------+----------+---------
+     bucket        | oid              |           |          |
+     queryid       | text             |           |          |
+     dbid          | bigint           |           |          |
+     userid        | oid              |           |          |
+     client_ip     | inet             |           |          |
+     host          | bigint           |           |          |
+     total_calls   | integer          |           |          |
+     min_time      | double precision |           |          |
+     max_time      | double precision |           |          |
+     mean_time     | double precision |           |          |
+     resp_calls    | text[]           |           |          |
+     cpu_user_time | double precision |           |          |
+     cpu_sys_time  | double precision |           |          |
+     query         | text             |           |          |
+     tables_names  | text[]           |           |          |
 
 Examples
 
-1 - In this query we are getting the exact value of f1 which is '05:06:07-07' in the case of slow queries.
+1 - This query returns the exact value of f1, which is '05:06:07-07' (a special setting is required for this).
 
-    # select userid, queryid, query, slow_query, max_time, total_calls from pg_stat_agg_user;
+    # select userid, queryid, query, max_time, total_calls from pg_stat_agg_user;
     -[ RECORD 1 ]----------------------------------------------------------------------------------------
     userid      | 10
     queryid     | -203926152419851453
-    query       | SELECT f1 FROM TIMETZ_TBL WHERE f1 < $1
-    slow_query  | SELECT f1 FROM TIMETZ_TBL WHERE f1 < '05:06:07-07';
+    query       | SELECT f1 FROM TIMETZ_TBL WHERE f1 < '05:06:07-07';
     max_time    | 1.237875
     total_calls | 8
 
@@ -206,6 +194,6 @@ Copyright (c) 2006 - 2019, Percona LLC.
 See [`LICENSE`][2] for full details.
 
 [1]: https://www.postgresql.org/
-[2]: https://github.com/Percona-Lab/pg_stat_monitor/blob/master/LICENSE
-[3]: https://github.com/Percona-Lab/pg_stat_monitor/issues/new
+[2]: https://github.com/Percona/pg_stat_monitor/blob/master/LICENSE
+[3]: https://github.com/Percona/pg_stat_monitor/issues/new
 [4]: CONTRIBUTING.md
diff --git a/pg_stat_monitor--1.0.sql b/pg_stat_monitor--1.0.sql
index f7469d5..54693c3 100644
--- a/pg_stat_monitor--1.0.sql
+++ b/pg_stat_monitor--1.0.sql
@@ -1,4 +1,4 @@
-/* contrib/pg_stat_monitor/pg_stat_monitor--1.4.sql */
+/* contrib/pg_stat_monitor/pg_stat_monitor--1.0.sql */
 
 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
 \echo Use "CREATE EXTENSION pg_stat_monitor" to load this file.
\quit
@@ -10,10 +10,13 @@
 AS 'MODULE_PATHNAME'
 LANGUAGE C PARALLEL SAFE;
 
 CREATE FUNCTION pg_stat_monitor(IN showtext boolean,
+    OUT bucket oid,
     OUT userid oid,
     OUT dbid oid,
-    OUT queryid bigint,
+
+    OUT queryid text,
     OUT query text,
+    OUT bucket_start_time timestamptz,
     OUT calls int8,
     OUT total_time float8,
     OUT min_time float8,
@@ -33,109 +36,126 @@ CREATE FUNCTION pg_stat_monitor(IN showtext boolean,
     OUT temp_blks_written int8,
     OUT blk_read_time float8,
     OUT blk_write_time float8,
-    OUT host bigint,
-    OUT hist_calls text,
-    OUT hist_min_time text,
-    OUT hist_max_time text,
-    OUT hist_mean_time text,
-    OUT slow_query text,
+    OUT client_ip bigint,
+    OUT resp_calls text,
     OUT cpu_user_time float8,
-    OUT cpu_sys_time float8
+    OUT cpu_sys_time float8,
+    OUT tables_names text
 )
 RETURNS SETOF record
-AS 'MODULE_PATHNAME', 'pg_stat_monitor_1_3'
+AS 'MODULE_PATHNAME', 'pg_stat_monitor'
 LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
 
 CREATE FUNCTION pg_stat_agg(
-    OUT queryid bigint,
+    OUT queryid text,
     OUT id bigint,
     OUT type bigint,
-    OUT total_calls int,
-    OUT first_call_time timestamptz,
-    OUT last_call_time timestamptz)
+    OUT total_calls int)
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'pg_stat_agg'
 LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
 
 -- Register a view on the function for ease of use.
-CREATE VIEW pg_stat_monitor AS
-  SELECT * FROM pg_stat_monitor(true);
+CREATE VIEW pg_stat_monitor AS SELECT
+    bucket,
+    bucket_start_time,
+    userid,
+    dbid,
+    queryid,
+    query,
+    calls,
+    total_time,
+    min_time,
+    max_time,
+    mean_time,
+    stddev_time,
+    rows,
+    shared_blks_hit,
+    shared_blks_read,
+    shared_blks_dirtied,
+    shared_blks_written,
+    local_blks_hit,
+    local_blks_read,
+    local_blks_dirtied,
+    local_blks_written,
+    temp_blks_read,
+    temp_blks_written,
+    blk_read_time,
+    blk_write_time,
+    client_ip as host,
+    '0.0.0.0'::inet + client_ip AS client_ip,
+    (string_to_array(resp_calls, ',')) resp_calls,
+    cpu_user_time,
+    cpu_sys_time,
+    (string_to_array(tables_names, ',')) tables_names
+FROM pg_stat_monitor(true);
 
 GRANT SELECT ON pg_stat_monitor TO PUBLIC;
 
 CREATE VIEW pg_stat_agg_database AS
 SELECT
+    ss.bucket,
     agg.queryid,
     agg.id AS dbid,
     ss.userid,
-    '0.0.0.0'::inet + ss.host AS host,
+    client_ip,
     agg.total_calls,
     ss.min_time,
     ss.max_time,
     ss.mean_time,
-    (string_to_array(hist_calls, ',')) hist_calls,
-    (string_to_array(hist_min_time, ',')) hist_min_time,
-    (string_to_array(hist_max_time, ',')) hist_max_time,
-    (string_to_array(hist_mean_time, ',')) hist_mean_time,
-    agg.first_call_time AS first_log_time,
-    agg.last_call_time AS last_log_time,
+    ss.resp_calls,
     ss.cpu_user_time,
     ss.cpu_sys_time,
     ss.query,
-    ss.slow_query
+    ss.tables_names
 FROM pg_stat_agg() agg
-INNER JOIN (SELECT DISTINCT queryid, dbid, userid, query, host, min_time, max_time, mean_time, hist_calls, hist_min_time, hist_max_time,hist_mean_time,slow_query,cpu_user_time,cpu_sys_time
+INNER JOIN (SELECT DISTINCT bucket, queryid, dbid, userid, query, client_ip, min_time, max_time, mean_time, resp_calls, tables_names, cpu_user_time,cpu_sys_time
 FROM pg_stat_monitor) ss
 ON agg.queryid = ss.queryid AND agg.type = 0 AND id = dbid;
 
 CREATE VIEW pg_stat_agg_user AS
 SELECT
+    ss.bucket,
     agg.queryid,
     agg.id AS dbid,
     ss.userid,
-    '0.0.0.0'::inet + ss.host AS host,
+    client_ip,
     agg.total_calls,
     ss.min_time,
     ss.max_time,
     ss.mean_time,
-    (string_to_array(hist_calls, ',')) hist_calls,
-    (string_to_array(hist_min_time, ',')) hist_min_time,
-    (string_to_array(hist_max_time, ',')) hist_max_time,
-    (string_to_array(hist_mean_time, ',')) hist_mean_time,
-    agg.first_call_time AS first_log_time,
-    agg.last_call_time AS last_log_time,
+    ss.resp_calls,
     ss.cpu_user_time,
     ss.cpu_sys_time,
     ss.query,
-    ss.slow_query
+    ss.tables_names
 FROM pg_stat_agg() agg
-INNER JOIN (SELECT DISTINCT queryid, userid, query, host, min_time, max_time, mean_time, hist_calls, hist_min_time, hist_max_time,hist_mean_time,slow_query,cpu_user_time,cpu_sys_time FROM pg_stat_monitor) ss
+INNER JOIN (SELECT DISTINCT bucket, queryid, userid, query, client_ip, min_time, max_time, mean_time, resp_calls, tables_names, cpu_user_time,cpu_sys_time FROM pg_stat_monitor) ss
 ON agg.queryid = ss.queryid AND agg.type = 1 AND id = userid;
 
-CREATE VIEW pg_stat_agg_host AS
+CREATE VIEW pg_stat_agg_ip AS
 SELECT
+    ss.bucket,
     agg.queryid,
     agg.id AS dbid,
     ss.userid,
-    '0.0.0.0'::inet + ss.host AS host,
+    ss.client_ip,
+    ss.host,
     agg.total_calls,
     ss.min_time,
     ss.max_time,
     ss.mean_time,
-    (string_to_array(hist_calls, ',')) hist_calls,
-    (string_to_array(hist_min_time, ',')) hist_min_time,
-    (string_to_array(hist_max_time, ',')) hist_max_time,
-    (string_to_array(hist_mean_time, ',')) hist_mean_time,
-    agg.first_call_time AS first_log_time,
-    agg.last_call_time AS last_log_time,
+    ss.resp_calls,
     ss.cpu_user_time,
     ss.cpu_sys_time,
     ss.query,
-    ss.slow_query
+    ss.tables_names
 FROM pg_stat_agg() agg
-INNER JOIN (SELECT DISTINCT queryid, userid, query, host, min_time, max_time, mean_time, hist_calls, hist_min_time, hist_max_time,hist_mean_time,slow_query,cpu_user_time,cpu_sys_time FROM pg_stat_monitor) ss
+INNER JOIN (SELECT DISTINCT bucket, queryid, userid, query, client_ip, host, min_time, max_time, mean_time, resp_calls, tables_names, cpu_user_time,cpu_sys_time FROM pg_stat_monitor) ss
 ON agg.queryid = ss.queryid AND agg.type = 2 AND id = host;
 
+GRANT SELECT ON pg_stat_agg_user TO PUBLIC;
+GRANT SELECT ON pg_stat_agg_ip TO PUBLIC;
 GRANT SELECT ON pg_stat_agg_database TO PUBLIC;
 
 -- Don't want this to be available to non-superusers.
diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c
index c98571b..5c9c028 100644
--- a/pg_stat_monitor.c
+++ b/pg_stat_monitor.c
@@ -3,51 +3,6 @@
  * pg_stat_monitor.c
  * Track statement execution times across a whole database cluster.
  *
- * Execution costs are totalled for each distinct source query, and kept in
- * a shared hashtable. (We track only as many distinct queries as will fit
- * in the designated amount of shared memory.)
- *
- * As of Postgres 9.2, this module normalizes query entries. Normalization
- * is a process whereby similar queries, typically differing only in their
- * constants (though the exact rules are somewhat more subtle than that) are
- * recognized as equivalent, and are tracked as a single entry. This is
- * particularly useful for non-prepared queries.
- *
- * Normalization is implemented by fingerprinting queries, selectively
- * serializing those fields of each query tree's nodes that are judged to be
- * essential to the query. This is referred to as a query jumble. This is
- * distinct from a regular serialization in that various extraneous
- * information is ignored as irrelevant or not essential to the query, such
- * as the collations of Vars and, most notably, the values of constants.
- *
- * This jumble is acquired at the end of parse analysis of each query, and
- * a 64-bit hash of it is stored into the query's Query.queryId field.
- * The server then copies this value around, making it available in plan
- * tree(s) generated from the query. The executor can then use this value
- * to blame query costs on the proper queryId.
- * - * To facilitate presenting entries to users, we create "representative" query - * strings in which constants are replaced with parameter symbols ($n), to - * make it clearer what a normalized entry can represent. To save on shared - * memory, and to avoid having to truncate oversized query strings, we store - * these strings in a temporary external query-texts file. Offsets into this - * file are kept in shared memory. - * - * Note about locking issues: to create or delete an entry in the shared - * hashtable, one must hold pgss->lock exclusively. Modifying any field - * in an entry except the counters requires the same. To look up an entry, - * one must hold the lock shared. To read or update the counters within - * an entry, one must hold the lock shared or exclusive (so the entry doesn't - * disappear!) and also take the entry's mutex spinlock. - * The shared state variable pgss->extent (the next free spot in the external - * query-text file) should be accessed only while holding either the - * pgss->mutex spinlock, or exclusive lock on pgss->lock. We use the mutex to - * allow reserving file space while holding only shared lock on pgss->lock. - * Rewriting the entire external query-text file, eg for garbage collection, - * requires holding pgss->lock exclusively; this allows individual entries - * in the file to be read or written while holding only shared lock. - * - * * Copyright (c) 2008-2018, PostgreSQL Global Development Group * * IDENTIFICATION @@ -84,38 +39,16 @@ #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/timestamp.h" - +#include "utils/lsyscache.h" PG_MODULE_MAGIC; -/* Maximum length of the stord query (with actual values) in shared mememory, */ -#define MAX_QUERY_LEN 255 - /* Time difference in miliseconds */ -#define TIMEVAL_DIFF(start, end) (((double) end.tv_sec + (double) end.tv_usec / 1000000.0) \ +#define TIMEVAL_DIFF(start, end) (((double) end.tv_sec + (double) end.tv_usec / 1000000.0) \ - ((double) start.tv_sec + (double) start.tv_usec / 1000000.0)) * 1000 - -/* Location of permanent stats file (valid when database is shut down) */ -#define PGSS_DUMP_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_monitor.stat" #define ArrayGetTextDatum(x) array_get_datum(x) -/* - * Location of external query text file. We don't keep it in the core - * system's stats_temp_directory. The core system can safely use that GUC - * setting, because the statistics collector temp file paths are set only once - * as part of changing the GUC, but pg_stat_statements has no way of avoiding - * race conditions. Besides, we only expect modest, infrequent I/O for query - * strings, so placing the file on a faster filesystem is not compelling. - */ -#define PGSS_TEXT_FILE PG_STAT_TMP_DIR "/pgsm_query_texts.stat" - -/* Magic number identifying the stats file format */ -static const uint32 PGSS_FILE_HEADER = 0x20171004; - -/* PostgreSQL major version number, changes in which invalidate all entries */ -static const uint32 PGSS_PG_MAJOR_VERSION = PG_VERSION_NUM / 100; - /* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? 
*/ #define USAGE_EXEC(duration) (1.0) #define USAGE_INIT (1.0) /* including initial planning */ @@ -127,129 +60,181 @@ static const uint32 PGSS_PG_MAJOR_VERSION = PG_VERSION_NUM / 100; #define JUMBLE_SIZE 1024 /* query serialization buffer size */ -/* - * Extension version number, for supporting older extension versions' objects - */ -typedef enum pgssVersion -{ - PGSS_V1_0 = 0, - PGSS_V1_1, - PGSS_V1_2, - PGSS_V1_3, - PGSS_V1_3_EXTENDED_1, /* Extended verion based on 3.1 */ -} pgssVersion; +#define MAX_RESPONSE_BUCKET 10 +#define MAX_REL_LEN 2 +#define MAX_BUCKETS 10 +#define MAX_OBJECT_CACHE 100 +/* + * Type of aggregate keys + */ +typedef enum AGG_KEY +{ + AGG_KEY_DATABASE = 0, + AGG_KEY_USER, + AGG_KEY_HOST +} AGG_KEY; + +/* Bucket shared_memory storage */ +typedef struct pgssBucketHashKey +{ + uint64 bucket_id; /* bucket number */ +} pgssBucketHashKey; + +typedef struct pgssBucketCounters +{ + Timestamp current_time; /* start time of the bucket */ + int resp_calls[MAX_RESPONSE_BUCKET]; /* execution time's in msec */ +}pgssBucketCounters; + +typedef struct pgssBucketEntry +{ + pgssBucketHashKey key; /* hash key of entry - MUST BE FIRST */ + pgssBucketCounters counters; + slock_t mutex; /* protects the counters only */ +}pgssBucketEntry; + +/* Objects shared memory storage */ +typedef struct pgssObjectHashKey +{ + uint64 queryid; /* query id */ +} pgssObjectHashKey; + +typedef struct pgssObjectEntry +{ + pgssObjectHashKey key; /* hash key of entry - MUST BE FIRST */ + char tables_name[MAX_REL_LEN]; /* table names involved in the query */ + slock_t mutex; /* protects the counters only */ +} pgssObjectEntry; + +/* Aggregate shared memory storage */ typedef struct pgssAggHashKey { - uint64 queryid; /* query identifier */ - uint64 id; /* dbid, userid or ip depend upon the type */ - uint64 type; /* query identifier */ + uint64 id; /* dbid, userid or ip depend upon the type */ + uint64 type; /* type of id dbid, userid or ip */ + uint64 queryid; /* query identifier, foreign key to the query */ + uint64 bucket_id; /* bucket_id is the foreign key to pgssBucketHashKey */ } pgssAggHashKey; typedef struct pgssAggCounters { uint64 total_calls; /* number of quries per database/user/ip */ - Timestamp first_call_time; /* first stats collection time */ - Timestamp last_call_time; /* last stats collection time */ } pgssAggCounters; typedef struct pgssAggEntry { - pgssAggHashKey key; /* hash key of entry - MUST BE FIRST */ + pgssAggHashKey key; /* hash key of entry - MUST BE FIRST */ pgssAggCounters counters; /* the statistics aggregates */ - slock_t mutex; /* protects the counters only */ + slock_t mutex; /* protects the counters only */ } pgssAggEntry; -/* - * Hashtable key that defines the identity of a hashtable entry. We separate - * queries by user and by database even if they are otherwise identical. - * - * Right now, this structure contains no padding. If you add any, make sure - * to teach pgss_store() to zero the padding bytes. Otherwise, things will - * break, because pgss_hash is created using HASH_BLOBS, and thus tag_hash - * is used to hash this. 
- */
+/* Shared memory storage for the query */
 typedef struct pgssHashKey
 {
-	Oid			userid;		/* user OID */
-	Oid			dbid;		/* database OID */
+	uint64		bucket_id;	/* bucket number */
 	uint64		queryid;	/* query identifier */
+	Oid			userid;		/* user OID */
+	Oid			dbid;		/* database OID */
 } pgssHashKey;
 
+typedef struct QueryInfo
+{
+	uint64		queryid;	/* query identifier */
+	Oid			userid;		/* user OID */
+	Oid			dbid;		/* database OID */
+	uint		host;		/* client IP */
+	char		tables_name[MAX_REL_LEN];	/* table names involved in the query */
+} QueryInfo;
+
+typedef struct Calls
+{
+	int64		calls;		/* # of times executed */
+	int64		rows;		/* total # of retrieved or affected rows */
+	double		usage;		/* usage factor */
+} Calls;
+
+typedef struct CallTime
+{
+	double		total_time;		/* total execution time, in msec */
+	double		min_time;		/* minimum execution time in msec */
+	double		max_time;		/* maximum execution time in msec */
+	double		mean_time;		/* mean execution time in msec */
+	double		sum_var_time;	/* sum of variances in execution time in msec */
+} CallTime;
+
+typedef struct Blocks
+{
+	int64		shared_blks_hit;		/* # of shared buffer hits */
+	int64		shared_blks_read;		/* # of shared disk blocks read */
+	int64		shared_blks_dirtied;	/* # of shared disk blocks dirtied */
+	int64		shared_blks_written;	/* # of shared disk blocks written */
+	int64		local_blks_hit;			/* # of local buffer hits */
+	int64		local_blks_read;		/* # of local disk blocks read */
+	int64		local_blks_dirtied;		/* # of local disk blocks dirtied */
+	int64		local_blks_written;		/* # of local disk blocks written */
+	int64		temp_blks_read;			/* # of temp blocks read */
+	int64		temp_blks_written;		/* # of temp blocks written */
+	double		blk_read_time;			/* time spent reading, in msec */
+	double		blk_write_time;			/* time spent writing, in msec */
+} Blocks;
+
+typedef struct SysInfo
+{
+	float		utime;		/* user cpu time */
+	float		stime;		/* system cpu time */
+} SysInfo;
+
 /*
  * The actual stats counters kept within pgssEntry.
*/ typedef struct Counters { - int64 calls; /* # of times executed */ - double total_time; /* total execution time, in msec */ - double min_time; /* minimum execution time in msec */ - double max_time; /* maximum execution time in msec */ - double mean_time; /* mean execution time in msec */ - double sum_var_time; /* sum of variances in execution time in msec */ - int64 rows; /* total # of retrieved or affected rows */ - int64 shared_blks_hit; /* # of shared buffer hits */ - int64 shared_blks_read; /* # of shared disk blocks read */ - int64 shared_blks_dirtied; /* # of shared disk blocks dirtied */ - int64 shared_blks_written; /* # of shared disk blocks written */ - int64 local_blks_hit; /* # of local buffer hits */ - int64 local_blks_read; /* # of local disk blocks read */ - int64 local_blks_dirtied; /* # of local disk blocks dirtied */ - int64 local_blks_written; /* # of local disk blocks written */ - int64 temp_blks_read; /* # of temp blocks read */ - int64 temp_blks_written; /* # of temp blocks written */ - double blk_read_time; /* time spent reading, in msec */ - double blk_write_time; /* time spent writing, in msec */ - double usage; /* usage factor */ - - /* Extra counters for extended version */ - uint host; /* client IP */ - Timestamp first_call_time; /* first stats collection time */ - Timestamp last_call_time; /* last stats collection time */ - double hist_calls[24]; /* execution time's histogram in msec */ - double hist_min_time[24]; /* min execution time's histogram in msec */ - double hist_max_time[24]; /* max execution time's histogram in msec */ - double hist_mean_time[24]; /* mean execution time's histogram in msec */ - char slow_query[MAX_QUERY_LEN]; /* slowes query */ - float utime; /* user cpu time */ - float stime; /* system cpu time */ + uint64 bucket_id; /* bucket id */ + Calls calls; + QueryInfo info; + CallTime time; + Blocks blocks; + SysInfo sysinfo; } Counters; /* Some global structure to get the cpu usage, really don't like the idea of global variable */ -static struct rusage rusage_start; -static struct rusage rusage_end; /* * Statistics per statement - * - * Note: in event of a failure in garbage collection of the query text file, - * we reset query_offset to zero and query_len to -1. This will be seen as - * an invalid state by qtext_fetch(). 
*/ typedef struct pgssEntry { - pgssHashKey key; /* hash key of entry - MUST BE FIRST */ - Counters counters; /* the statistics for this query */ - int query_offset; /* query text offset in external file */ - int query_len; /* # of valid bytes in query string, or -1 */ - int encoding; /* query text encoding */ - slock_t mutex; /* protects the counters only */ + pgssHashKey key; /* hash key of entry - MUST BE FIRST */ + Counters counters; /* the statistics for this query */ + int encoding; /* query text encoding */ + slock_t mutex; /* protects the counters only */ } pgssEntry; +typedef struct QueryFifo +{ + int head; + int tail; +} QueryFifo; + /* * Global shared state */ typedef struct pgssSharedState { - LWLock *lock; /* protects hashtable search/modification */ - double cur_median_usage; /* current median usage in hashtable */ - Size mean_query_len; /* current mean entry text length */ - slock_t mutex; /* protects following fields only: */ - Size extent; /* current extent of query file */ - int n_writers; /* number of active writers to query file */ - int gc_count; /* query file garbage collection cycle count */ + LWLock *lock; /* protects hashtable search/modification */ + double cur_median_usage; /* current median usage in hashtable */ + slock_t mutex; /* protects following fields only: */ + Size extent; /* current extent of query file */ + int n_writers; /* number of active writers to query file */ + uint64 current_wbucket; + unsigned long prev_bucket_usec; + unsigned long bucket_overflow[MAX_BUCKETS]; + unsigned long bucket_entry[MAX_BUCKETS]; + QueryFifo query_fifo[MAX_BUCKETS]; } pgssSharedState; +unsigned char *pgss_qbuf[MAX_BUCKETS]; + /* * Struct for tracking locations/lengths of constants during normalization */ @@ -288,6 +273,11 @@ typedef struct pgssJumbleState /* Current nesting depth of ExecutorRun+ProcessUtility calls */ static int nested_level = 0; +static double respose_time_lower_bound = .01; +static double respose_time_step = .1; +static struct rusage rusage_start; +static struct rusage rusage_end; + /* Saved hook values in case of unload */ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; @@ -301,10 +291,16 @@ static ProcessUtility_hook_type prev_ProcessUtility = NULL; /* Links to shared memory state */ static pgssSharedState *pgss = NULL; static HTAB *pgss_hash = NULL; +static HTAB *pgss_object_hash = NULL; /* Hash table for aggegates */ static HTAB *pgss_agghash = NULL; +/* Hash table for aggegates */ +static HTAB *pgss_buckethash = NULL; + +static pgssBucketEntry **pgssBucketEntries = NULL; + /*---- GUC variables ----*/ typedef enum @@ -312,7 +308,7 @@ typedef enum PGSS_TRACK_NONE, /* track no statements */ PGSS_TRACK_TOP, /* only top level statements */ PGSS_TRACK_ALL /* all statements, including nested ones */ -} PGSSTrackLevel; +} PGSSTrackLevel; static const struct config_enum_entry track_options[] = { @@ -322,50 +318,47 @@ static const struct config_enum_entry track_options[] = {NULL, 0, false} }; -static int pgss_max; /* max # statements to track */ -static int pgss_track; /* tracking level */ -static bool pgss_track_utility; /* whether to track utility commands */ -static bool pgss_save; /* whether to save stats across shutdown */ - +static int pgss_max; /* max # statements to track */ +static int pgss_track; /* tracking level */ +static bool pgss_track_utility; /* whether to track utility commands */ +static bool pgss_save; /* whether to save stats across shutdown */ +static int max_bucket_time; +static int max_buckets; +static int 
max_bucket_size; /* max # statements to track in a bucket */ +static int max_object_cache; +static bool pgss_normalized_query; +static int pgss_query_max_len; +static int pgss_query_buf_size; +static int pgss_query_buf_size_bucket; #define pgss_enabled() \ (pgss_track == PGSS_TRACK_ALL || \ (pgss_track == PGSS_TRACK_TOP && nested_level == 0)) -#define record_gc_qtexts() \ - do { \ - volatile pgssSharedState *s = (volatile pgssSharedState *) pgss; \ - SpinLockAcquire(&s->mutex); \ - s->gc_count++; \ - SpinLockRelease(&s->mutex); \ - } while(0) - /*---- Function declarations ----*/ -void _PG_init(void); -void _PG_fini(void); +void _PG_init(void); +void _PG_fini(void); PG_FUNCTION_INFO_V1(pg_stat_monitor_reset); PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2); PG_FUNCTION_INFO_V1(pg_stat_monitor_1_3); PG_FUNCTION_INFO_V1(pg_stat_monitor); - /* Extended version function prototypes */ PG_FUNCTION_INFO_V1(pg_stat_agg); -static uint pg_get_client_addr(); -static Datum array_get_datum(double arr[]); -static void update_agg_counters(uint64 queryid, uint64 id, uint64 type); -static void hash_remove_agg(uint64 queryid); +static uint pg_get_client_addr(void); +static Datum array_get_datum(int arr[]); + +static void update_agg_counters(uint64 bucket_id, uint64 queryid, uint64 id, AGG_KEY type); static pgssAggEntry *agg_entry_alloc(pgssAggHashKey *key); +void add_object_entry(uint64 queryid, char *objects); static void pgss_shmem_startup(void); static void pgss_shmem_shutdown(int code, Datum arg); static void pgss_post_parse_analyze(ParseState *pstate, Query *query); static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags); -static void pgss_ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, - uint64 count, bool execute_once); +static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); static void pgss_ExecutorFinish(QueryDesc *queryDesc); static void pgss_ExecutorEnd(QueryDesc *queryDesc); static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, @@ -379,21 +372,12 @@ static void pgss_store(const char *query, uint64 queryId, const BufferUsage *bufusage, float utime, float stime, pgssJumbleState *jstate); static void pg_stat_monitor_internal(FunctionCallInfo fcinfo, - pgssVersion api_version, bool showtext); static Size pgss_memsize(void); -static pgssEntry *entry_alloc(pgssHashKey *key, Size query_offset, int query_len, - int encoding, bool sticky); +static pgssEntry *entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int query_len, int encoding, bool sticky); -static void entry_dealloc(void); -static bool qtext_store(const char *query, int query_len, - Size *query_offset, int *gc_count); -static char *qtext_load_file(Size *buffer_size); -static char *qtext_fetch(Size query_offset, int query_len, - char *buffer, Size buffer_size); -static bool need_gc_qtexts(void); -static void gc_qtexts(void); -static void entry_reset(void); +static void entry_dealloc(int bucket_id); +static void entry_reset(int bucket_id); static void AppendJumble(pgssJumbleState *jstate, const unsigned char *item, Size size); static void JumbleQuery(pgssJumbleState *jstate, Query *query); @@ -404,9 +388,12 @@ static char *generate_normalized_query(pgssJumbleState *jstate, const char *quer int query_loc, int *query_len_p, int encoding); static void fill_in_constant_lengths(pgssJumbleState *jstate, const char *query, int query_loc); -static int comp_location(const void *a, const void *b); +static int comp_location(const void *a, const 
void *b);
+static uint64 get_next_wbucket(pgssSharedState *pgss);
+static void store_query(unsigned long queryid, const char *query, unsigned long query_len);
+static unsigned long locate_query(unsigned long bucket_id, unsigned long queryid, char * query);
 
 /*
  * Module load callback
  */
@@ -432,13 +419,27 @@ _PG_init(void)
 			NULL,
 			&pgss_max,
 			5000,
-			5,
+			5000,
 			INT_MAX,
 			PGC_POSTMASTER,
 			0,
 			NULL,
 			NULL,
 			NULL);
+
+	DefineCustomIntVariable("pg_stat_monitor.query_max_len",
+			"Sets the maximum length of a query.",
+			NULL,
+			&pgss_query_max_len,
+			1024,
+			1024,
+			INT_MAX,
+			PGC_POSTMASTER,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
 	DefineCustomEnumVariable("pg_stat_monitor.track",
 			"Selects which statements are tracked by pg_stat_monitor.",
 			NULL,
@@ -462,19 +463,111 @@
 			NULL,
 			NULL);
 
-	DefineCustomBoolVariable("pg_stat_monitor.save",
-			"Save pg_stat_monitor statistics across server shutdowns.",
+	DefineCustomBoolVariable("pg_stat_monitor.normalized_query",
+			"Selects whether to save the query in normalized format.",
 			NULL,
-			&pgss_save,
+			&pgss_normalized_query,
 			true,
-			PGC_SIGHUP,
+			PGC_SUSET,
 			0,
 			NULL,
 			NULL,
 			NULL);
+
+	DefineCustomBoolVariable("pg_stat_monitor.save",
+			"Save pg_stat_monitor statistics across server shutdowns.",
+			NULL,
+			&pgss_save,
+			true,
+			PGC_SIGHUP,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
+	DefineCustomIntVariable("pg_stat_monitor.max_buckets",
+			"Sets the maximum number of buckets.",
+			NULL,
+			&max_buckets,
+			MAX_BUCKETS,
+			1,
+			MAX_BUCKETS,
+			PGC_POSTMASTER,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
+	DefineCustomIntVariable("pg_stat_monitor.bucket_time",
+			"Sets the time in seconds per bucket.",
+			NULL,
+			&max_bucket_time,
+			60,
+			1,
+			INT_MAX,
+			PGC_POSTMASTER,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
+	DefineCustomIntVariable("pg_stat_monitor.max_object_cache",
+			"Sets the maximum number of object cache entries.",
+			NULL,
+			&max_object_cache,
+			MAX_OBJECT_CACHE,
+			10,
+			MAX_OBJECT_CACHE,
+			PGC_POSTMASTER,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
+	DefineCustomRealVariable("pg_stat_monitor.response_time_lower_bound",
+			"Sets the response time lower bound, in milliseconds.",
+			NULL,
+			&respose_time_lower_bound,
+			.1,
+			.1,
+			INT_MAX,
+			PGC_POSTMASTER,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
+	DefineCustomRealVariable("pg_stat_monitor.response_time_step",
+			"Sets the response time step, in milliseconds.",
+			NULL,
+			&respose_time_step,
+			.1,
+			.1,
+			INT_MAX,
+			PGC_POSTMASTER,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
+	DefineCustomIntVariable("pg_stat_monitor.shared_buffer",
+			"Sets the query text shared buffer size.",
+			NULL,
+			&pgss_query_buf_size,
+			500000,
+			500000,
+			INT_MAX,
+			PGC_POSTMASTER,
+			0,
+			NULL,
+			NULL,
+			NULL);
+
 	EmitWarningsOnPlaceholders("pg_stat_monitor");
 
+	max_bucket_size = pgss_max / max_buckets;
+
 	/*
 	 * Request additional shared resources.  (These are no-ops if we're not in
 	 * the postmaster process.)
We'll allocate or attach to the shared @@ -488,6 +581,7 @@ _PG_init(void) */ prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = pgss_shmem_startup; + prev_post_parse_analyze_hook = post_parse_analyze_hook; post_parse_analyze_hook = pgss_post_parse_analyze; prev_ExecutorStart = ExecutorStart_hook; @@ -529,15 +623,7 @@ pgss_shmem_startup(void) { bool found = false; HASHCTL info; - FILE *file = NULL; - FILE *qfile = NULL; - uint32 header; - int32 num; - int32 agg_num; - int32 pgver; int32 i; - int buffer_size; - char *buffer = NULL; if (prev_shmem_startup_hook) prev_shmem_startup_hook(); @@ -545,8 +631,9 @@ pgss_shmem_startup(void) /* reset in case this is a restart within the postmaster */ pgss = NULL; pgss_hash = NULL; + pgss_object_hash = NULL; pgss_agghash = NULL; - + pgss_buckethash = NULL; /* * Create or attach to the shared memory state, including hash table */ @@ -558,34 +645,71 @@ pgss_shmem_startup(void) /* First time through ... */ pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock; pgss->cur_median_usage = ASSUMED_MEDIAN_INIT; - pgss->mean_query_len = ASSUMED_LENGTH_INIT; SpinLockInit(&pgss->mutex); pgss->extent = 0; pgss->n_writers = 0; - pgss->gc_count = 0; } + pgss_query_buf_size_bucket = pgss_query_buf_size / max_buckets; + for (i = 0; i < max_buckets; i++) + pgss_qbuf[i] = (unsigned char *) ShmemAlloc(pgss_query_buf_size_bucket); + memset(&info, 0, sizeof(info)); info.keysize = sizeof(pgssHashKey); info.entrysize = sizeof(pgssEntry); - pgss_hash = ShmemInitHash("pg_stat_monitor hash", - pgss_max, pgss_max, - &info, - HASH_ELEM | HASH_BLOBS); + pgss_hash = ShmemInitHash("pg_stat_monitor: Queries hashtable", + pgss_max, pgss_max, + &info, + HASH_ELEM | HASH_BLOBS); memset(&info, 0, sizeof(info)); - info.keysize = sizeof(pgssAggHashKey); - info.entrysize = sizeof(pgssAggEntry); + info.keysize = sizeof(pgssBucketHashKey); + info.entrysize = sizeof(pgssBucketEntry); + + pgss_buckethash = ShmemInitHash("pg_stat_monitor: Buckets hashtable", + max_buckets, max_buckets, + &info, + HASH_ELEM | HASH_BLOBS); + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(pgssObjectHashKey); + info.entrysize = sizeof(pgssObjectEntry); + + pgss_object_hash = ShmemInitHash("pg_stat_monitor: Objects hashtable", + max_object_cache, max_object_cache, + &info, + HASH_ELEM | HASH_BLOBS); + + pgssBucketEntries = malloc(sizeof (pgssBucketEntry) * max_buckets); + for (i = 0; i < max_buckets; i++) + { + pgssBucketHashKey key; + pgssBucketEntry *entry = NULL; + bool found = false; + + key.bucket_id = i; + /* Find or create an entry with desired hash code */ + entry = (pgssBucketEntry *) hash_search(pgss_buckethash, &key, HASH_ENTER, &found); + if (!found) + { + memset(&entry->counters, 0, sizeof(pgssBucketCounters)); + SpinLockInit(&entry->mutex); + pgssBucketEntries[i] = entry; + } + } /* * Create a aggregate hash 3 times than the the normal hash because we have * three different type of aggregate stored in the aggregate hash. Aggregate * by database, aggraget by user and aggragete by host. 
*/ - pgss_agghash = ShmemInitHash("pg_stat_monitor aggrage_hash", - pgss_max * 3, pgss_max * 3, - &info, - HASH_ELEM | HASH_BLOBS); + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(pgssAggHashKey); + info.entrysize = sizeof(pgssAggEntry); + pgss_agghash = ShmemInitHash("pg_stat_monitor: Aggregated Information hashtable", + pgss_max * 3, pgss_max * 3, + &info, + HASH_ELEM | HASH_BLOBS); LWLockRelease(AddinShmemInitLock); /* @@ -594,173 +718,6 @@ pgss_shmem_startup(void) */ if (!IsUnderPostmaster) on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); - - /* - * Done if some other process already completed our initialization. - */ - if (found) - return; - - /* - * Note: we don't bother with locks here, because there should be no other - * processes running when this code is reached. - */ - - /* Unlink query text file possibly left over from crash */ - unlink(PGSS_TEXT_FILE); - - /* Allocate new query text temp file */ - qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W); - if (qfile == NULL) - goto write_error; - - /* - * If we were told not to load old statistics, we're done. (Note we do - * not try to unlink any old dump file in this case. This seems a bit - * questionable but it's the historical behavior.) - */ - if (!pgss_save) - { - FreeFile(qfile); - return; - } - - /* - * Attempt to load old statistics from the dump file. - */ - file = AllocateFile(PGSS_DUMP_FILE, PG_BINARY_R); - if (file == NULL) - { - if (errno != ENOENT) - goto read_error; - /* No existing persisted stats file, so we're done */ - FreeFile(qfile); - return; - } - - buffer_size = 2048; - buffer = (char *) palloc(buffer_size); - - if (fread(&header, sizeof(uint32), 1, file) != 1 || - fread(&pgver, sizeof(uint32), 1, file) != 1 || - fread(&num, sizeof(int32), 1, file) != 1 || - fread(&agg_num, sizeof(int32), 1, file) != 1) - goto read_error; - - if (header != PGSS_FILE_HEADER || - pgver != PGSS_PG_MAJOR_VERSION) - goto data_error; - - for (i = 0; i < num; i++) - { - pgssEntry temp; - pgssEntry *entry; - Size query_offset; - - if (fread(&temp, sizeof(pgssEntry), 1, file) != 1) - goto read_error; - - /* Encoding is the only field we can easily sanity-check */ - if (!PG_VALID_BE_ENCODING(temp.encoding)) - goto data_error; - - /* Resize buffer as needed */ - if (temp.query_len >= buffer_size) - { - buffer_size = Max(buffer_size * 2, temp.query_len + 1); - buffer = repalloc(buffer, buffer_size); - } - - if (fread(buffer, 1, temp.query_len + 1, file) != temp.query_len + 1) - goto read_error; - - /* Should have a trailing null, but let's make sure */ - buffer[temp.query_len] = '\0'; - - /* Skip loading "sticky" entries */ - if (temp.counters.calls == 0) - continue; - - /* Store the query text */ - query_offset = pgss->extent; - if (fwrite(buffer, 1, temp.query_len + 1, qfile) != temp.query_len + 1) - goto write_error; - pgss->extent += temp.query_len + 1; - - /* make the hashtable entry (discards old entries if too many) */ - entry = entry_alloc(&temp.key, query_offset, temp.query_len, - temp.encoding, - false); - - /* copy in the actual stats */ - entry->counters = temp.counters; - } - - /* Read the aggregates information from the file. 
*/ - for (i = 0; i < agg_num; i++) - { - pgssAggEntry temp; - pgssAggEntry *entry; - - if (fread(&temp, sizeof(pgssAggEntry), 1, file) != 1) - goto read_error; - - entry = agg_entry_alloc(&temp.key); - memcpy(entry, &temp, sizeof(pgssAggEntry)); - } - - pfree(buffer); - FreeFile(file); - FreeFile(qfile); - - /* - * Remove the persisted stats file so it's not included in - * backups/replication slaves, etc. A new file will be written on next - * shutdown. - * - * Note: it's okay if the PGSS_TEXT_FILE is included in a basebackup, - * because we remove that file on startup; it acts inversely to - * PGSS_DUMP_FILE, in that it is only supposed to be around when the - * server is running, whereas PGSS_DUMP_FILE is only supposed to be around - * when the server is not running. Leaving the file creates no danger of - * a newly restored database having a spurious record of execution costs, - * which is what we're really concerned about here. - */ - unlink(PGSS_DUMP_FILE); - - return; - -read_error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read pg_stat_monitor file \"%s\": %m", - PGSS_DUMP_FILE))); - goto fail; -data_error: - ereport(LOG, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("ignoring invalid data in pg_stat_monitor file \"%s\"", - PGSS_DUMP_FILE))); - goto fail; -write_error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write pg_stat_monitor file \"%s\": %m", - PGSS_TEXT_FILE))); -fail: - if (buffer) - pfree(buffer); - if (file) - FreeFile(file); - if (qfile) - FreeFile(qfile); - /* If possible, throw away the bogus file; ignore any error */ - unlink(PGSS_DUMP_FILE); - - /* - * Don't unlink PGSS_TEXT_FILE here; it should always be around while the - * server is running with pg_stat_statements enabled - */ } /* @@ -772,14 +729,6 @@ fail: static void pgss_shmem_shutdown(int code, Datum arg) { - FILE *file; - char *qbuffer = NULL; - Size qbuffer_size = 0; - HASH_SEQ_STATUS hash_seq; - int32 num_entries; - pgssEntry *entry; - pgssAggEntry *aggentry; - /* Don't try to dump during a crash. */ if (code) return; @@ -787,102 +736,9 @@ pgss_shmem_shutdown(int code, Datum arg) /* Safety check ... shouldn't get here unless shmem is set up. */ if (!pgss || !pgss_hash) return; - - /* Don't dump if told not to. */ - if (!pgss_save) - return; - - file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); - if (file == NULL) - goto error; - - if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) - goto error; - - if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) - goto error; - - num_entries = hash_get_num_entries(pgss_hash); - if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) - goto error; - - num_entries = hash_get_num_entries(pgss_agghash); - if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) - goto error; - - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - goto error; - - /* - * When serializing to disk, we store query texts immediately after their - * entry data. Any orphaned query texts are thereby excluded. 
- */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - int len = entry->query_len; - char *qstr = qtext_fetch(entry->query_offset, len, - qbuffer, qbuffer_size); - - if (qstr == NULL) - continue; /* Ignore any entries with bogus texts */ - - if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || - fwrite(qstr, 1, len + 1, file) != len + 1) - { - /* note: we assume hash_seq_term won't change errno */ - hash_seq_term(&hash_seq); - goto error; - } - } - - hash_seq_init(&hash_seq, pgss_agghash); - while ((aggentry = hash_seq_search(&hash_seq)) != NULL) - { - if (fwrite(aggentry, sizeof(pgssAggEntry), 1, file) != 1) - { - /* note: we assume hash_seq_term won't change errno */ - hash_seq_term(&hash_seq); - goto error; - } - } - - free(qbuffer); - qbuffer = NULL; - - free(qbuffer); - qbuffer = NULL; - - if (FreeFile(file)) - { - file = NULL; - goto error; - } - - /* - * Rename file into place, so we atomically replace any old one. - */ - (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); - - /* Unlink query-texts file; it's not needed while shutdown */ - unlink(PGSS_TEXT_FILE); - - return; - -error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write pg_stat_monitor file \"%s\": %m", - PGSS_DUMP_FILE ".tmp"))); - if (qbuffer) - free(qbuffer); - if (file) - FreeFile(file); - unlink(PGSS_DUMP_FILE ".tmp"); - unlink(PGSS_TEXT_FILE); } + /* * Post-parse-analysis hook: mark query with a queryId */ @@ -890,6 +746,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query) { pgssJumbleState jstate; + char tables_name[MAX_REL_LEN] = {0}; if (prev_post_parse_analyze_hook) prev_post_parse_analyze_hook(pstate, query); @@ -929,6 +786,42 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) query->queryId = DatumGetUInt64(hash_any_extended(jstate.jumble, jstate.jumble_len, 0)); + if (query->rtable) + { + ListCell *lc; + bool first = true; + foreach(lc, query->rtable) + { + RangeTblEntry *rte = lfirst_node(RangeTblEntry, lc); + if (rte->rtekind == RTE_RELATION) + { + char *relname = get_rel_name(rte->relid); + char *relspacename = get_namespace_name(get_rel_namespace(rte->relid)); + if (relname) + { + if (first) + { + if (relspacename) + snprintf(tables_name, MAX_REL_LEN, "%s.%s", relspacename, relname); + else + snprintf(tables_name, MAX_REL_LEN, "%s", relname); + first = false; + } + else + { + if (relspacename) + snprintf(tables_name, MAX_REL_LEN, "%s,%s.%s", tables_name, relspacename, relname); + else + snprintf(tables_name, MAX_REL_LEN, "%s,%s", tables_name, relname); + } + } + } + } + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + add_object_entry(query->queryId, tables_name); + LWLockRelease(pgss->lock); + } + /* * If we are unlucky enough to get a hash of zero, use 1 instead, to * prevent confusion with the utility-statement case. @@ -936,13 +829,6 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) if (query->queryId == UINT64CONST(0)) query->queryId = UINT64CONST(1); - /* - * If we were able to identify any ignorable constants, we immediately - * create a hash table entry for the query, so that we can record the - * normalized form of the query string. If there were no such constants, - * the normalized string would be the same as the query text anyway, so - * there's no need for an early entry. 
- */ if (jstate.clocations_count > 0) pgss_store(pstate->p_sourcetext, query->queryId, @@ -1068,7 +954,7 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) &queryDesc->totaltime->bufusage, utime, stime, - NULL); + NULL); } if (prev_ExecutorEnd) @@ -1210,12 +1096,12 @@ pgss_hash_string(const char *str, int len) } static uint -pg_get_client_addr() +pg_get_client_addr(void) { char remote_host[NI_MAXHOST]; - int num_backends = pgstat_fetch_stat_numbackends(); - int ret; - int i; + int num_backends = pgstat_fetch_stat_numbackends(); + int ret; + int i; memset(remote_host, 0x0, NI_MAXHOST); for (i = 1; i <= num_backends; i++) @@ -1262,19 +1148,18 @@ pgss_store(const char *query, uint64 queryId, const BufferUsage *bufusage, float utime, float stime, pgssJumbleState *jstate) { - pgssHashKey key; - pgssEntry *entry; - char *norm_query = NULL; - double old_mean; - int encoding = GetDatabaseEncoding(); - time_t t = time(NULL); - struct tm tm = *localtime(&t); - int current_hour = tm.tm_hour; + pgssHashKey key; + pgssEntry *entry; + char *norm_query = NULL; + int encoding = GetDatabaseEncoding(); + bool reset = false; + int i; + char tables_name[MAX_REL_LEN] = {0}; Assert(query != NULL); /* Safety check... */ - if (!pgss || !pgss_hash) + if (!pgss || !pgss_hash || !pgss_qbuf[pgss->current_wbucket]) return; /* @@ -1315,24 +1200,42 @@ pgss_store(const char *query, uint64 queryId, if (queryId == UINT64CONST(0)) queryId = pgss_hash_string(query, query_len); + + { + pgssObjectHashKey key; + pgssObjectEntry *entry; + + key.queryid = queryId; + + LWLockAcquire(pgss->lock, LW_SHARED); + entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_FIND, NULL); + if (entry != NULL) + { + LWLockRelease(pgss->lock); + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + snprintf(tables_name, MAX_REL_LEN, "%s", entry->tables_name); + hash_search(pgss_object_hash, &entry->key, HASH_REMOVE, NULL); + } + LWLockRelease(pgss->lock); + } + /* Set up key for hashtable search */ key.userid = GetUserId(); key.dbid = MyDatabaseId; key.queryid = queryId; + key.bucket_id = get_next_wbucket(pgss); + + if (key.bucket_id != pgss->current_wbucket) + { + reset = true; + pgss->current_wbucket = key.bucket_id; + } /* Lookup the hash table entry with shared lock. */ LWLockAcquire(pgss->lock, LW_SHARED); - entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); - - /* Create new entry, if not present */ - if (!entry) + if(!entry) { - Size query_offset; - int gc_count; - bool stored; - bool do_gc; - /* * Create a new, normalized query string if caller asked. We don't * need to hold the lock while doing this work. (Note: in any case, @@ -1350,83 +1253,52 @@ pgss_store(const char *query, uint64 queryId, LWLockAcquire(pgss->lock, LW_SHARED); } - /* Append new query text to file with only shared lock held */ - stored = qtext_store(norm_query ? norm_query : query, query_len, - &query_offset, &gc_count); - - /* - * Determine whether we need to garbage collect external query texts - * while the shared lock is still held. This micro-optimization - * avoids taking the time to decide this while holding exclusive lock. - */ - do_gc = need_gc_qtexts(); - - /* Need exclusive lock to make a new hashtable entry - promote */ LWLockRelease(pgss->lock); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - /* - * A garbage collection may have occurred while we weren't holding the - * lock. In the unlikely event that this happens, the query text we - * stored above will have been garbage collected, so write it again. 
-	 * This should be infrequent enough that doing it while holding
-	 * exclusive lock isn't a performance problem.
-	 */
-	if (!stored || pgss->gc_count != gc_count)
-		stored = qtext_store(norm_query ? norm_query : query, query_len,
-							 &query_offset, NULL);
-
-	/* If we failed to write to the text file, give up */
-	if (!stored)
-		goto done;
-	/* OK to create a new hashtable entry */
-	entry = entry_alloc(&key, query_offset, query_len, encoding,
-						jstate != NULL);
-
-	/* If needed, perform garbage collection while exclusive lock held */
-	if (do_gc)
-		gc_qtexts();
+		entry = entry_alloc(pgss, &key, 0, query_len, encoding, jstate != NULL);
+		if (entry == NULL)
+			goto exit;
 	}
-	/* Increment the counts, except when jstate is not NULL */
-	if (!jstate)
+	if (pgss_normalized_query)
+		store_query(queryId, norm_query ? norm_query : query, query_len);
+	else
+		store_query(queryId, query, query_len);
+
+	/*
+	 * Grab the spinlock while updating the counters (see comment about
+	 * locking rules at the head of the file)
+	 */
 	{
-		int i;
-		/*
-		 * Grab the spinlock while updating the counters (see comment about
-		 * locking rules at the head of the file)
-		 */
 		volatile pgssEntry *e = (volatile pgssEntry *) entry;
+		/* Increment the counts, except when jstate is not NULL */
+		if (!jstate)
+		{
-		SpinLockAcquire(&e->mutex);
+			SpinLockAcquire(&e->mutex);
-		/* Calculate the agregates for database/user and host */
-		update_agg_counters(key.queryid, key.dbid, 0);
-		update_agg_counters(key.queryid, key.userid, 1);
-		update_agg_counters(key.queryid, pg_get_client_addr(), 2);
+			/* Start collecting data for next bucket and reset all counters */
+			if (reset)
+				memset(&entry->counters, 0, sizeof(Counters));
+
+			/* Calculate the aggregates for database/user and host */
+			update_agg_counters(entry->key.bucket_id, key.queryid, key.dbid, AGG_KEY_DATABASE);
+			update_agg_counters(entry->key.bucket_id, key.queryid, key.userid, AGG_KEY_USER);
+			update_agg_counters(entry->key.bucket_id, key.queryid, pg_get_client_addr(), AGG_KEY_HOST);
 		/* "Unstick" entry if it was previously sticky */
-		if (e->counters.calls == 0)
-			e->counters.usage = USAGE_INIT;
+		if (e->counters.calls.calls == 0)
+			e->counters.calls.usage = USAGE_INIT;
-		e->counters.calls += 1;
-		e->counters.total_time += total_time;
-		if (e->counters.calls == 1)
+		e->counters.calls.calls += 1;
+		e->counters.time.total_time += total_time;
+		if (e->counters.calls.calls == 1)
 		{
-			int i;
-			e->counters.min_time = total_time;
-			e->counters.max_time = total_time;
-			e->counters.mean_time = total_time;
-			for (i = 0; i < 24; i++)
-			{
-				e->counters.hist_min_time[i] = -1;;
-				e->counters.hist_max_time[i] = -1;
-				e->counters.hist_mean_time[i] = -1;
-			}
-			e->counters.hist_min_time[current_hour] = total_time;
-			e->counters.hist_max_time[current_hour] = total_time;
-			e->counters.hist_mean_time[current_hour] = total_time;
+			e->counters.time.min_time = total_time;
+			e->counters.time.max_time = total_time;
+			e->counters.time.mean_time = total_time;
 		}
 		else
 		{
@@ -1434,57 +1306,55 @@ pgss_store(const char *query, uint64 queryId,
 			 * Welford's method for accurately computing variance. See
 			 * <http://www.johndcook.com/blog/standard_deviation/>
 			 */
-			double old_mean = e->counters.mean_time;
+			double old_mean = e->counters.time.mean_time;
-			e->counters.mean_time +=
-				(total_time - old_mean) / e->counters.calls;
-			e->counters.sum_var_time +=
-				(total_time - old_mean) * (total_time - e->counters.mean_time);
+			e->counters.time.mean_time +=
+				(total_time - old_mean) / e->counters.calls.calls;
+			e->counters.time.sum_var_time +=
+				(total_time - old_mean) * (total_time - e->counters.time.mean_time);
 			/* calculate min and max time */
-			if (e->counters.min_time > total_time)
-				e->counters.min_time = total_time;
-			if (e->counters.max_time < total_time)
-				e->counters.max_time = total_time;
+			if (e->counters.time.min_time > total_time)
+				e->counters.time.min_time = total_time;
+			if (e->counters.time.max_time < total_time)
+				e->counters.time.max_time = total_time;
 		}
-		e->counters.rows += rows;
-		e->counters.shared_blks_hit += bufusage->shared_blks_hit;
-		e->counters.shared_blks_read += bufusage->shared_blks_read;
-		e->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
-		e->counters.shared_blks_written += bufusage->shared_blks_written;
-		e->counters.local_blks_hit += bufusage->local_blks_hit;
-		e->counters.local_blks_read += bufusage->local_blks_read;
-		e->counters.local_blks_dirtied += bufusage->local_blks_dirtied;
-		e->counters.local_blks_written += bufusage->local_blks_written;
-		e->counters.temp_blks_read += bufusage->temp_blks_read;
-		e->counters.temp_blks_written += bufusage->temp_blks_written;
-		e->counters.blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_read_time);
-		e->counters.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time);
-		e->counters.usage += USAGE_EXEC(total_time);
-		e->counters.host = pg_get_client_addr();
-		{
-			e->counters.hist_calls[current_hour] += 1;
-			if (total_time < e->counters.hist_min_time[current_hour])
-				e->counters.hist_min_time[current_hour] = USAGE_EXEC(total_time);
-			if (total_time > e->counters.hist_min_time[current_hour])
-				e->counters.hist_max_time[current_hour] = USAGE_EXEC(total_time);
-			old_mean = e->counters.hist_mean_time[current_hour];
-			e->counters.hist_mean_time[current_hour] +=
-				(total_time - old_mean) / e->counters.hist_calls[current_hour];
-		}
-		if (total_time >= e->counters.max_time)
+		for (i = 0; i < MAX_RESPONSE_BUCKET - 1; i++)
 		{
-			for(i = 0; i < MAX_QUERY_LEN - 1; i++)
-				e->counters.slow_query[i] = query[i];
+			if (total_time < respose_time_lower_bound + (respose_time_step * i))
+			{
+				pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls[i]++;
+				break;
+			}
 		}
-		e->counters.slow_query[MAX_QUERY_LEN - 1] = 0;
-		e->counters.utime = utime;
-		e->counters.stime = stime;
+		if (total_time > respose_time_lower_bound + (respose_time_step * MAX_RESPONSE_BUCKET))
+			pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls[MAX_RESPONSE_BUCKET - 1]++;
+
+		e->counters.calls.rows += rows;
+		e->counters.blocks.shared_blks_hit += bufusage->shared_blks_hit;
+		e->counters.blocks.shared_blks_read += bufusage->shared_blks_read;
+		e->counters.blocks.shared_blks_dirtied += bufusage->shared_blks_dirtied;
+		e->counters.blocks.shared_blks_written += bufusage->shared_blks_written;
+		e->counters.blocks.local_blks_hit += bufusage->local_blks_hit;
+		e->counters.blocks.local_blks_read += bufusage->local_blks_read;
+		e->counters.blocks.local_blks_dirtied += bufusage->local_blks_dirtied;
+		e->counters.blocks.local_blks_written += bufusage->local_blks_written;
+		e->counters.blocks.temp_blks_read += bufusage->temp_blks_read;
+		e->counters.blocks.temp_blks_written += bufusage->temp_blks_written;
+		e->counters.blocks.blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_read_time);
+		e->counters.blocks.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time);
+		e->counters.calls.usage += USAGE_EXEC(total_time);
+		e->counters.info.host = pg_get_client_addr();
+		e->counters.sysinfo.utime = utime;
+		e->counters.sysinfo.stime = stime;
+		for(i = 0; i < MAX_REL_LEN - 1; i++)
+			e->counters.info.tables_name[i] = tables_name[i];
 		SpinLockRelease(&e->mutex);
+		}
 	}

-done:
+exit:
 	LWLockRelease(pgss->lock);

 	/* We postpone this clean-up until we're out of the lock */
@@ -1498,103 +1368,62 @@ done:
 Datum
 pg_stat_monitor_reset(PG_FUNCTION_ARGS)
 {
-	if (!pgss || !pgss_hash || !pgss_agghash)
+	if (!pgss || !pgss_hash || !pgss_agghash || !pgss_buckethash)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("pg_stat_monitor must be loaded via shared_preload_libraries")));
-	entry_reset();
+				 errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
+	entry_reset(-1);
 	PG_RETURN_VOID();
 }

-/* Number of output arguments (columns) for various API versions */
-#define PG_STAT_STATEMENTS_COLS_V1_0	14
-#define PG_STAT_STATEMENTS_COLS_V1_1	18
-#define PG_STAT_STATEMENTS_COLS_V1_2	19
-#define PG_STAT_STATEMENTS_COLS_V1_3	23
-#define PG_STAT_STATEMENTS_COLS_V1_3_EXTENED_1	31
-#define PG_STAT_STATEMENTS_COLS	31	/* maximum of above */
+#define PG_STAT_STATEMENTS_COLS	31	/* maximum of above */

-/*
- * Retrieve statement statistics.
- *
- * The SQL API of this function has changed multiple times, and will likely
- * do so again in future. To support the case where a newer version of this
- * loadable module is being used with an old SQL declaration of the function,
- * we continue to support the older API versions. For 1.2 and later, the
- * expected API version is identified by embedding it in the C name of the
- * function. Unfortunately we weren't bright enough to do that for 1.1.
- */
-Datum
-pg_stat_monitor_1_3(PG_FUNCTION_ARGS)
-{
-	bool		showtext = PG_GETARG_BOOL(0);
-
-	pg_stat_monitor_internal(fcinfo, PGSS_V1_3, showtext);
-
-	return (Datum) 0;
-}
-
-Datum
-pg_stat_monitor_1_2(PG_FUNCTION_ARGS)
-{
-	bool		showtext = PG_GETARG_BOOL(0);
-
-	pg_stat_monitor_internal(fcinfo, PGSS_V1_2, showtext);
-
-	return (Datum) 0;
-}
-
-/*
- * Legacy entry point for pg_stat_statements() API versions 1.0 and 1.1.
- * This can be removed someday, perhaps.
- */
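A note on the response-time histogram added in the pgss_store() hunk above: the loop only tests bins 0 through MAX_RESPONSE_BUCKET - 2, while the follow-up check only fires for times beyond respose_time_lower_bound + respose_time_step * MAX_RESPONSE_BUCKET, so a time that falls between the last bound tested by the loop and that overflow threshold is never counted. A minimal standalone sketch of gap-free binning (lower_bound and step stand in for the patch's GUC-backed globals; this is an illustration, not the patch's code):

    /* Sketch: map a response time onto one of MAX_RESPONSE_BUCKET bins
     * so that every value lands in exactly one bin. */
    #define MAX_RESPONSE_BUCKET 10

    static int
    resp_bucket_index(double total_time, double lower_bound, double step)
    {
        int     i;

        for (i = 0; i < MAX_RESPONSE_BUCKET - 1; i++)
        {
            /* bin i holds times below lower_bound + step * (i + 1) */
            if (total_time < lower_bound + step * (i + 1))
                return i;
        }
        return MAX_RESPONSE_BUCKET - 1;     /* everything else overflows here */
    }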
 Datum
 pg_stat_monitor(PG_FUNCTION_ARGS)
 {
 	/* If it's really API 1.1, we'll figure that out below */
-	pg_stat_monitor_internal(fcinfo, PGSS_V1_0, true);
-
+	pg_stat_monitor_internal(fcinfo, true);
 	return (Datum) 0;
 }
+
 /* Common code for all versions of pg_stat_statements() */
 static void
 pg_stat_monitor_internal(FunctionCallInfo fcinfo,
-						 pgssVersion api_version,
-						 bool showtext)
+						 bool showtext)
 {
-	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-	TupleDesc	tupdesc;
+	ReturnSetInfo	*rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc		tupdesc;
 	Tuplestorestate *tupstore;
-	MemoryContext per_query_ctx;
-	MemoryContext oldcontext;
-	Oid			userid = GetUserId();
-	bool		is_allowed_role = false;
-	char		*qbuffer = NULL;
-	Size		qbuffer_size = 0;
-	Size		extent = 0;
-	int			gc_count = 0;
+	MemoryContext	per_query_ctx;
+	MemoryContext	oldcontext;
+	Oid				userid = GetUserId();
+	bool			is_allowed_role = false;
 	HASH_SEQ_STATUS hash_seq;
-	pgssEntry	*entry;
+	pgssEntry		*entry;
+	char			*query_txt;
+	char			queryid_txt[64];
+
+	query_txt = (char*) malloc(pgss_query_max_len);

 	/* Superusers or members of pg_read_all_stats members are allowed */
 	is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);

 	/* hash table must exist already */
-	if (!pgss || !pgss_hash)
+	if (!pgss || !pgss_hash || !pgss_object_hash)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("pg_stat_monitor must be loaded via shared_preload_libraries")));
+				 errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));

 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("set-valued function called in context that cannot accept a set")));
+				 errmsg("pg_stat_monitor: set-valued function called in context that cannot accept a set")));
 	if (!(rsinfo->allowedModes & SFRM_Materialize))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("materialize mode required, but it is not " \
+				 errmsg("pg_stat_monitor: materialize mode required, but it is not " \
 						"allowed in this context")));

 	/* Switch into long-lived context to construct returned data structures */
@@ -1603,41 +1432,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
 	/* Build a tuple descriptor for our result type */
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
-	/*
-	 * Check we have the expected number of output arguments. Aside from
-	 * being a good safety check, we need a kluge here to detect API version
-	 * 1.1, which was wedged into the code in an ill-considered way.
-	 */
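One detail worth flagging in pg_stat_monitor_internal() above: the malloc() of query_txt is never checked for NULL, and the buffer is only released on the success path. A hedged alternative, assuming the usual PostgreSQL conventions apply here, is to allocate from the function's per-query memory context once it is established; palloc raises an error on out-of-memory instead of returning NULL, and the allocation disappears with the context:

    /* Sketch only: per_query_ctx is the SRF's long-lived context that the
     * surrounding code already switches into; pgss_query_max_len is the
     * patch's maximum query length. */
    oldcontext = MemoryContextSwitchTo(per_query_ctx);
    query_txt = palloc0(pgss_query_max_len + 1);    /* +1 leaves room for a NUL */
    MemoryContextSwitchTo(oldcontext);
    /* no explicit free needed: the context is reset when the query ends */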
-	switch (tupdesc->natts)
-	{
-		case PG_STAT_STATEMENTS_COLS_V1_0:
-			if (api_version != PGSS_V1_0)
-				elog(ERROR, "incorrect number of output arguments");
-			break;
-		case PG_STAT_STATEMENTS_COLS_V1_1:
-			/* pg_stat_statements() should have told us 1.0 */
-			if (api_version != PGSS_V1_0)
-				elog(ERROR, "incorrect number of output arguments");
-			api_version = PGSS_V1_1;
-			break;
-		case PG_STAT_STATEMENTS_COLS_V1_2:
-			if (api_version != PGSS_V1_2)
-				elog(ERROR, "incorrect number of output arguments");
-			break;
-		case PG_STAT_STATEMENTS_COLS_V1_3:
-			if (api_version != PGSS_V1_3)
-				elog(ERROR, "incorrect number of output arguments");
-			break;
-		case PG_STAT_STATEMENTS_COLS_V1_3_EXTENED_1:
-			if (api_version != PGSS_V1_3)
-				elog(ERROR, "incorrect number of output arguments %d", api_version);
-			api_version = PGSS_V1_3_EXTENDED_1;
-			break;
-		default:
-			elog(ERROR, "incorrect number of output arguments");
-	}
+		elog(ERROR, "pg_stat_monitor: return type must be a row type");

 	tupstore = tuplestore_begin_heap(true, false, work_mem);
 	rsinfo->returnMode = SFRM_Materialize;
@@ -1646,71 +1441,8 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
 	MemoryContextSwitchTo(oldcontext);

-	/*
-	 * We'd like to load the query text file (if needed) while not holding any
-	 * lock on pgss->lock. In the worst case we'll have to do this again
-	 * after we have the lock, but it's unlikely enough to make this a win
-	 * despite occasional duplicated work. We need to reload if anybody
-	 * writes to the file (either a retail qtext_store(), or a garbage
-	 * collection) between this point and where we've gotten shared lock. If
-	 * a qtext_store is actually in progress when we look, we might as well
-	 * skip the speculative load entirely.
-	 */
-	if (showtext)
-	{
-		int			n_writers;
-
-		/* Take the mutex so we can examine variables */
-		{
-			volatile pgssSharedState *s = (volatile pgssSharedState *) pgss;
-
-			SpinLockAcquire(&s->mutex);
-			extent = s->extent;
-			n_writers = s->n_writers;
-			gc_count = s->gc_count;
-			SpinLockRelease(&s->mutex);
-		}
-
-		/* No point in loading file now if there are active writers */
-		if (n_writers == 0)
-			qbuffer = qtext_load_file(&qbuffer_size);
-	}
-
-	/*
-	 * Get shared lock, load or reload the query text file if we must, and
-	 * iterate over the hashtable entries.
-	 *
-	 * With a large hash table, we might be holding the lock rather longer
-	 * than one could wish. However, this only blocks creation of new hash
-	 * table entries, and the larger the hash table the less likely that is to
-	 * be needed. So we can hope this is okay. Perhaps someday we'll decide
-	 * we need to partition the hash table to limit the time spent holding any
-	 * one lock.
-	 */
 	LWLockAcquire(pgss->lock, LW_SHARED);

-	if (showtext)
-	{
-		/*
-		 * Here it is safe to examine extent and gc_count without taking the
-		 * mutex. Note that although other processes might change
-		 * pgss->extent just after we look at it, the strings they then write
-		 * into the file cannot yet be referenced in the hashtable, so we
-		 * don't care whether we see them or not.
-		 *
-		 * If qtext_load_file fails, we just press on; we'll return NULL for
-		 * every query text.
-		 */
-		if (qbuffer == NULL ||
-			pgss->extent != extent ||
-			pgss->gc_count != gc_count)
-		{
-			if (qbuffer)
-				free(qbuffer);
-			qbuffer = qtext_load_file(&qbuffer_size);
-		}
-	}
-
 	hash_seq_init(&hash_seq, pgss_hash);
 	while ((entry = hash_seq_search(&hash_seq)) != NULL)
 	{
@@ -1724,41 +1456,33 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
 		memset(values, 0, sizeof(values));
 		memset(nulls, 0, sizeof(nulls));
+
+		if(locate_query(entry->key.bucket_id, queryid, query_txt) == 0)
+			sprintf(query_txt, "%s", "");
+
+		sprintf(queryid_txt, "%08lX", queryid);
+
+		values[i++] = ObjectIdGetDatum(entry->key.bucket_id);
 		values[i++] = ObjectIdGetDatum(entry->key.userid);
 		values[i++] = ObjectIdGetDatum(entry->key.dbid);
-
+		/* copy counters to a local variable to keep locking time short */
+		{
+			volatile pgssEntry *e = (volatile pgssEntry *) entry;
+			SpinLockAcquire(&e->mutex);
+			tmp = e->counters;
+			SpinLockRelease(&e->mutex);
+		}
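The copy-counters block just above follows the long-standing pg_stat_statements idiom: the per-entry spinlock guards only the Counters struct, so the reader holds it just long enough for a struct copy and then builds Datums from the snapshot, lock-free. (Note that the bucket-level fields read later from pgssBucketEntries — resp_calls, current_time — are not covered by this lock.) The idiom in isolation:

    /* Sketch: snapshot an entry's counters under its spinlock, then
     * work only from the local copy with no lock held. */
    Counters    tmp;

    {
        volatile pgssEntry *e = (volatile pgssEntry *) entry;

        SpinLockAcquire(&e->mutex);
        tmp = e->counters;              /* struct copy under the lock */
        SpinLockRelease(&e->mutex);
    }
    /* ... read tmp, never entry->counters, from here on ... */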
 		if (is_allowed_role || entry->key.userid == userid)
 		{
-			if (api_version >= PGSS_V1_2)
-				values[i++] = Int64GetDatumFast(queryid);
-
+			values[i++] = CStringGetTextDatum(queryid_txt);
 			if (showtext)
 			{
-				char	*qstr = qtext_fetch(entry->query_offset,
-											entry->query_len,
-											qbuffer,
-											qbuffer_size);
-
-				if (qstr)
-				{
-					char	*enc;
-
-					enc = pg_any_to_server(qstr,
-										   entry->query_len,
-										   entry->encoding);
-
+				char *enc;
+				enc = pg_any_to_server(query_txt, strlen(query_txt), entry->encoding);
 				values[i++] = CStringGetTextDatum(enc);
-
-				if (enc != qstr)
+				if (enc != query_txt)
 					pfree(enc);
-				}
-				else
-				{
-					/* Just return a null if we fail to find the text */
-					nulls[i++] = true;
-				}
 			}
-			else
+			else
 			{
 				/* Query text not requested */
 				nulls[i++] = true;
@@ -1766,13 +1490,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
 		}
 		else
 		{
-			/* Don't show queryid */
-			if (api_version >= PGSS_V1_2)
-				nulls[i++] = true;
-
 			/*
 			 * Don't show query text, but hint as to the reason for not doing
-			 * so if it was requested
+			 * so if it was requested
 			 */
 			if (showtext)
 				values[i++] = CStringGetTextDatum("");
@@ -1780,82 +1500,57 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
 				nulls[i++] = true;
 		}

-		/* copy counters to a local variable to keep locking time short */
-		{
-			volatile pgssEntry *e = (volatile pgssEntry *) entry;
-
-			SpinLockAcquire(&e->mutex);
-			tmp = e->counters;
-			SpinLockRelease(&e->mutex);
-		}
-
 		/* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
-		if (tmp.calls == 0)
+		if (tmp.calls.calls == 0)
 			continue;

-		values[i++] = Int64GetDatumFast(tmp.calls);
-		values[i++] = Float8GetDatumFast(tmp.total_time);
-		if (api_version >= PGSS_V1_3)
-		{
-			values[i++] = Float8GetDatumFast(tmp.min_time);
-			values[i++] = Float8GetDatumFast(tmp.max_time);
-			values[i++] = Float8GetDatumFast(tmp.mean_time);
+		values[i++] = TimestampGetDatum(pgssBucketEntries[entry->key.bucket_id]->counters.current_time);
+		values[i++] = Int64GetDatumFast(tmp.calls.calls);
+		values[i++] = Float8GetDatumFast(tmp.time.total_time);
+		values[i++] = Float8GetDatumFast(tmp.time.min_time);
+		values[i++] = Float8GetDatumFast(tmp.time.max_time);
+		values[i++] = Float8GetDatumFast(tmp.time.mean_time);

-		/*
-		 * Note we are calculating the population variance here, not the
-		 * sample variance, as we have data for the whole population, so
-		 * Bessel's correction is not used, and we don't divide by
-		 * tmp.calls - 1.
-		 */
-		if (tmp.calls > 1)
-			stddev = sqrt(tmp.sum_var_time / tmp.calls);
-		else
-			stddev = 0.0;
-		values[i++] = Float8GetDatumFast(stddev);
-		}
-		values[i++] = Int64GetDatumFast(tmp.rows);
-		values[i++] = Int64GetDatumFast(tmp.shared_blks_hit);
-		values[i++] = Int64GetDatumFast(tmp.shared_blks_read);
-		if (api_version >= PGSS_V1_1)
-			values[i++] = Int64GetDatumFast(tmp.shared_blks_dirtied);
-		values[i++] = Int64GetDatumFast(tmp.shared_blks_written);
-		values[i++] = Int64GetDatumFast(tmp.local_blks_hit);
-		values[i++] = Int64GetDatumFast(tmp.local_blks_read);
-		if (api_version >= PGSS_V1_1)
-			values[i++] = Int64GetDatumFast(tmp.local_blks_dirtied);
-		values[i++] = Int64GetDatumFast(tmp.local_blks_written);
-		values[i++] = Int64GetDatumFast(tmp.temp_blks_read);
-		values[i++] = Int64GetDatumFast(tmp.temp_blks_written);
-		if (api_version >= PGSS_V1_1)
-		{
-			values[i++] = Float8GetDatumFast(tmp.blk_read_time);
-			values[i++] = Float8GetDatumFast(tmp.blk_write_time);
-		}
-		values[i++] = Int64GetDatumFast(tmp.host);
-		values[i++] = ArrayGetTextDatum(tmp.hist_calls);
-		values[i++] = ArrayGetTextDatum(tmp.hist_min_time);
-		values[i++] = ArrayGetTextDatum(tmp.hist_max_time);
-		values[i++] = ArrayGetTextDatum(tmp.hist_mean_time);
-		values[i++] = CStringGetTextDatum(tmp.slow_query);
-		values[i++] = Float8GetDatumFast(tmp.utime);
-		values[i++] = Float8GetDatumFast(tmp.stime);
-
-		Assert(i == (api_version == PGSS_V1_0 ? PG_STAT_STATEMENTS_COLS_V1_0 :
-					 api_version == PGSS_V1_1 ? PG_STAT_STATEMENTS_COLS_V1_1 :
-					 api_version == PGSS_V1_2 ? PG_STAT_STATEMENTS_COLS_V1_2 :
-					 api_version == PGSS_V1_3 ? PG_STAT_STATEMENTS_COLS_V1_3 :
-					 api_version == PGSS_V1_3_EXTENDED_1 ? PG_STAT_STATEMENTS_COLS_V1_3_EXTENED_1 :
-					 -1 /* fail if you forget to update this assert */ ));
+		/*
+		 * Note we are calculating the population variance here, not the
+		 * sample variance, as we have data for the whole population, so
+		 * Bessel's correction is not used, and we don't divide by
+		 * tmp.calls - 1.
+		 */
+		if (tmp.calls.calls > 1)
+			stddev = sqrt(tmp.time.sum_var_time / tmp.calls.calls);
+		else
+			stddev = 0.0;
+		values[i++] = Float8GetDatumFast(stddev);
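The stddev computed above is the read-side half of the Welford recurrence that pgss_store() maintains: mean_time and sum_var_time are updated once per call, and the population standard deviation (divide by n, no Bessel correction) falls out at reporting time. Both halves in one self-contained sketch:

    #include <math.h>

    /* Sketch of the running-variance bookkeeping used by the patch. */
    typedef struct
    {
        long    n;          /* number of calls           */
        double  mean;       /* running mean              */
        double  sum_var;    /* sum of squared deviations */
    } WelfordState;

    static void
    welford_update(WelfordState *w, double x)
    {
        double  old_mean = w->mean;

        w->n += 1;
        w->mean += (x - old_mean) / w->n;
        w->sum_var += (x - old_mean) * (x - w->mean);
    }

    static double
    welford_pop_stddev(const WelfordState *w)
    {
        /* population variance: divide by n, not n - 1 */
        return (w->n > 1) ? sqrt(w->sum_var / w->n) : 0.0;
    }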
+		values[i++] = Int64GetDatumFast(tmp.calls.rows);
+		values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_hit);
+		values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_read);
+		values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_dirtied);
+		values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_written);
+		values[i++] = Int64GetDatumFast(tmp.blocks.local_blks_hit);
+		values[i++] = Int64GetDatumFast(tmp.blocks.local_blks_read);
+		values[i++] = Int64GetDatumFast(tmp.blocks.local_blks_dirtied);
+		values[i++] = Int64GetDatumFast(tmp.blocks.local_blks_written);
+		values[i++] = Int64GetDatumFast(tmp.blocks.temp_blks_read);
+		values[i++] = Int64GetDatumFast(tmp.blocks.temp_blks_written);
+		values[i++] = Float8GetDatumFast(tmp.blocks.blk_read_time);
+		values[i++] = Float8GetDatumFast(tmp.blocks.blk_write_time);
+		values[i++] = Int64GetDatumFast(tmp.info.host);
+		values[i++] = ArrayGetTextDatum(pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls);
+		values[i++] = Float8GetDatumFast(tmp.sysinfo.utime);
+		values[i++] = Float8GetDatumFast(tmp.sysinfo.stime);
+		if (strlen(tmp.info.tables_name) == 0)
+			nulls[i++] = true;
+		else
+			values[i++] = CStringGetTextDatum(tmp.info.tables_name);

 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
+	free(query_txt);

 	/* clean up and return the tuplestore */
 	LWLockRelease(pgss->lock);
-
-	if (qbuffer)
-		free(qbuffer);
-
 	tuplestore_donestoring(tupstore);
 }
@@ -1865,7 +1560,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
 static Size
 pgss_memsize(void)
 {
-	Size		size;
+	Size	size;

 	size = MAXALIGN(sizeof(pgssSharedState));
 	size = add_size(size, hash_estimate_size(pgss_max, sizeof(pgssEntry)));
@@ -1891,53 +1586,69 @@ pgss_memsize(void)
  * have made the entry while we waited to get exclusive lock.
  */
 static pgssEntry *
-entry_alloc(pgssHashKey *key, Size query_offset, int query_len, int encoding,
+entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int query_len, int encoding,
 			bool sticky)
 {
-	pgssEntry	*entry;
-	bool		found;
+	pgssEntry	*entry = NULL;
+	bool		found = false;

-	/* Make space if needed */
-	while (hash_get_num_entries(pgss_hash) >= pgss_max)
-		entry_dealloc();
+	if (pgss->bucket_entry[pgss->current_wbucket] >= max_bucket_size)
+	{
+		pgss->bucket_overflow[pgss->current_wbucket]++;
+		return NULL;
+	}
+
+	if (hash_get_num_entries(pgss_hash) >= pgss_max)
+		return NULL;

 	/* Find or create an entry with desired hash code */
 	entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found);

 	if (!found)
 	{
+		pgss->bucket_entry[pgss->current_wbucket]++;
 		/* New entry, initialize it */

 		/* reset the statistics */
 		memset(&entry->counters, 0, sizeof(Counters));
 		/* set the appropriate initial usage count */
-		entry->counters.usage = sticky ? pgss->cur_median_usage : USAGE_INIT;
+		entry->counters.calls.usage = sticky ? pgss->cur_median_usage : USAGE_INIT;
 		/* re-initialize the mutex each time ... we assume no one using it */
 		SpinLockInit(&entry->mutex);
 		/* ... and don't forget the query text metadata */
-		Assert(query_len >= 0);
-		entry->query_offset = query_offset;
-		entry->query_len = query_len;
 		entry->encoding = encoding;
 	}
-
 	return entry;
 }

-/*
- * qsort comparator for sorting into increasing usage order
- */
-static int
-entry_cmp(const void *lhs, const void *rhs)
+static uint64
+get_next_wbucket(pgssSharedState *pgss)
 {
-	double		l_usage = (*(pgssEntry *const *) lhs)->counters.usage;
-	double		r_usage = (*(pgssEntry *const *) rhs)->counters.usage;
+	struct timeval	tv;
+	unsigned long	current_usec;
+	uint64			bucket_id;

-	if (l_usage < r_usage)
-		return -1;
-	else if (l_usage > r_usage)
-		return +1;
-	else
-		return 0;
+	gettimeofday(&tv,NULL);
+	current_usec = tv.tv_sec;
+
+	if ((current_usec - pgss->prev_bucket_usec) > max_bucket_time)
+	{
+		bucket_id = pgss->current_wbucket + 1;
+		if (bucket_id == max_buckets)
+			bucket_id = 0;
+
+		LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
+		entry_dealloc(bucket_id);
+		/* reset the query buffer */
+		pgss->query_fifo[bucket_id].head = 0;
+		pgss->query_fifo[bucket_id].tail = 0;
+		LWLockRelease(pgss->lock);
+
+		pgss->prev_bucket_usec = current_usec;
+
+		pgssBucketEntries[bucket_id]->counters.current_time = GetCurrentTimestamp();
+		return bucket_id;
+	}
+	return pgss->current_wbucket;
 }
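get_next_wbucket() above drives the time-sliced ring of buckets: once max_bucket_time has elapsed, the writer advances one slot (wrapping at max_buckets), evicts that slot's previous contents, and resets its query FIFO. One wrinkle worth noting: despite the variable name, current_usec holds tv.tv_sec, so the bucket clock ticks in whole seconds. Stripped of the locking and eviction, the ring arithmetic reduces to the following (names assumed from the patch):

    #include <time.h>

    /* Sketch: slot selection for a clock-driven ring of max_buckets
     * buckets, each max_bucket_time seconds wide; older slots are
     * simply overwritten as the clock wraps around the ring. */
    static unsigned long
    bucket_for(time_t now, time_t start,
               unsigned long max_buckets, long max_bucket_time)
    {
        unsigned long elapsed_slots =
            (unsigned long) ((now - start) / max_bucket_time);

        return elapsed_slots % max_buckets;
    }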
 /*
@@ -1946,572 +1657,70 @@ entry_cmp(const void *lhs, const void *rhs)
  * Caller must hold an exclusive lock on pgss->lock.
  */
 static void
-entry_dealloc(void)
+entry_dealloc(int bucket)
 {
 	HASH_SEQ_STATUS hash_seq;
-	pgssEntry	**entries;
-	pgssEntry	*entry;
-	int			nvictims;
-	int			i;
-	Size		tottextlen;
-	int			nvalidtexts;
+	HASH_SEQ_STATUS hash_dbseq;
+	pgssEntry		*entry;
+	pgssAggEntry	*agg_entry;
+	pgssEntry		**entries;
+	pgssAggEntry	**agg_entries;
+	int				i;
+	int				nvictims = 0;

-	/*
-	 * Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them.
-	 * While we're scanning the table, apply the decay factor to the usage
-	 * values, and update the mean query length.
-	 *
-	 * Note that the mean query length is almost immediately obsolete, since
-	 * we compute it before not after discarding the least-used entries.
-	 * Hopefully, that doesn't affect the mean too much; it doesn't seem worth
-	 * making two passes to get a more current result. Likewise, the new
-	 * cur_median_usage includes the entries we're about to zap.
-	 */
+	pgss->bucket_entry[bucket] = 0;
 	entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *));
-
-	i = 0;
-	tottextlen = 0;
-	nvalidtexts = 0;
-
 	hash_seq_init(&hash_seq, pgss_hash);
 	while ((entry = hash_seq_search(&hash_seq)) != NULL)
 	{
-		entries[i++] = entry;
-		/* "Sticky" entries get a different usage decay rate. */
-		if (entry->counters.calls == 0)
-			entry->counters.usage *= STICKY_DECREASE_FACTOR;
-		else
-			entry->counters.usage *= USAGE_DECREASE_FACTOR;
-		/* In the mean length computation, ignore dropped texts. */
-		if (entry->query_len >= 0)
-		{
-			tottextlen += entry->query_len + 1;
-			nvalidtexts++;
-		}
+		if (entry->key.bucket_id == bucket || bucket < 0)
+			entries[nvictims++] = entry;
 	}

-	/* Sort into increasing order by usage */
-	qsort(entries, i, sizeof(pgssEntry *), entry_cmp);
-
-	/* Record the (approximate) median usage */
-	if (i > 0)
-		pgss->cur_median_usage = entries[i / 2]->counters.usage;
-	/* Record the mean query length */
-	if (nvalidtexts > 0)
-		pgss->mean_query_len = tottextlen / nvalidtexts;
-	else
-		pgss->mean_query_len = ASSUMED_LENGTH_INIT;
-
-	/* Now zap an appropriate fraction of lowest-usage entries */
-	nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100);
-	nvictims = Min(nvictims, i);
 	for (i = 0; i < nvictims; i++)
+		entry = hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL);
+
+	nvictims = 0;
+	agg_entries = palloc(hash_get_num_entries(pgss_agghash) * sizeof(pgssAggEntry *));
+	hash_seq_init(&hash_dbseq, pgss_agghash);
+	while ((agg_entry = hash_seq_search(&hash_dbseq)) != NULL)
 	{
-		hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL);
-		hash_remove_agg(entries[i]->key.queryid);
+		if (agg_entry->key.bucket_id == bucket || bucket < 0)
+			agg_entries[nvictims++] = agg_entry;
 	}
+	for (i = 0; i < nvictims; i++)
+		hash_search(pgss_agghash, &agg_entries[i]->key, HASH_REMOVE, NULL);
+
 	pfree(entries);
-
-
-}
-
-static void
-hash_remove_agg(uint64 queryid)
-{
-	HASH_SEQ_STATUS hash_seq;
-	pgssAggEntry *entry;
-
-	hash_seq_init(&hash_seq, pgss_agghash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		if (entry->key.queryid == queryid)
-			hash_search(pgss_agghash, &entry->key, HASH_REMOVE, NULL);
-	}
-}
-
-
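entry_dealloc() above no longer sorts by usage; it evicts every entry tagged with the expiring bucket id (or everything, when bucket is negative). The collect-then-remove shape is deliberate: victims are gathered during hash_seq_search() and only removed after the scan completes, since a dynahash sequential scan only promises stability when the element removed is the one just returned. The pattern, reduced to its core:

    /* Sketch: evict all entries of one bucket from a dynahash table. */
    static void
    evict_bucket(HTAB *hash, int bucket)
    {
        HASH_SEQ_STATUS seq;
        pgssEntry  *entry;
        pgssEntry **victims;
        int         n = 0;
        int         i;

        victims = palloc(hash_get_num_entries(hash) * sizeof(pgssEntry *));

        hash_seq_init(&seq, hash);
        while ((entry = hash_seq_search(&seq)) != NULL)
        {
            if (entry->key.bucket_id == bucket || bucket < 0)   /* < 0: evict all */
                victims[n++] = entry;
        }

        /* safe now: the scan above has finished */
        for (i = 0; i < n; i++)
            hash_search(hash, &victims[i]->key, HASH_REMOVE, NULL);

        pfree(victims);
    }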
-/*
- * Given a query string (not necessarily null-terminated), allocate a new
- * entry in the external query text file and store the string there.
- *
- * If successful, returns true, and stores the new entry's offset in the file
- * into *query_offset. Also, if gc_count isn't NULL, *gc_count is set to the
- * number of garbage collections that have occurred so far.
- *
- * On failure, returns false.
- *
- * At least a shared lock on pgss->lock must be held by the caller, so as
- * to prevent a concurrent garbage collection. Share-lock-holding callers
- * should pass a gc_count pointer to obtain the number of garbage collections,
- * so that they can recheck the count after obtaining exclusive lock to
- * detect whether a garbage collection occurred (and removed this entry).
- */
-static bool
-qtext_store(const char *query, int query_len,
-			Size *query_offset, int *gc_count)
-{
-	Size		off;
-	int			fd;
-
-	/*
-	 * We use a spinlock to protect extent/n_writers/gc_count, so that
-	 * multiple processes may execute this function concurrently.
-	 */
-	{
-		volatile pgssSharedState *s = (volatile pgssSharedState *) pgss;
-
-		SpinLockAcquire(&s->mutex);
-		off = s->extent;
-		s->extent += query_len + 1;
-		s->n_writers++;
-		if (gc_count)
-			*gc_count = s->gc_count;
-		SpinLockRelease(&s->mutex);
-	}
-
-	*query_offset = off;
-
-	/* Now write the data into the successfully-reserved part of the file */
-	fd = OpenTransientFile(PGSS_TEXT_FILE, O_RDWR | O_CREAT | PG_BINARY);
-	if (fd < 0)
-		goto error;
-
-	if (lseek(fd, off, SEEK_SET) != off)
-		goto error;
-
-	if (write(fd, query, query_len) != query_len)
-		goto error;
-	if (write(fd, "\0", 1) != 1)
-		goto error;
-
-	CloseTransientFile(fd);
-
-	/* Mark our write complete */
-	{
-		volatile pgssSharedState *s = (volatile pgssSharedState *) pgss;
-
-		SpinLockAcquire(&s->mutex);
-		s->n_writers--;
-		SpinLockRelease(&s->mutex);
-	}
-
-	return true;
-
-error:
-	ereport(LOG,
-			(errcode_for_file_access(),
-			 errmsg("could not write pg_stat_monitor file \"%s\": %m",
-					PGSS_TEXT_FILE)));
-
-	if (fd >= 0)
-		CloseTransientFile(fd);
-
-	/* Mark our write complete */
-	{
-		volatile pgssSharedState *s = (volatile pgssSharedState *) pgss;
-
-		SpinLockAcquire(&s->mutex);
-		s->n_writers--;
-		SpinLockRelease(&s->mutex);
-	}
-
-	return false;
-}
-
-/*
- * Read the external query text file into a malloc'd buffer.
- *
- * Returns NULL (without throwing an error) if unable to read, eg
- * file not there or insufficient memory.
- *
- * On success, the buffer size is also returned into *buffer_size.
- *
- * This can be called without any lock on pgss->lock, but in that case
- * the caller is responsible for verifying that the result is sane.
- */
-static char *
-qtext_load_file(Size *buffer_size)
-{
-	char	*buf;
-	int		fd;
-	struct stat stat;
-
-	fd = OpenTransientFile(PGSS_TEXT_FILE, O_RDONLY | PG_BINARY);
-	if (fd < 0)
-	{
-		if (errno != ENOENT)
-			ereport(LOG,
-					(errcode_for_file_access(),
-					 errmsg("could not read pg_stat_monitor file \"%s\": %m",
-							PGSS_TEXT_FILE)));
-		return NULL;
-	}
-
-	/* Get file length */
-	if (fstat(fd, &stat))
-	{
-		ereport(LOG,
-				(errcode_for_file_access(),
-				 errmsg("could not stat pg_stat_monitor file \"%s\": %m",
-						PGSS_TEXT_FILE)));
-		CloseTransientFile(fd);
-		return NULL;
-	}
-
-	/* Allocate buffer; beware that off_t might be wider than size_t */
-	if (stat.st_size <= MaxAllocHugeSize)
-		buf = (char *) malloc(stat.st_size);
-	else
-		buf = NULL;
-	if (buf == NULL)
-	{
-		ereport(LOG,
-				(errcode(ERRCODE_OUT_OF_MEMORY),
-				 errmsg("out of memory"),
-				 errdetail("Could not allocate enough memory to read pg_stat_monitor file \"%s\".",
-						   PGSS_TEXT_FILE)));
-		CloseTransientFile(fd);
-		return NULL;
-	}
-
-	/*
-	 * OK, slurp in the file. If we get a short read and errno doesn't get
-	 * set, the reason is probably that garbage collection truncated the file
-	 * since we did the fstat(), so we don't log a complaint --- but we don't
-	 * return the data, either, since it's most likely corrupt due to
-	 * concurrent writes from garbage collection.
-	 */
-	errno = 0;
-	if (read(fd, buf, stat.st_size) != stat.st_size)
-	{
-		if (errno)
-			ereport(LOG,
-					(errcode_for_file_access(),
-					 errmsg("could not read pg_stat_monitor file \"%s\": %m",
-							PGSS_TEXT_FILE)));
-		free(buf);
-		CloseTransientFile(fd);
-		return NULL;
-	}
-
-	CloseTransientFile(fd);
-
-	*buffer_size = stat.st_size;
-	return buf;
-}
-
-/*
- * Locate a query text in the file image previously read by qtext_load_file().
- *
- * We validate the given offset/length, and return NULL if bogus. Otherwise,
- * the result points to a null-terminated string within the buffer.
- */
-static char *
-qtext_fetch(Size query_offset, int query_len,
-			char *buffer, Size buffer_size)
-{
-	/* File read failed? */
-	if (buffer == NULL)
-		return NULL;
-	/* Bogus offset/length? */
-	if (query_len < 0 ||
-		query_offset + query_len >= buffer_size)
-		return NULL;
-	/* As a further sanity check, make sure there's a trailing null */
-	if (buffer[query_offset + query_len] != '\0')
-		return NULL;
-	/* Looks OK */
-	return buffer + query_offset;
-}
-
-/*
- * Do we need to garbage-collect the external query text file?
- *
- * Caller should hold at least a shared lock on pgss->lock.
- */
-static bool
-need_gc_qtexts(void)
-{
-	Size		extent;
-
-	/* Read shared extent pointer */
-	{
-		volatile pgssSharedState *s = (volatile pgssSharedState *) pgss;
-
-		SpinLockAcquire(&s->mutex);
-		extent = s->extent;
-		SpinLockRelease(&s->mutex);
-	}
-
-	/* Don't proceed if file does not exceed 512 bytes per possible entry */
-	if (extent < 512 * pgss_max)
-		return false;
-
-	/*
-	 * Don't proceed if file is less than about 50% bloat. Nothing can or
-	 * should be done in the event of unusually large query texts accounting
-	 * for file's large size. We go to the trouble of maintaining the mean
-	 * query length in order to prevent garbage collection from thrashing
-	 * uselessly.
-	 */
-	if (extent < pgss->mean_query_len * pgss_max * 2)
-		return false;
-
-	return true;
-}
-
-/*
- * Garbage-collect orphaned query texts in external file.
- *
- * This won't be called often in the typical case, since it's likely that
- * there won't be too much churn, and besides, a similar compaction process
- * occurs when serializing to disk at shutdown or as part of resetting.
- * Despite this, it seems prudent to plan for the edge case where the file
- * becomes unreasonably large, with no other method of compaction likely to
- * occur in the foreseeable future.
- *
- * The caller must hold an exclusive lock on pgss->lock.
- *
- * At the first sign of trouble we unlink the query text file to get a clean
- * slate (although existing statistics are retained), rather than risk
- * thrashing by allowing the same problem case to recur indefinitely.
- */
-static void
-gc_qtexts(void)
-{
-	char	*qbuffer;
-	Size	qbuffer_size;
-	FILE	*qfile = NULL;
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry	*entry;
-	Size	extent;
-	int		nentries;
-
-	/*
-	 * When called from pgss_store, some other session might have proceeded
-	 * with garbage collection in the no-lock-held interim of lock strength
-	 * escalation. Check once more that this is actually necessary.
-	 */
-	if (!need_gc_qtexts())
-		return;
-
-	/*
-	 * Load the old texts file. If we fail (out of memory, for instance),
-	 * invalidate query texts. Hopefully this is rare. It might seem better
-	 * to leave things alone on an OOM failure, but the problem is that the
-	 * file is only going to get bigger; hoping for a future non-OOM result is
-	 * risky and can easily lead to complete denial of service.
-	 */
-	qbuffer = qtext_load_file(&qbuffer_size);
-	if (qbuffer == NULL)
-		goto gc_fail;
-
-	/*
-	 * We overwrite the query texts file in place, so as to reduce the risk of
-	 * an out-of-disk-space failure. Since the file is guaranteed not to get
-	 * larger, this should always work on traditional filesystems; though we
-	 * could still lose on copy-on-write filesystems.
-	 */
-	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
-	if (qfile == NULL)
-	{
-		ereport(LOG,
-				(errcode_for_file_access(),
-				 errmsg("could not write pg_stat_monitor file \"%s\": %m",
-						PGSS_TEXT_FILE)));
-		goto gc_fail;
-	}
-
-	extent = 0;
-	nentries = 0;
-
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		int			query_len = entry->query_len;
-		char	*qry = qtext_fetch(entry->query_offset,
-								   query_len,
-								   qbuffer,
-								   qbuffer_size);
-
-		if (qry == NULL)
-		{
-			/* Trouble ... drop the text */
-			entry->query_offset = 0;
-			entry->query_len = -1;
-			/* entry will not be counted in mean query length computation */
-			continue;
-		}
-
-		if (fwrite(qry, 1, query_len + 1, qfile) != query_len + 1)
-		{
-			ereport(LOG,
-					(errcode_for_file_access(),
-					 errmsg("could not write pg_stat_monitor file \"%s\": %m",
-							PGSS_TEXT_FILE)));
-			hash_seq_term(&hash_seq);
-			goto gc_fail;
-		}
-
-		entry->query_offset = extent;
-		extent += query_len + 1;
-		nentries++;
-	}
-
-	/*
-	 * Truncate away any now-unused space. If this fails for some odd reason,
-	 * we log it, but there's no need to fail.
-	 */
-	if (ftruncate(fileno(qfile), extent) != 0)
-		ereport(LOG,
-				(errcode_for_file_access(),
-				 errmsg("could not truncate pg_stat_monitor file \"%s\": %m",
-						PGSS_TEXT_FILE)));
-
-	if (FreeFile(qfile))
-	{
-		ereport(LOG,
-				(errcode_for_file_access(),
-				 errmsg("could not write pg_stat_monitor file \"%s\": %m",
-						PGSS_TEXT_FILE)));
-		qfile = NULL;
-		goto gc_fail;
-	}
-
-	elog(DEBUG1, "pgss gc of queries file shrunk size from %zu to %zu",
-		 pgss->extent, extent);
-
-	/* Reset the shared extent pointer */
-	pgss->extent = extent;
-
-	/*
-	 * Also update the mean query length, to be sure that need_gc_qtexts()
-	 * won't still think we have a problem.
-	 */
-	if (nentries > 0)
-		pgss->mean_query_len = extent / nentries;
-	else
-		pgss->mean_query_len = ASSUMED_LENGTH_INIT;
-
-	free(qbuffer);
-
-	/*
-	 * OK, count a garbage collection cycle. (Note: even though we have
-	 * exclusive lock on pgss->lock, we must take pgss->mutex for this, since
-	 * other processes may examine gc_count while holding only the mutex.
-	 * Also, we have to advance the count *after* we've rewritten the file,
-	 * else other processes might not realize they read a stale file.)
-	 */
-	record_gc_qtexts();
-
-	return;
-
-gc_fail:
-	/* clean up resources */
-	if (qfile)
-		FreeFile(qfile);
-	if (qbuffer)
-		free(qbuffer);
-
-	/*
-	 * Since the contents of the external file are now uncertain, mark all
-	 * hashtable entries as having invalid texts.
-	 */
-	hash_seq_init(&hash_seq, pgss_hash);
-	while ((entry = hash_seq_search(&hash_seq)) != NULL)
-	{
-		entry->query_offset = 0;
-		entry->query_len = -1;
-	}
-
-	/*
-	 * Destroy the query text file and create a new, empty one
-	 */
-	(void) unlink(PGSS_TEXT_FILE);
-	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
-	if (qfile == NULL)
-		ereport(LOG,
-				(errcode_for_file_access(),
-				 errmsg("could not write new pg_stat_monitor file \"%s\": %m",
-						PGSS_TEXT_FILE)));
-	else
-		FreeFile(qfile);
-
-	/* Reset the shared extent pointer */
-	pgss->extent = 0;
-
-	/* Reset mean_query_len to match the new state */
-	pgss->mean_query_len = ASSUMED_LENGTH_INIT;
-
-	/*
-	 * Bump the GC count even though we failed.
-	 *
-	 * This is needed to make concurrent readers of file without any lock on
-	 * pgss->lock notice existence of new version of file. Once readers
-	 * subsequently observe a change in GC count with pgss->lock held, that
-	 * forces a safe reopen of file. Writers also require that we bump here,
-	 * of course. (As required by locking protocol, readers and writers don't
-	 * trust earlier file contents until gc_count is found unchanged after
-	 * pgss->lock acquired in shared or exclusive mode respectively.)
-	 */
-	record_gc_qtexts();
+	pfree(agg_entries);
 }

 /*
  * Release all entries.
  */
 static void
-entry_reset(void)
+entry_reset(int bucket)
 {
-	HASH_SEQ_STATUS hash_seq;
-	pgssEntry	*entry;
-	pgssAggEntry *dbentry;
-	FILE	*qfile;
+	HASH_SEQ_STATUS hash_seq;
+	pgssEntry		*entry;
+	pgssAggEntry	*dbentry;

 	LWLockAcquire(pgss->lock, LW_EXCLUSIVE);

 	hash_seq_init(&hash_seq, pgss_hash);
 	while ((entry = hash_seq_search(&hash_seq)) != NULL)
 	{
-		hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
+		if (entry->key.bucket_id == bucket || bucket == -1)
+			hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
 	}

 	hash_seq_init(&hash_seq, pgss_agghash);
 	while ((dbentry = hash_seq_search(&hash_seq)) != NULL)
 	{
-		hash_search(pgss_agghash, &dbentry->key, HASH_REMOVE, NULL);
+		if (dbentry->key.bucket_id == bucket || bucket == -1)
+			hash_search(pgss_agghash, &dbentry->key, HASH_REMOVE, NULL);
 	}
-
-	/*
-	 * Write new empty query file, perhaps even creating a new one to recover
-	 * if the file was missing.
-	 */
-	qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
-	if (qfile == NULL)
-	{
-		ereport(LOG,
-				(errcode_for_file_access(),
-				 errmsg("could not create pg_stat_monitor file \"%s\": %m",
-						PGSS_TEXT_FILE)));
-		goto done;
-	}
-
-	/* If ftruncate fails, log it, but it's not a fatal problem */
-	if (ftruncate(fileno(qfile), 0) != 0)
-		ereport(LOG,
-				(errcode_for_file_access(),
-				 errmsg("could not truncate pg_stat_monitor file \"%s\": %m",
-						PGSS_TEXT_FILE)));
-
-	FreeFile(qfile);
-
-done:
-	pgss->extent = 0;
-	/* This counts as a query text garbage collection for our purposes */
-	record_gc_qtexts();
-
 	LWLockRelease(pgss->lock);
 }
@@ -3306,13 +2515,13 @@ static void
 fill_in_constant_lengths(pgssJumbleState *jstate, const char *query,
 						 int query_loc)
 {
-	pgssLocationLen *locs;
-	core_yyscan_t yyscanner;
-	core_yy_extra_type yyextra;
-	core_YYSTYPE yylval;
-	YYLTYPE		yylloc;
-	int			last_loc = -1;
-	int			i;
+	pgssLocationLen		*locs;
+	core_yyscan_t		yyscanner;
+	core_yy_extra_type	yyextra;
+	core_YYSTYPE		yylval;
+	YYLTYPE				yylloc;
+	int					last_loc = -1;
+	int					i;

 	/*
 	 * Sort the records by location so that we can process them in order while
@@ -3409,8 +2618,8 @@ fill_in_constant_lengths(pgssJumbleState *jstate, const char *query,
 static int
 comp_location(const void *a, const void *b)
 {
-	int			l = ((const pgssLocationLen *) a)->location;
-	int			r = ((const pgssLocationLen *) b)->location;
+	int		l = ((const pgssLocationLen *) a)->location;
+	int		r = ((const pgssLocationLen *) b)->location;

 	if (l < r)
 		return -1;
@@ -3422,43 +2631,53 @@ comp_location(const void *a, const void *b)
 /* Convert array into Text dataum */
 static Datum
-array_get_datum(double arr[])
+array_get_datum(int arr[])
 {
 	int		j;
-	char	str[1024];
+	char	str[1024] = {0};
 	char	tmp[10];
 	bool	first = true;

 	memset(str, 0, 1024);

 	/* Need to calculate the actual size, and avoid unnessary memory usage */
-	for (j = 0; j < 24; j++)
+	for (j = 0; j < 10; j++)
 	{
 		if (first)
 		{
-			snprintf(tmp, 10, "%04.1f", arr[j]);
+			snprintf(tmp, 10, "%d", arr[j]);
 			strcat(str,tmp);
 			first = false;
 			continue;
 		}
-		snprintf(tmp, 10, ", %04.1f", arr[j]);
+		snprintf(tmp, 10, ", %d", arr[j]);
 		strcat(str,tmp);
 	}
 	return CStringGetTextDatum(str);
 }

+/* Allocate memory for a new entry */
+void add_object_entry(uint64 queryid, char *objects)
+{
+	pgssObjectEntry		*entry = NULL;
+	bool				found;
+	pgssObjectHashKey	key;
+
+	key.queryid = queryid;
+	entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_ENTER, &found);
+	if (!found)
+	{
+		SpinLockAcquire(&entry->mutex);
+		snprintf(entry->tables_name, MAX_REL_LEN, "%s", objects);
+		SpinLockRelease(&entry->mutex);
+	}
+}
+
 /* Alocate memory for a new entry */
 static pgssAggEntry *
 agg_entry_alloc(pgssAggHashKey *key)
 {
-	pgssAggEntry *entry;
-	bool		found;
-
-	/*
-	 * Make space if needed in reallity this should not happen,
-	 * because we alread delete entry case of non-aggregate query.
-	 */
-	while (hash_get_num_entries(pgss_hash) >= pgss_max)
-		entry_dealloc();
+	pgssAggEntry	*entry = NULL;
+	bool			found;

 	entry = (pgssAggEntry *) hash_search(pgss_agghash, key, HASH_ENTER, &found);
 	if (!found)
@@ -3466,36 +2685,34 @@ agg_entry_alloc(pgssAggHashKey *key)
 		SpinLockAcquire(&entry->mutex);
 		memset(&entry->counters, 0, sizeof(pgssAggCounters));
 		entry->counters.total_calls = 0;
-		entry->counters.first_call_time = GetCurrentTimestamp();
 		SpinLockRelease(&entry->mutex);
 	}
 	return entry;
 }

 static void
-update_agg_counters(uint64 queryid, uint64 id, uint64 type)
+update_agg_counters(uint64 bucket, uint64 queryid, uint64 id, AGG_KEY type)
 {
 	pgssAggHashKey key;
 	pgssAggEntry *entry;

 	key.id = id;
-	key.type = type;
+	key.type = (int64) type;
 	key.queryid = queryid;
+	key.bucket_id = bucket;

 	entry = agg_entry_alloc(&key);
 	if (!entry)
-	{
-		elog(WARNING, "no space left in shared buffer");
 		return;
-	}
+
 	SpinLockAcquire(&entry->mutex);
+	entry->key.queryid = queryid;
 	entry->key.id = id;
 	entry->key.type = key.type;
+	entry->key.bucket_id = bucket;
+
 	entry->counters.total_calls++;
-	if (entry->counters.total_calls == 1)
-		entry->counters.first_call_time = GetCurrentTimestamp();
-	entry->counters.last_call_time= GetCurrentTimestamp();
 	SpinLockRelease(&entry->mutex);
 }
@@ -3503,29 +2720,29 @@ Datum
 pg_stat_agg(PG_FUNCTION_ARGS)
 {
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-	TupleDesc	tupdesc;
-	Tuplestorestate *tupstore;
-	MemoryContext per_query_ctx;
-	MemoryContext oldcontext;
-	HASH_SEQ_STATUS hash_seq;
-	pgssAggEntry *entry;
+	TupleDesc		tupdesc;
+	Tuplestorestate	*tupstore;
+	MemoryContext	per_query_ctx;
+	MemoryContext	oldcontext;
+	HASH_SEQ_STATUS	hash_seq;
+	pgssAggEntry	*entry;

 	/* hash table must exist already */
 	if (!pgss || !pgss_agghash)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("pg_stat_monitor must be loaded via shared_preload_libraries")));
+				 errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));

 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("set-valued function called in context that cannot accept a set")));
+				 errmsg("pg_stat_monitor: set-valued function called in context that cannot accept a set")));
 	if (!(rsinfo->allowedModes & SFRM_Materialize))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("materialize mode required, but it is not " \
+				 errmsg("pg_stat_monitor: materialize mode required, but it is not " \
 						"allowed in this context")));

 	/* Switch into long-lived context to construct returned data structures */
@@ -3534,10 +2751,10 @@ pg_stat_agg(PG_FUNCTION_ARGS)
 	/* Build a tuple descriptor for our result type */
 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
+		elog(ERROR, "pg_stat_monitor: return type must be a row type");

-	if (tupdesc->natts != 6)
-		elog(ERROR, "incorrect number of output arguments, required %d", tupdesc->natts);
+	if (tupdesc->natts != 4)
+		elog(ERROR, "pg_stat_monitor: incorrect number of output arguments, required %d", tupdesc->natts);

 	tupstore = tuplestore_begin_heap(true, false, work_mem);
 	rsinfo->returnMode = SFRM_Materialize;
@@ -3563,16 +2780,17 @@ pg_stat_agg(PG_FUNCTION_ARGS)
 	{
 		Datum		values[6];
 		bool		nulls[6];
-		int			i = 0;
+		int			i = 0;
+		char		queryid_txt[32];

 		memset(values, 0, sizeof(values));
 		memset(nulls, 0, sizeof(nulls));
-		values[i++] = Int64GetDatumFast(entry->key.queryid);
+
+		sprintf(queryid_txt, "%08lX", entry->key.queryid);
+		values[i++] = CStringGetTextDatum(queryid_txt);
 		values[i++] = Int64GetDatumFast(entry->key.id);
 		values[i++] = Int64GetDatumFast(entry->key.type);
 		values[i++] = Int64GetDatumFast(entry->counters.total_calls);
-		values[i++] = TimestampGetDatum(entry->counters.first_call_time);
-		values[i++] = TimestampGetDatum(entry->counters.last_call_time);

 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
@@ -3581,3 +2799,77 @@ pg_stat_agg(PG_FUNCTION_ARGS)
 	tuplestore_donestoring(tupstore);
 	return 0;
 }
+
+static unsigned long
+locate_query(unsigned long bucket_id, unsigned long queryid, char * query)
+{
+	unsigned long id = 0;
+	unsigned long len = 0;
+	unsigned long offset = 0;
+	unsigned long head = pgss->query_fifo[bucket_id].head;
+	unsigned long tail = pgss->query_fifo[bucket_id].tail;
+	unsigned char *buf = pgss_qbuf[bucket_id];
+
+	while (head != tail)
+	{
+		offset = 0;
+		memcpy(&id, &buf[tail + offset], sizeof (unsigned long)); /* query id */
+
+		offset += sizeof (unsigned long);
+		memcpy(&len, &buf[tail + offset], sizeof (unsigned long)); /* query len */
+
+		if (len == 0)
+			return 0;
+
+		offset += sizeof (unsigned long);
+		if (query != NULL)
+		{
+			memcpy(query, &buf[tail + offset], len); /* Actual query */
+			query[len] = 0;
+		}
+		offset += len;
+
+		if (id == queryid)
+			return id;
+		tail = (tail + offset) % pgss_query_buf_size_bucket;
+	}
+	return 0;
+}
+
+static void
+store_query(unsigned long queryid, const char *query, unsigned long query_len)
+{
+	int next;
+	int offset = 0;
+
+	if (query_len > pgss_query_max_len)
+		query_len = pgss_query_max_len;
+
+	/* Already have query in the shared buffer, there
+	 * is no need to add that again.
+	 */
+	if (locate_query(pgss->current_wbucket, queryid, NULL) == queryid)
+		return;
+
+	next = pgss->query_fifo[pgss->current_wbucket].head + query_len + sizeof (unsigned long) + sizeof (unsigned long);
+	if (next >= pgss_query_buf_size_bucket)
+		next = 0;
+
+	/* Buffer is full */
+	if (next == pgss->query_fifo[pgss->current_wbucket].tail)
+	{
+		elog(DEBUG2, "pg_stat_monitor: no space left in shared_buffer");
+		return;
+	}
+
+	offset = 0;
+	memcpy(&pgss_qbuf[pgss->current_wbucket][pgss->query_fifo[pgss->current_wbucket].head], &queryid, sizeof (unsigned long)); /* query id */
+	offset += sizeof (unsigned long);
+
+	memcpy(&pgss_qbuf[pgss->current_wbucket][pgss->query_fifo[pgss->current_wbucket].head + offset], &query_len, sizeof (unsigned long)); /* query len */
+	offset += sizeof (unsigned long);
+
+	memcpy(&pgss_qbuf[pgss->current_wbucket][pgss->query_fifo[pgss->current_wbucket].head + offset], query, query_len); /* actual query */
+
+	pgss->query_fifo[pgss->current_wbucket].head = next;
+}
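The locate_query()/store_query() pair that closes the patch replaces the external query-text file with a per-bucket FIFO of [queryid][length][text] records in shared memory. Two sharp edges are worth noting: store_query() clamps query_len to pgss_query_max_len, yet locate_query() then writes query[len] = 0, one byte past a caller buffer that was malloc'd with exactly pgss_query_max_len bytes; and records are never split, so head simply resets to 0 when a record would run past the end of the buffer even though the bytes are still written at the old head. A simplified, bounds-checked sketch of the record layout (buf, buf_size, head and tail stand in for the patch's pgss_qbuf[bucket] and query_fifo fields; this non-wrapping variant refuses a record that would not fit rather than wrapping it):

    #include <stdbool.h>
    #include <string.h>

    /* Sketch: append one [queryid][len][text] record to a flat buffer. */
    static bool
    fifo_append(unsigned char *buf, unsigned long buf_size,
                unsigned long *head, unsigned long tail,
                unsigned long queryid, const char *query, unsigned long len)
    {
        unsigned long need = 2 * sizeof(unsigned long) + len;

        if (*head + need >= buf_size)           /* no room before end of buffer */
            return false;
        if (*head < tail && *head + need >= tail)
            return false;                       /* would overrun unread records */

        memcpy(buf + *head, &queryid, sizeof(unsigned long));
        memcpy(buf + *head + sizeof(unsigned long), &len, sizeof(unsigned long));
        memcpy(buf + *head + 2 * sizeof(unsigned long), query, len);
        *head += need;
        return true;
    }

A reader then walks records from tail toward head, skipping two sizeof(unsigned long) headers plus len bytes per record, which is exactly the traversal locate_query() performs.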