diff --git a/Makefile b/Makefile index 7c2397b..dc1c41a 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # contrib/pg_stat_monitor/Makefile MODULE_big = pg_stat_monitor -OBJS = guc.o pg_stat_monitor.o $(WIN32RES) +OBJS = hash_query.o guc.o pg_stat_monitor.o $(WIN32RES) EXTENSION = pg_stat_monitor DATA = pg_stat_monitor--1.0.sql diff --git a/guc.c b/guc.c index 71c8b8b..87ebdb3 100644 --- a/guc.c +++ b/guc.c @@ -81,9 +81,9 @@ init_guc(void) conf[i++] = (GucVariable) { .guc_name = "pg_stat_monitor.pgsm_object_cache", .guc_desc = "Sets the maximum number of object cache", - .guc_default = 5, - .guc_min = 5, - .guc_max = 10, + .guc_default = 50, + .guc_min = 50, + .guc_max = INT_MAX, .guc_restart = true }; @@ -214,9 +214,9 @@ init_guc(void) "Sets the maximum number of object cache", NULL, &PGSM_OBJECT_CACHE, - 5, - 5, - 10, + 50, + 50, + INT_MAX, PGC_POSTMASTER, 0, NULL, diff --git a/hash_query.c b/hash_query.c new file mode 100644 index 0000000..c69e9a8 --- /dev/null +++ b/hash_query.c @@ -0,0 +1,383 @@ +/*------------------------------------------------------------------------- + * + * hash_query.c + * Track statement execution times across a whole database cluster. 
+ * + * Copyright (c) 2008-2018, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/pg_stat_monitor/hash_query.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "pg_stat_monitor.h" + +static pgssSharedState *pgss; +static HTAB *pgss_hash; +static HTAB *pgss_object_hash; +static HTAB *pgss_buckethash = NULL; +static HTAB *pgss_waiteventshash = NULL; + +static pgssBucketEntry **pgssBucketEntries = NULL; +static pgssWaitEventEntry **pgssWaitEventEntries = NULL; +static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); + +static HTAB* +hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = key_size; + info.entrysize = entry_size; + return ShmemInitHash(hash_name, hash_size, hash_size, &info, HASH_ELEM | HASH_BLOBS); +} + +/* + * shmem_startup hook: allocate or attach to shared memory, + * then load any pre-existing statistics from file. + * Also create and load the query-texts file, which is expected to exist + * (even if empty) while the module is enabled. + */ +void +pgss_shmem_startup(void) +{ + bool found = false; + int32 i; + + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + /* reset in case this is a restart within the postmaster */ + pgss = NULL; + pgss_hash = NULL; + pgss_object_hash = NULL; + pgss_buckethash = NULL; + pgss_waiteventshash = NULL; + + /* + * Create or attach to the shared memory state, including hash table + */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + pgss = ShmemInitStruct("pg_stat_monitor", sizeof(pgssSharedState), &found); + if (!found) + { + /* First time through ... 
*/ + pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock; + SpinLockInit(&pgss->mutex); + ResetSharedState(pgss); + } + + pgss->query_buf_size_bucket = PGSM_QUERY_BUF_SIZE / PGSM_MAX_BUCKETS; + for (i = 0; i < PGSM_MAX_BUCKETS; i++) + pgss_qbuf[i] = (unsigned char *) ShmemAlloc(pgss->query_buf_size_bucket); + + pgss_hash = hash_init("pg_stat_monitor: Queries hashtable", + sizeof(pgssHashKey), + sizeof(pgssEntry), + PGSM_MAX); + + pgss_buckethash = hash_init("pg_stat_monitor: Bucket hashtable", + sizeof(pgssBucketHashKey), + sizeof(pgssBucketEntry), + PGSM_MAX_BUCKETS); + + pgss_waiteventshash = hash_init("pg_stat_monitor: Wait Event hashtable", + sizeof(pgssWaitEventKey), + sizeof(pgssWaitEventEntry), + 100); + + pgss_object_hash = hash_init("pg_stat_monitor: Object hashtable", + sizeof(pgssObjectHashKey), + sizeof(pgssObjectEntry), + PGSM_OBJECT_CACHE); + + Assert(IsHashInitialize()); + + pgssWaitEventEntries = malloc(sizeof (pgssWaitEventEntry) * MAX_BACKEND_PROCESES); + for (i = 0; i < MAX_BACKEND_PROCESES; i++) + { + pgssWaitEventKey key; + pgssWaitEventEntry *entry = NULL; + bool found = false; + + key.processid = i; + entry = (pgssWaitEventEntry *) hash_search(pgss_waiteventshash, &key, HASH_ENTER, &found); + if (!found) + { + SpinLockInit(&entry->mutex); + pgssWaitEventEntries[i] = entry; + } + } + + pgssBucketEntries = malloc(sizeof (pgssBucketEntry) * PGSM_MAX_BUCKETS); + for (i = 0; i < PGSM_MAX_BUCKETS; i++) + { + pgssBucketHashKey key; + pgssBucketEntry *entry = NULL; + bool found = false; + + key.bucket_id = i; + /* Find or create an entry with desired hash code */ + entry = (pgssBucketEntry *) hash_search(pgss_buckethash, &key, HASH_ENTER, &found); + if (!found) + { + memset(&entry->counters, 0, sizeof(pgssBucketCounters)); + SpinLockInit(&entry->mutex); + pgssBucketEntries[i] = entry; + } + } + + LWLockRelease(AddinShmemInitLock); + + /* + * If we're in the postmaster (or a standalone backend...), set up a shmem + * exit hook to dump the 
statistics to disk. + */ + if (!IsUnderPostmaster) + on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); +} + +int +pgsm_get_bucket_size(void) +{ + Assert(pgss->query_buf_size_bucket > 0); + return pgss->query_buf_size_bucket; +} + +pgssSharedState* pgsm_get_ss(void) +{ + Assert(pgss); + return pgss; +} + +HTAB* pgsm_get_hash(void) +{ + return pgss_hash; +} + +pgssBucketEntry** pgsm_get_bucket_entries(void) +{ + return pgssBucketEntries; +} + +HTAB* pgsm_get_wait_event_hash(void) +{ + return pgss_waiteventshash; +} + +pgssBucketEntry** pgsm_get_bucket(void) +{ + return pgssBucketEntries; +} + +pgssWaitEventEntry** pgsm_get_wait_event_entry(void) +{ + return pgssWaitEventEntries; +} + +/* + * shmem_shutdown hook: Dump statistics into file. + * + * Note: we don't bother with acquiring lock, because there should be no + * other processes running when this is called. + */ +void +pgss_shmem_shutdown(int code, Datum arg) +{ + elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__); + /* Don't try to dump during a crash. */ + if (code) + return; + + /* Safety check ... shouldn't get here unless shmem is set up. 
*/ + if (!IsHashInitialize()) + return; +} + +Size +hash_memsize(void) +{ + Size size; + + size = MAXALIGN(sizeof(pgssSharedState)); + size = add_size(size, hash_estimate_size(PGSM_MAX, sizeof(pgssEntry))); + + return size; +} + +pgssEntry * +hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding) +{ + pgssEntry *entry = NULL; + bool found = false; + + if (pgss->bucket_entry[pgss->current_wbucket] >= (PGSM_MAX / PGSM_MAX_BUCKETS)) + { + pgss->bucket_overflow[pgss->current_wbucket]++; + return NULL; + } + + if (hash_get_num_entries(pgss_hash) >= PGSM_MAX) + return NULL; + + /* Find or create an entry with desired hash code */ + entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found); + if (!found) + { + pgss->bucket_entry[pgss->current_wbucket]++; + /* New entry, initialize it */ + + /* reset the statistics */ + memset(&entry->counters, 0, sizeof(Counters)); + /* set the appropriate initial usage count */ + /* re-initialize the mutex each time ... we assume no one using it */ + SpinLockInit(&entry->mutex); + /* ... and don't forget the query text metadata */ + entry->encoding = encoding; + } + return entry; +} + +/* + * Deallocate least-used entries. + * + * Caller must hold an exclusive lock on pgss->lock. + */ +void +hash_entry_dealloc(int bucket) +{ + HASH_SEQ_STATUS hash_seq; + pgssEntry *entry; + pgssEntry **entries; + int i; + int nvictims = 0; + + pgss->bucket_entry[bucket] = 0; + + entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *)); + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + if (entry->key.bucket_id == bucket || bucket < 0) + entries[nvictims++] = entry; + } + + for (i = 0; i < nvictims; i++) + entry = hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL); + + pfree(entries); +} + +/* + * Release all entries. 
+ */ +void +hash_entry_reset() +{ + HASH_SEQ_STATUS hash_seq; + pgssEntry *entry; + pgssObjectEntry *objentry; + pgssWaitEventEntry *weentry; + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + } + + hash_seq_init(&hash_seq, pgss_buckethash); + while ((objentry = hash_seq_search(&hash_seq)) != NULL) + { + hash_search(pgss_buckethash, &objentry->key, HASH_REMOVE, NULL); + } + + hash_seq_init(&hash_seq, pgss_waiteventshash); + while ((weentry = hash_seq_search(&hash_seq)) != NULL) + { + hash_search(pgss_waiteventshash, &weentry->key, HASH_REMOVE, NULL); + } + pgss->current_wbucket = 0; + free(pgssWaitEventEntries); + free(pgssBucketEntries); + LWLockRelease(pgss->lock); +} + +void +add_object_entry(uint64 queryid, char *objects) +{ + pgssObjectEntry *entry = NULL; + bool found; + pgssObjectHashKey key; + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + key.queryid = queryid; + entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_ENTER, &found); + if (!found) + { + SpinLockAcquire(&entry->mutex); + snprintf(entry->tables_name, MAX_REL_LEN, "%s", objects); + SpinLockRelease(&entry->mutex); + } + LWLockRelease(pgss->lock); +} + +/* De-alocate memory */ +void +remove_object_entry(uint64 queryid, char *objects) +{ + pgssObjectHashKey key; + pgssObjectEntry *entry; + + key.queryid = queryid; + + LWLockAcquire(pgss->lock, LW_SHARED); + entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_FIND, NULL); + if (entry != NULL) + { + snprintf(objects, MAX_REL_LEN, "%s", entry->tables_name); + hash_search(pgss_object_hash, &entry->key, HASH_REMOVE, NULL); + } + LWLockRelease(pgss->lock); +} + +pgssEntry* +pgsm_create_query_entry(unsigned int queryid, + unsigned int userid, + unsigned int dbid, + unsigned int bucket_id, + unsigned int ip) +{ + pgssHashKey key; + pgssEntry *entry = NULL; + int encoding = 
GetDatabaseEncoding(); + + key.queryid = queryid; + key.userid = userid; + key.dbid = dbid; + key.bucket_id = bucket_id; + key.ip = ip; + + /* Lookup the hash table entry with shared lock. */ + LWLockAcquire(pgss->lock, LW_SHARED); + entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); + if(!entry) + { + LWLockRelease(pgss->lock); + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + /* OK to create a new hashtable entry */ + entry = hash_entry_alloc(pgss, &key, encoding); + } + return entry; +} + +bool IsHashInitialize(void) +{ + return (pgss && pgss_hash && pgss_object_hash && pgss_buckethash && pgss_waiteventshash); +} + diff --git a/pg_stat_monitor--1.0.sql b/pg_stat_monitor--1.0.sql index f681ebc..1c73e43 100644 --- a/pg_stat_monitor--1.0.sql +++ b/pg_stat_monitor--1.0.sql @@ -10,48 +10,48 @@ AS 'MODULE_PATHNAME' LANGUAGE C PARALLEL SAFE; CREATE FUNCTION pg_stat_monitor(IN showtext boolean, - OUT bucket int, - OUT userid oid, - OUT dbid oid, - - OUT queryid text, - OUT query text, - OUT bucket_start_time timestamptz, + OUT bucket int, + OUT userid oid, + OUT dbid oid, + OUT client_ip bigint, + + OUT queryid text, + OUT query text, + OUT bucket_start_time timestamptz, - OUT plan_calls int8, - OUT plan_total_time float8, - OUT plan_min_time float8, - OUT plan_max_time float8, - OUT plan_mean_time float8, - OUT plan_stddev_time float8, - OUT plan_rows int8, + OUT plan_calls int8, + OUT plan_total_time float8, + OUT plan_min_time float8, + OUT plan_max_time float8, + OUT plan_mean_time float8, + OUT plan_stddev_time float8, + OUT plan_rows int8, - OUT calls int8, - OUT total_time float8, - OUT min_time float8, - OUT max_time float8, - OUT mean_time float8, - OUT stddev_time float8, - OUT rows int8, + OUT total_calls int8, + OUT total_time float8, + OUT min_time float8, + OUT max_time float8, + OUT mean_time float8, + OUT stddev_time float8, + OUT effected_rows int8, - OUT shared_blks_hit int8, - OUT shared_blks_read int8, + OUT shared_blks_hit int8, + OUT 
shared_blks_read int8, OUT shared_blks_dirtied int8, OUT shared_blks_written int8, - OUT local_blks_hit int8, - OUT local_blks_read int8, - OUT local_blks_dirtied int8, - OUT local_blks_written int8, - OUT temp_blks_read int8, - OUT temp_blks_written int8, - OUT blk_read_time float8, - OUT blk_write_time float8, - OUT client_ip bigint, - OUT resp_calls text, - OUT cpu_user_time float8, - OUT cpu_sys_time float8, - OUT tables_names text + OUT local_blks_hit int8, + OUT local_blks_read int8, + OUT local_blks_dirtied int8, + OUT local_blks_written int8, + OUT temp_blks_read int8, + OUT temp_blks_written int8, + OUT blk_read_time float8, + OUT blk_write_time float8, + OUT resp_calls text, + OUT cpu_user_time float8, + OUT cpu_sys_time float8, + OUT tables_names text ) RETURNS SETOF record AS 'MODULE_PATHNAME', 'pg_stat_monitor' @@ -90,37 +90,29 @@ CREATE VIEW pg_stat_monitor_settings AS SELECT restart FROM pg_stat_monitor_settings(); -CREATE FUNCTION pg_stat_agg( - OUT queryid text, - OUT id bigint, - OUT type bigint, - OUT total_calls int) -RETURNS SETOF record -AS 'MODULE_PATHNAME', 'pg_stat_agg' -LANGUAGE C STRICT VOLATILE PARALLEL SAFE; - -- Register a view on the function for ease of use. 
CREATE VIEW pg_stat_monitor AS SELECT bucket, bucket_start_time, userid, dbid, - m.queryid, + '0.0.0.0'::inet + client_ip AS client_ip, + queryid, query, plan_calls, - round( CAST(plan_total_time as numeric), 2) as plan_total_time, - round( CAST(plan_min_time as numeric), 2) as plan_min_timei, - round( CAST(plan_max_time as numeric), 2) as plan_max_time, - round( CAST(plan_mean_time as numeric), 2) as plan_mean_time, - round( CAST(plan_stddev_time as numeric), 2) as plan_stddev_time, + round( CAST(plan_total_time as numeric), 2)::float8 as plan_total_time, + round( CAST(plan_min_time as numeric), 2)::float8 as plan_min_timei, + round( CAST(plan_max_time as numeric), 2)::float8 as plan_max_time, + round( CAST(plan_mean_time as numeric), 2)::float8 as plan_mean_time, + round( CAST(plan_stddev_time as numeric), 2)::float8 as plan_stddev_time, plan_rows, - calls, - round( CAST(total_time as numeric), 2)as total_time, - round( CAST(min_time as numeric), 2)as min_time, - round( CAST(max_time as numeric), 2)as max_time, - round( CAST(mean_time as numeric), 2)as mean_time, - round( CAST(stddev_time as numeric), 2)as stddev_time, - rows, + total_calls, + round( CAST(total_time as numeric), 2)::float8 as total_time, + round( CAST(min_time as numeric), 2)::float8 as min_time, + round( CAST(max_time as numeric), 2)::float8 as max_time, + round( CAST(mean_time as numeric), 2)::float8 as mean_time, + round( CAST(stddev_time as numeric), 2)::float8 as stddev_time, + effected_rows, shared_blks_hit, shared_blks_read, shared_blks_dirtied, @@ -133,15 +125,11 @@ CREATE VIEW pg_stat_monitor AS SELECT temp_blks_written, blk_read_time, blk_write_time, - client_ip as host, - '0.0.0.0'::inet + client_ip AS client_ip, (string_to_array(resp_calls, ',')) resp_calls, cpu_user_time, cpu_sys_time, - (string_to_array(tables_names, ',')) tables_names, - wait_event, - wait_event_type -from pg_stat_monitor(true) m LEFT OUTER JOIN pg_stat_wait_events() w ON w.queryid = m.queryid; + 
(string_to_array(tables_names, ',')) tables_names +FROM pg_stat_monitor(TRUE); -- Register a view on the function for ease of use. @@ -152,77 +140,25 @@ CREATE VIEW pg_stat_wait_events AS SELECT wait_event_type FROM pg_stat_monitor(true) m, pg_stat_wait_events() w WHERE w.queryid = m.queryid; -GRANT SELECT ON pg_stat_wait_events TO PUBLIC; -GRANT SELECT ON pg_stat_monitor TO PUBLIC; - -CREATE VIEW pg_stat_agg_database AS +/*CREATE VIEW pg_stat_monitor_db AS SELECT - ss.bucket, - agg.queryid, - agg.id AS dbid, - ss.userid, - client_ip, - agg.total_calls, - ss.min_time, - ss.max_time, - ss.mean_time, - ss.resp_calls, - ss.cpu_user_time, - ss.cpu_sys_time, - ss.query, - ss.tables_names -FROM pg_stat_agg() agg -INNER JOIN (SELECT DISTINCT bucket, queryid, dbid, userid, query, client_ip, min_time, max_time, mean_time, resp_calls, tables_names, cpu_user_time,cpu_sys_time -FROM pg_stat_monitor) ss -ON agg.queryid = ss.queryid AND agg.type = 0 AND id = dbid; + * +FROM pg_stat_monitor GROUP BY dbid; -CREATE VIEW pg_stat_agg_user AS +CREATE VIEW pg_stat_monitor_user AS SELECT - ss.bucket, - agg.queryid, - agg.id AS dbid, - ss.userid, - client_ip, - agg.total_calls, - ss.min_time, - ss.max_time, - ss.mean_time, - ss.resp_calls, - ss.cpu_user_time, - ss.cpu_sys_time, - ss.query, - ss.tables_names -FROM pg_stat_agg() agg -INNER JOIN (SELECT DISTINCT bucket, queryid, userid, query, client_ip, min_time, max_time, mean_time, resp_calls, tables_names, cpu_user_time,cpu_sys_time FROM pg_stat_monitor) ss -ON agg.queryid = ss.queryid AND agg.type = 1 AND id = userid; + * +FROM pg_stat_monitor GROUP BY userid; -CREATE VIEW pg_stat_agg_ip AS +CREATE VIEW pg_stat_monitor_ip AS SELECT - ss.bucket, - agg.queryid, - agg.id AS dbid, - ss.userid, - ss.client_ip, - ss.host, - agg.total_calls, - ss.min_time, - ss.max_time, - ss.mean_time, - ss.resp_calls, - ss.cpu_user_time, - ss.cpu_sys_time, - ss.query, - ss.tables_names -FROM pg_stat_agg() agg -INNER JOIN (SELECT DISTINCT bucket, queryid, 
userid, query, client_ip, host, min_time, max_time, mean_time, resp_calls, tables_names, cpu_user_time,cpu_sys_time FROM pg_stat_monitor) ss -ON agg.queryid = ss.queryid AND agg.type = 2 AND id = host; - - + * +FROM pg_stat_monitor GROUP BY client_ip; GRANT SELECT ON pg_stat_agg_user TO PUBLIC; GRANT SELECT ON pg_stat_agg_ip TO PUBLIC; GRANT SELECT ON pg_stat_agg_database TO PUBLIC; GRANT SELECT ON pg_stat_monitor_settings TO PUBLIC; - +*/ -- Don't want this to be available to non-superusers. REVOKE ALL ON FUNCTION pg_stat_monitor_reset() FROM PUBLIC; diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 443d5ea..7a015fb 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -20,27 +20,23 @@ PG_MODULE_MAGIC; void _PG_init(void); void _PG_fini(void); - /*---- Local variables ----*/ /* Current nesting depth of ExecutorRun+ProcessUtility calls */ static int nested_level = 0; + #if PG_VERSION_NUM >= 130000 static int plan_nested_level = 0; static int exec_nested_level = 0; #endif + static struct rusage rusage_start; static struct rusage rusage_end; static volatile sig_atomic_t sigterm = false; static void handle_sigterm(SIGNAL_ARGS); -int query_buf_size_bucket; -HTAB * -CreateHash(const char *hash_name, int key_size, int entry_size, int hash_size); - /* Saved hook values in case of unload */ static planner_hook_type planner_hook_next = NULL; -static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL; static ExecutorStart_hook_type prev_ExecutorStart = NULL; static ExecutorRun_hook_type prev_ExecutorRun = NULL; @@ -48,23 +44,6 @@ static ExecutorFinish_hook_type prev_ExecutorFinish = NULL; static ExecutorEnd_hook_type prev_ExecutorEnd = NULL; static ProcessUtility_hook_type prev_ProcessUtility = NULL; -/* Links to shared memory state */ -static pgssSharedState *pgss = NULL; -static HTAB *pgss_hash = NULL; -static HTAB *pgss_object_hash = NULL; - -/* Hash table for aggegates */ -static HTAB 
*pgss_agghash = NULL; - -/* Hash table for aggegates */ -static HTAB *pgss_buckethash = NULL; - -/* Hash table for wait events */ -static HTAB *pgss_waiteventshash = NULL; - -static pgssBucketEntry **pgssBucketEntries = NULL; -static pgssWaitEventEntry **pgssWaitEventEntries = NULL; - PG_FUNCTION_INFO_V1(pg_stat_monitor_reset); PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2); @@ -73,14 +52,9 @@ PG_FUNCTION_INFO_V1(pg_stat_monitor); PG_FUNCTION_INFO_V1(pg_stat_wait_events); PG_FUNCTION_INFO_V1(pg_stat_monitor_settings); -/* Extended version function prototypes */ -PG_FUNCTION_INFO_V1(pg_stat_agg); static uint pg_get_client_addr(void); static Datum array_get_datum(int arr[]); -static void update_agg_counters(uint64 bucket_id, uint64 queryid, uint64 id, AGG_KEY type); -static pgssAggEntry *agg_entry_alloc(pgssAggHashKey *key); -void add_object_entry(uint64 queryid, char *objects); #if PG_VERSION_NUM >= 130000 static PlannedStmt * pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); #else @@ -88,8 +62,6 @@ static void BufferUsageAccumDiff(BufferUsage* bufusage, BufferUsage* pgBufferUsa static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param); #endif -static void pgss_shmem_startup(void); -static void pgss_shmem_shutdown(int code, Datum arg); static void pgss_post_parse_analyze(ParseState *pstate, Query *query); static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags); static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); @@ -125,11 +97,7 @@ static void pgss_store(const char *query, uint64 queryId, static void pg_stat_monitor_internal(FunctionCallInfo fcinfo, bool showtext); -static Size pgss_memsize(void); -static pgssEntry *entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int query_len, int encoding, bool sticky); -static void entry_dealloc(int bucket_id); -static void entry_reset(void); static void 
AppendJumble(pgssJumbleState *jstate, const unsigned char *item, Size size); static void JumbleQuery(pgssJumbleState *jstate, Query *query); @@ -181,7 +149,7 @@ _PG_init(void) * the postmaster process.) We'll allocate or attach to the shared * resources in pgss_shmem_startup(). */ - RequestAddinShmemSpace(pgss_memsize()); + RequestAddinShmemSpace(hash_memsize()); RequestNamedLWLockTranche("pg_stat_monitor", 1); /* Register Wait events */ @@ -223,152 +191,9 @@ _PG_fini(void) ExecutorFinish_hook = prev_ExecutorFinish; ExecutorEnd_hook = prev_ExecutorEnd; ProcessUtility_hook = prev_ProcessUtility; - entry_reset(); + hash_entry_reset(); } -HTAB * -CreateHash(const char *hash_name, int key_size, int entry_size, int hash_size) -{ - HASHCTL info; - memset(&info, 0, sizeof(info)); - info.keysize = key_size; - info.entrysize = entry_size; - return ShmemInitHash(hash_name, hash_size, hash_size, &info, HASH_ELEM | HASH_BLOBS); -} - - -/* - * shmem_startup hook: allocate or attach to shared memory, - * then load any pre-existing statistics from file. - * Also create and load the query-texts file, which is expected to exist - * (even if empty) while the module is enabled. - */ -static void -pgss_shmem_startup(void) -{ - bool found = false; - int32 i; - - if (prev_shmem_startup_hook) - prev_shmem_startup_hook(); - - /* reset in case this is a restart within the postmaster */ - pgss = NULL; - pgss_hash = NULL; - pgss_object_hash = NULL; - pgss_agghash = NULL; - pgss_buckethash = NULL; - pgss_waiteventshash = NULL; - - /* - * Create or attach to the shared memory state, including hash table - */ - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - - pgss = ShmemInitStruct("pg_stat_monitor", sizeof(pgssSharedState), &found); - if (!found) - { - /* First time through ... 
*/ - pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock; - SpinLockInit(&pgss->mutex); - ResetSharedState(pgss); - } - - query_buf_size_bucket = PGSM_QUERY_BUF_SIZE / PGSM_MAX_BUCKETS; - for (i = 0; i < PGSM_MAX_BUCKETS; i++) - pgss_qbuf[i] = (unsigned char *) ShmemAlloc(query_buf_size_bucket); - - pgss_hash = CreateHash("pg_stat_monitor: Queries hashtable", - sizeof(pgssHashKey), - sizeof(pgssEntry), - PGSM_MAX); - - pgss_buckethash = CreateHash("pg_stat_monitor: Bucket hashtable", - sizeof(pgssBucketHashKey), - sizeof(pgssBucketEntry), - PGSM_MAX_BUCKETS); - - pgss_waiteventshash = CreateHash("pg_stat_monitor: Wait Event hashtable", - sizeof(pgssWaitEventKey), - sizeof(pgssWaitEventEntry), - 100); - - pgss_object_hash = CreateHash("pg_stat_monitor: Object hashtable", - sizeof(pgssObjectHashKey), - sizeof(pgssObjectEntry), - PGSM_OBJECT_CACHE); - - pgss_agghash = CreateHash("pg_stat_monitor: Aggregate hashtable", - sizeof(pgssAggHashKey), - sizeof(pgssAggEntry), - PGSM_MAX * 3); - - Assert(IsHashInitialize()); - - pgssWaitEventEntries = malloc(sizeof (pgssWaitEventEntry) * MAX_BACKEND_PROCESES); - for (i = 0; i < MAX_BACKEND_PROCESES; i++) - { - pgssWaitEventKey key; - pgssWaitEventEntry *entry = NULL; - bool found = false; - - key.processid = i; - entry = (pgssWaitEventEntry *) hash_search(pgss_waiteventshash, &key, HASH_ENTER, &found); - if (!found) - { - SpinLockInit(&entry->mutex); - pgssWaitEventEntries[i] = entry; - } - } - - pgssBucketEntries = malloc(sizeof (pgssBucketEntry) * PGSM_MAX_BUCKETS); - for (i = 0; i < PGSM_MAX_BUCKETS; i++) - { - pgssBucketHashKey key; - pgssBucketEntry *entry = NULL; - bool found = false; - - key.bucket_id = i; - /* Find or create an entry with desired hash code */ - entry = (pgssBucketEntry *) hash_search(pgss_buckethash, &key, HASH_ENTER, &found); - if (!found) - { - memset(&entry->counters, 0, sizeof(pgssBucketCounters)); - SpinLockInit(&entry->mutex); - pgssBucketEntries[i] = entry; - } - } - - 
LWLockRelease(AddinShmemInitLock); - - /* - * If we're in the postmaster (or a standalone backend...), set up a shmem - * exit hook to dump the statistics to disk. - */ - if (!IsUnderPostmaster) - on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); -} - -/* - * shmem_shutdown hook: Dump statistics into file. - * - * Note: we don't bother with acquiring lock, because there should be no - * other processes running when this is called. - */ -static void -pgss_shmem_shutdown(int code, Datum arg) -{ - elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__); - /* Don't try to dump during a crash. */ - if (code) - return; - - /* Safety check ... shouldn't get here unless shmem is set up. */ - if (IsHashInitialize()) - return; -} - - /* * Post-parse-analysis hook: mark query with a queryId */ @@ -434,9 +259,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) } } } - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); add_object_entry(query->queryId, tables_name); - LWLockRelease(pgss->lock); } /* @@ -446,7 +269,6 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) if (query->queryId == UINT64CONST(0)) query->queryId = UINT64CONST(1); - if (PGSM_ENABLED == 1) if (jstate.clocations_count > 0) pgss_store(pstate->p_sourcetext, @@ -839,7 +661,11 @@ static void pgss_store(const char *query, uint64 queryId, bool reset = false; int i; char tables_name[MAX_REL_LEN] = {0}; - + int len; + pgssSharedState *pgss = pgsm_get_ss(); + pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket(); + HTAB *pgss_hash = pgsm_get_hash(); + Assert(query != NULL); Assert(PGSM_ENABLED); @@ -885,29 +711,15 @@ static void pgss_store(const char *query, uint64 queryId, if (queryId == UINT64CONST(0)) queryId = pgss_hash_string(query, query_len); - - { - pgssObjectHashKey key; - pgssObjectEntry *entry; - - key.queryid = queryId; - - LWLockAcquire(pgss->lock, LW_SHARED); - entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_FIND, NULL); - if (entry != NULL) - { - LWLockRelease(pgss->lock); - 
LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - snprintf(tables_name, MAX_REL_LEN, "%s", entry->tables_name); - hash_search(pgss_object_hash, &entry->key, HASH_REMOVE, NULL); - } - LWLockRelease(pgss->lock); - } - + remove_object_entry(queryId, tables_name); + len = strlen(tables_name); + /* Set up key for hashtable search */ key.userid = GetUserId(); key.dbid = MyDatabaseId; key.queryid = queryId; + key.ip = pg_get_client_addr(); + key.bucket_id = get_next_wbucket(pgss); if (key.bucket_id != pgss->current_wbucket) @@ -915,7 +727,6 @@ static void pgss_store(const char *query, uint64 queryId, reset = true; pgss->current_wbucket = key.bucket_id; } - /* Lookup the hash table entry with shared lock. */ LWLockAcquire(pgss->lock, LW_SHARED); entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); @@ -942,7 +753,7 @@ static void pgss_store(const char *query, uint64 queryId, LWLockAcquire(pgss->lock, LW_EXCLUSIVE); /* OK to create a new hashtable entry */ - entry = entry_alloc(pgss, &key, 0, query_len, encoding, jstate != NULL); + entry = hash_entry_alloc(pgss, &key, encoding); if (entry == NULL) goto exit; } @@ -968,11 +779,6 @@ static void pgss_store(const char *query, uint64 queryId, if (reset) memset(&entry->counters, 0, sizeof(Counters)); - /* Calculate the agregates for database/user and host */ - update_agg_counters(entry->key.bucket_id, key.queryid, key.dbid, AGG_KEY_DATABASE); - update_agg_counters(entry->key.bucket_id, key.queryid, key.userid, AGG_KEY_USER); - update_agg_counters(entry->key.bucket_id, key.queryid, pg_get_client_addr(), AGG_KEY_HOST); - /* "Unstick" entry if it was previously sticky */ if (e->counters.calls[kind].calls == 0) e->counters.calls[kind].usage = USAGE_INIT; @@ -1033,7 +839,7 @@ static void pgss_store(const char *query, uint64 queryId, e->counters.info.host = pg_get_client_addr(); e->counters.sysinfo.utime = utime; e->counters.sysinfo.stime = stime; - for(i = 0; i < MAX_REL_LEN - 1; i++) + for(i = 0; i < len; i++) 
e->counters.info.tables_name[i] = tables_name[i]; SpinLockRelease(&e->mutex); } @@ -1053,11 +859,12 @@ exit: Datum pg_stat_monitor_reset(PG_FUNCTION_ARGS) { - if (!pgss || !pgss_hash || !pgss_agghash || !pgss_buckethash || !pgss_waiteventshash) + /* Safety check... */ + if (!IsHashInitialize()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); - entry_dealloc(-1); + hash_entry_dealloc(-1); PG_RETURN_VOID(); } @@ -1066,19 +873,20 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) Datum pg_stat_wait_events(PG_FUNCTION_ARGS) { - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - TupleDesc tupdesc; - Tuplestorestate *tupstore; - MemoryContext per_query_ctx; - MemoryContext oldcontext; - HASH_SEQ_STATUS hash_seq; - pgssWaitEventEntry *entry; - char *query_txt; - char queryid_txt[64]; - query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + HASH_SEQ_STATUS hash_seq; + pgssWaitEventEntry *entry; + char *query_txt; + char queryid_txt[64]; + pgssSharedState *pgss = pgsm_get_ss(); + HTAB *pgss_waiteventshash = pgsm_get_wait_event_hash(); - /* hash table must exist already */ - if (!pgss || !pgss_hash || !pgss_object_hash) + /* Safety check... 
*/ + if (!IsHashInitialize()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); @@ -1094,6 +902,8 @@ pg_stat_wait_events(PG_FUNCTION_ARGS) errmsg("pg_stat_monitor: materialize mode required, but it is not " \ "allowed in this context"))); + query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN); + /* Switch into long-lived context to construct returned data structures */ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); @@ -1161,7 +971,6 @@ pg_stat_wait_events(PG_FUNCTION_ARGS) Datum pg_stat_monitor(PG_FUNCTION_ARGS) { - /* If it's really API 1.1, we'll figure that out below */ pg_stat_monitor_internal(fcinfo, true); return (Datum) 0; } @@ -1171,24 +980,28 @@ static void pg_stat_monitor_internal(FunctionCallInfo fcinfo, bool showtext) { - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - TupleDesc tupdesc; - Tuplestorestate *tupstore; - MemoryContext per_query_ctx; - MemoryContext oldcontext; - Oid userid = GetUserId(); - bool is_allowed_role = false; - HASH_SEQ_STATUS hash_seq; - pgssEntry *entry; - char *query_txt; - char queryid_txt[64]; - query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + Oid userid = GetUserId(); + bool is_allowed_role = false; + HASH_SEQ_STATUS hash_seq; + pgssEntry *entry; + char *query_txt; + char queryid_txt[64]; + pgssSharedState *pgss = pgsm_get_ss(); + HTAB *pgss_hash = pgsm_get_hash(); + pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket_entries(); + query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN); + /* Superusers or members of pg_read_all_stats members are allowed */ is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); - /* hash table must exist already */ - if (!pgss || !pgss_hash || 
!pgss_object_hash) + /* Safety check... */ + if (!IsHashInitialize()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); @@ -1243,6 +1056,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, values[i++] = ObjectIdGetDatum(entry->key.bucket_id); values[i++] = ObjectIdGetDatum(entry->key.userid); values[i++] = ObjectIdGetDatum(entry->key.dbid); + values[i++] = Int64GetDatumFast(entry->key.ip); /* copy counters to a local variable to keep locking time short */ { volatile pgssEntry *e = (volatile pgssEntry *) entry; @@ -1307,7 +1121,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, values[i++] = Int64GetDatumFast(tmp.blocks.temp_blks_written); values[i++] = Float8GetDatumFast(tmp.blocks.blk_read_time); values[i++] = Float8GetDatumFast(tmp.blocks.blk_write_time); - values[i++] = Int64GetDatumFast(tmp.info.host); values[i++] = ArrayGetTextDatum(pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls); values[i++] = Float8GetDatumFast(tmp.sysinfo.utime); values[i++] = Float8GetDatumFast(tmp.sysinfo.stime); @@ -1325,78 +1138,13 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, tuplestore_donestoring(tupstore); } -/* - * Estimate shared memory space needed. - */ -static Size -pgss_memsize(void) -{ - Size size; - - size = MAXALIGN(sizeof(pgssSharedState)); - size = add_size(size, hash_estimate_size(PGSM_MAX, sizeof(pgssEntry))); - - return size; -} - -/* - * Allocate a new hashtable entry. - * caller must hold an exclusive lock on pgss->lock - * - * "query" need not be null-terminated; we rely on query_len instead - * - * If "sticky" is true, make the new entry artificially sticky so that it will - * probably still be there when the query finishes execution. We do this by - * giving it a median usage value rather than the normal value. 
(Strictly - * speaking, query strings are normalized on a best effort basis, though it - * would be difficult to demonstrate this even under artificial conditions.) - * - * Note: despite needing exclusive lock, it's not an error for the target - * entry to already exist. This is because pgss_store releases and - * reacquires lock after failing to find a match; so someone else could - * have made the entry while we waited to get exclusive lock. - */ -static pgssEntry * -entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int query_len, int encoding, - bool sticky) -{ - pgssEntry *entry = NULL; - bool found = false; - - if (pgss->bucket_entry[pgss->current_wbucket] >= (PGSM_MAX / PGSM_MAX_BUCKETS)) - { - pgss->bucket_overflow[pgss->current_wbucket]++; - return NULL; - } - - if (hash_get_num_entries(pgss_hash) >= PGSM_MAX) - return NULL; - - /* Find or create an entry with desired hash code */ - entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found); - if (!found) - { - pgss->bucket_entry[pgss->current_wbucket]++; - /* New entry, initialize it */ - - /* reset the statistics */ - memset(&entry->counters, 0, sizeof(Counters)); - /* set the appropriate initial usage count */ - entry->counters.calls[0].usage = sticky ? pgss->cur_median_usage : USAGE_INIT; - /* re-initialize the mutex each time ... we assume no one using it */ - SpinLockInit(&entry->mutex); - /* ... 
and don't forget the query text metadata */ - entry->encoding = encoding; - } - return entry; -} - static uint64 get_next_wbucket(pgssSharedState *pgss) { struct timeval tv; uint64 current_usec; uint64 bucket_id; + pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket(); gettimeofday(&tv,NULL); current_usec = tv.tv_sec; @@ -1408,7 +1156,7 @@ get_next_wbucket(pgssSharedState *pgss) bucket_id = 0; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - entry_dealloc(bucket_id); + hash_entry_dealloc(bucket_id); /* reset the query buffer */ pgss->query_fifo[bucket_id].head = 0; pgss->query_fifo[bucket_id].tail = 0; @@ -1422,94 +1170,6 @@ get_next_wbucket(pgssSharedState *pgss) return pgss->current_wbucket; } -/* - * Deallocate least-used entries. - * - * Caller must hold an exclusive lock on pgss->lock. - */ -static void -entry_dealloc(int bucket) -{ - HASH_SEQ_STATUS hash_seq; - HASH_SEQ_STATUS hash_dbseq; - pgssEntry *entry; - pgssAggEntry *agg_entry; - pgssEntry **entries; - pgssAggEntry **agg_entries; - int i; - int nvictims = 0; - - pgss->bucket_entry[bucket] = 0; - - entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *)); - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - if (entry->key.bucket_id == bucket || bucket < 0) - entries[nvictims++] = entry; - } - - for (i = 0; i < nvictims; i++) - entry = hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL); - - nvictims = 0; - agg_entries = palloc(hash_get_num_entries(pgss_agghash) * sizeof(pgssAggEntry *)); - hash_seq_init(&hash_dbseq, pgss_agghash); - while ((agg_entry = hash_seq_search(&hash_dbseq)) != NULL) - { - if (agg_entry->key.bucket_id == bucket || bucket < 0) - agg_entries[nvictims++] = agg_entry; - } - for (i = 0; i < nvictims; i++) - hash_search(pgss_agghash, &agg_entries[i]->key, HASH_REMOVE, NULL); - - pfree(entries); - pfree(agg_entries); -} - -/* - * Release all entries. 
- */ -static void -entry_reset() -{ - HASH_SEQ_STATUS hash_seq; - pgssEntry *entry; - pgssAggEntry *dbentry; - pgssObjectEntry *objentry; - pgssWaitEventEntry *weentry; - - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); - } - - hash_seq_init(&hash_seq, pgss_agghash); - while ((dbentry = hash_seq_search(&hash_seq)) != NULL) - { - hash_search(pgss_agghash, &dbentry->key, HASH_REMOVE, NULL); - } - - hash_seq_init(&hash_seq, pgss_buckethash); - while ((objentry = hash_seq_search(&hash_seq)) != NULL) - { - hash_search(pgss_buckethash, &dbentry->key, HASH_REMOVE, NULL); - } - - hash_seq_init(&hash_seq, pgss_waiteventshash); - while ((weentry = hash_seq_search(&hash_seq)) != NULL) - { - hash_search(pgss_waiteventshash, &dbentry->key, HASH_REMOVE, NULL); - } - pgss->current_wbucket = 0; - free(pgssWaitEventEntries); - free(pgssBucketEntries); - LWLockRelease(pgss->lock); -} - /* * AppendJumble: Append a value that is substantive in a given query to * the current jumble. 
@@ -2441,151 +2101,6 @@ array_get_datum(int arr[]) return CStringGetTextDatum(str); } -/* Alocate memory for a new entry */ -void add_object_entry(uint64 queryid, char *objects) -{ - pgssObjectEntry *entry = NULL; - bool found; - pgssObjectHashKey key; - - key.queryid = queryid; - entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_ENTER, &found); - if (!found) - { - SpinLockAcquire(&entry->mutex); - snprintf(entry->tables_name, MAX_REL_LEN, "%s", objects); - SpinLockRelease(&entry->mutex); - } -} - -/* Alocate memory for a new entry */ -static pgssAggEntry * -agg_entry_alloc(pgssAggHashKey *key) -{ - pgssAggEntry *entry = NULL; - bool found; - - entry = (pgssAggEntry *) hash_search(pgss_agghash, key, HASH_ENTER, &found); - if (!found) - { - SpinLockAcquire(&entry->mutex); - memset(&entry->counters, 0, sizeof(pgssAggCounters)); - entry->counters.total_calls = 0; - SpinLockRelease(&entry->mutex); - } - return entry; -} - -static void -update_agg_counters(uint64 bucket, uint64 queryid, uint64 id, AGG_KEY type) -{ - pgssAggHashKey key; - pgssAggEntry *entry; - - key.id = id; - key.type = (int64) type; - key.queryid = queryid; - key.bucket_id = bucket; - - entry = agg_entry_alloc(&key); - if (!entry) - return; - - SpinLockAcquire(&entry->mutex); - - entry->key.queryid = queryid; - entry->key.id = id; - entry->key.type = key.type; - entry->key.bucket_id = bucket; - - entry->counters.total_calls++; - SpinLockRelease(&entry->mutex); -} - -Datum -pg_stat_agg(PG_FUNCTION_ARGS) -{ - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - TupleDesc tupdesc; - Tuplestorestate *tupstore; - MemoryContext per_query_ctx; - MemoryContext oldcontext; - HASH_SEQ_STATUS hash_seq; - pgssAggEntry *entry; - - /* hash table must exist already */ - if (!pgss || !pgss_agghash) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); - - /* check to see if caller supports us 
returning a tuplestore */ - if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("pg_stat_monitor: set-valued function called in context that cannot accept a set"))); - - if (!(rsinfo->allowedModes & SFRM_Materialize)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("pg_stat_monitor: materialize mode required, but it is not " \ - "allowed in this context"))); - - /* Switch into long-lived context to construct returned data structures */ - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; - oldcontext = MemoryContextSwitchTo(per_query_ctx); - - /* Build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "pg_stat_monitor: return type must be a row type"); - - if (tupdesc->natts != 4) - elog(ERROR, "pg_stat_monitor: incorrect number of output arguments, required %d", tupdesc->natts); - - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - - MemoryContextSwitchTo(oldcontext); - - /* - * Get shared lock, load or reload the query text file if we must, and - * iterate over the hashtable entries. - * - * With a large hash table, we might be holding the lock rather longer - * than one could wish. However, this only blocks creation of new hash - * table entries, and the larger the hash table the less likely that is to - * be needed. So we can hope this is okay. Perhaps someday we'll decide - * we need to partition the hash table to limit the time spent holding any - * one lock. 
- */ - LWLockAcquire(pgss->lock, LW_SHARED); - hash_seq_init(&hash_seq, pgss_agghash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - Datum values[6]; - bool nulls[6]; - int i = 0; - char queryid_txt[32]; - - memset(values, 0, sizeof(values)); - memset(nulls, 0, sizeof(nulls)); - - sprintf(queryid_txt, "%08lX", entry->key.queryid); - values[i++] = CStringGetTextDatum(queryid_txt); - values[i++] = Int64GetDatumFast(entry->key.id); - values[i++] = Int64GetDatumFast(entry->key.type); - values[i++] = Int64GetDatumFast(entry->counters.total_calls); - tuplestore_putvalues(tupstore, tupdesc, values, nulls); - } - - /* clean up and return the tuplestore */ - LWLockRelease(pgss->lock); - tuplestore_donestoring(tupstore); - return 0; -} - #define FIFO_HEAD(b) pgss->query_fifo[b].head #define FIFO_TAIL(b) pgss->query_fifo[b].tail @@ -2595,6 +2110,8 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query) uint64 id = 0; uint64 len = 0; uint64 offset = 0; + pgssSharedState *pgss = pgsm_get_ss(); + uint64 tail = FIFO_TAIL(bucket_id); unsigned char *buf = pgss_qbuf[bucket_id]; @@ -2619,7 +2136,7 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query) if (id == queryid) return id; - tail = (tail + offset) % query_buf_size_bucket; + tail = (tail + offset) % pgsm_get_bucket_size(); } return 0; } @@ -2627,8 +2144,9 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query) static void store_query(uint64 queryid, const char *query, uint64 query_len) { - int next; - int offset = 0; + int next; + int offset = 0; + pgssSharedState *pgss = pgsm_get_ss(); if (query_len > PGSM_QUERY_MAX_LEN) query_len = PGSM_QUERY_MAX_LEN; @@ -2640,7 +2158,7 @@ store_query(uint64 queryid, const char *query, uint64 query_len) return; next = FIFO_HEAD(pgss->current_wbucket) + query_len + sizeof (uint64) + sizeof (uint64); - if (next >= query_buf_size_bucket) + if (next >= pgsm_get_bucket_size()) next = 0; /* Buffer is full */ @@ -2669,6 +2187,8 @@ static PlannedStmt 
*pgss_planner_hook(Query *parse, int opt, ParamListInfo param #endif { PlannedStmt *result; + pgssWaitEventEntry **pgssWaitEventEntries = pgsm_get_wait_event_entry(); + if (MyProc) { int i = MyProc - ProcGlobal->allProcs; @@ -2751,8 +2271,9 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param static void update_wait_event(void) { - PGPROC *proc = NULL; - int i; + PGPROC *proc = NULL; + int i; + pgssWaitEventEntry **pgssWaitEventEntries = pgsm_get_wait_event_entry(); LWLockAcquire(ProcArrayLock, LW_SHARED); for (i = 0; i < ProcGlobal->allProcCount; i++) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index a0e33ba..efc8778 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -36,8 +36,6 @@ #include "utils/lsyscache.h" #include "utils/guc.h" -#define IsHashInitialize() (pgss || pgss_hash || pgss_object_hash || pgss_agghash || pgss_buckethash || pgss_waiteventshash) - #define MAX_BACKEND_PROCESES (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) /* Time difference in miliseconds */ @@ -131,42 +129,21 @@ typedef struct pgssObjectEntry slock_t mutex; /* protects the counters only */ } pgssObjectEntry; -/* Aggregate shared memory storage */ -typedef struct pgssAggHashKey -{ - uint64 id; /* dbid, userid or ip depend upon the type */ - uint64 type; /* type of id dbid, userid or ip */ - uint64 queryid; /* query identifier, foreign key to the query */ - uint64 bucket_id; /* bucket_id is the foreign key to pgssBucketHashKey */ -} pgssAggHashKey; - -typedef struct pgssAggCounters -{ - uint64 total_calls; /* number of quries per database/user/ip */ -} pgssAggCounters; - -typedef struct pgssAggEntry -{ - pgssAggHashKey key; /* hash key of entry - MUST BE FIRST */ - pgssAggCounters counters; /* the statistics aggregates */ - slock_t mutex; /* protects the counters only */ -} pgssAggEntry; - - typedef struct pgssWaitEventKey { + uint64 queryid; uint64 processid; } pgssWaitEventKey; #define MAX_QUERY_LEN 1024 typedef struct 
pgssWaitEventEntry { - pgssAggHashKey key; /* hash key of entry - MUST BE FIRST */ - uint64 queryid; - uint64 pid; - uint32 wait_event_info; - char query[MAX_QUERY_LEN]; - slock_t mutex; /* protects the counters only */ + pgssWaitEventKey key; /* hash key of entry - MUST BE FIRST */ + uint64 queryid; + uint64 pid; + uint32 wait_event_info; + char query[MAX_QUERY_LEN]; + slock_t mutex; /* protects the counters only */ } pgssWaitEventEntry; @@ -177,6 +154,7 @@ typedef struct pgssHashKey uint64 queryid; /* query identifier */ Oid userid; /* user OID */ Oid dbid; /* database OID */ + uint32 ip; /* client ip address */ } pgssHashKey; typedef struct QueryInfo @@ -273,6 +251,7 @@ typedef struct pgssSharedState uint64 bucket_overflow[MAX_BUCKETS]; uint64 bucket_entry[MAX_BUCKETS]; QueryFifo query_fifo[MAX_BUCKETS]; + int query_buf_size_bucket; } pgssSharedState; #define ResetSharedState(x) \ @@ -325,9 +304,31 @@ typedef struct pgssJumbleState int highest_extern_param_id; } pgssJumbleState; +/* Links to shared memory state */ + /* guc.c */ void init_guc(void); +/* hash_query.c */ +void add_object_entry(uint64 queryid, char *objects); +void remove_object_entry(uint64 queryid, char *objects); +bool IsHashInitialize(void); +void pgss_shmem_startup(void); +void pgss_shmem_shutdown(int code, Datum arg); +shmem_startup_hook_type prev_shmem_startup_hook; +int pgsm_get_bucket_size(void); +pgssSharedState* pgsm_get_ss(void); +pgssBucketEntry** pgsm_get_bucket_entries(void); +HTAB* pgsm_get_wait_event_hash(void); +pgssBucketEntry** pgsm_get_bucket(void); +HTAB* pgsm_get_hash(void); +pgssWaitEventEntry** pgsm_get_wait_event_entry(void); +void hash_entry_reset(void); +void hash_entry_dealloc(int bucket); +pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); +Size hash_memsize(void); +pgssEntry* pgsm_create_query_entry(unsigned int queryid, unsigned int userid, unsigned int dbid, unsigned int bucket_id, unsigned int ip); + /*---- GUC variables ----*/ 
#define PGSM_MAX conf[0].guc_variable #define PGSM_QUERY_MAX_LEN conf[1].guc_variable @@ -336,10 +337,10 @@ void init_guc(void); #define PGSM_NORMALIZED_QUERY conf[4].guc_variable #define PGSM_MAX_BUCKETS conf[5].guc_variable #define PGSM_BUCKET_TIME conf[6].guc_variable -#define PGSM_QUERY_BUF_SIZE conf[7].guc_variable -#define PGSM_OBJECT_CACHE conf[8].guc_variable -#define PGSM_RESPOSE_TIME_LOWER_BOUND conf[9].guc_variable -#define PGSM_RESPOSE_TIME_STEP conf[10].guc_variable +#define PGSM_OBJECT_CACHE conf[7].guc_variable +#define PGSM_RESPOSE_TIME_LOWER_BOUND conf[8].guc_variable +#define PGSM_RESPOSE_TIME_STEP conf[9].guc_variable +#define PGSM_QUERY_BUF_SIZE conf[10].guc_variable #define PGSM_TRACK_PLANNING conf[11].guc_variable GucVariable conf[12];