diff --git a/README.md b/README.md index 4e7883f..b3d17bb 100644 --- a/README.md +++ b/README.md @@ -401,6 +401,19 @@ postgres=# select userid, dbid, queryid, query, calls from pg_stat_monitor; 10 | 12696 | 85900141D214EC52 | select bucket, bucket_start_time, query from pg_stat_monitor | 2 10 | 12696 | F1AC132034D5B366 | SELECT * FROM foo | 1 ``` +**`elevel`**, **`sqlcode`**,**`message`**,: error level / sql code and log/warning/error message + +postgres=# select substr(query,0,50) as query, decode_error_level(elevel)as elevel,sqlcode,calls, substr(message,0,50) message from pg_stat_monitor; + query | elevel | sqlcode | calls | message +---------------------------------------------------+--------+---------+-------+--------------------------------------------------- + select substr(query,$1,$2) as query, decode_error | | 0 | 1 | + select bucket,substr(query,$1,$2),decode_error_le | | 0 | 3 | + | LOG | 0 | 1 | database system is ready to accept connections + select 1/0; | ERROR | 130 | 1 | division by zero + | LOG | 0 | 1 | database system was shut down at 2020-11-11 11:37 + select $1/$2 | | 0 | 1 | +(6 rows) + **`total_time`**, **`min_time`**, **`max_time`**, **`mean_time`**: The total / minimum / maximum and mean time spent for the same query. diff --git a/hash_query.c b/hash_query.c index 758fe37..33a7877 100644 --- a/hash_query.c +++ b/hash_query.c @@ -17,7 +17,6 @@ static pgssSharedState *pgss; static HTAB *pgss_hash; static HTAB *pgss_object_hash; -static HTAB *pgss_buckethash = NULL; static HTAB *pgss_waiteventshash = NULL; static pgssWaitEventEntry **pgssWaitEventEntries = NULL; @@ -43,7 +42,6 @@ pgss_startup(void) pgss = NULL; pgss_hash = NULL; pgss_object_hash = NULL; - pgss_buckethash = NULL; pgss_waiteventshash = NULL; /* @@ -111,25 +109,21 @@ pgsm_get_bucket_size(void) pgssSharedState* pgsm_get_ss(void) { - Assert(pgss); return pgss; } HTAB* pgsm_get_hash(void) { - Assert(pgss_hash); return pgss_hash; } HTAB* pgsm_get_wait_event_hash(void) { - Assert(pgss_waiteventshash); return pgss_waiteventshash; } pgssWaitEventEntry** pgsm_get_wait_event_entry(void) { - Assert(pgssWaitEventEntries); return pgssWaitEventEntries; } @@ -143,12 +137,13 @@ void pgss_shmem_shutdown(int code, Datum arg) { elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__); + /* Don't try to dump during a crash. */ if (code) return; /* Safety check ... shouldn't get here unless shmem is set up. */ - if (IsHashInitialize()) + if (!IsHashInitialize()) return; } @@ -225,7 +220,6 @@ hash_entry_reset() { HASH_SEQ_STATUS hash_seq; pgssEntry *entry; - pgssObjectEntry *objentry; pgssWaitEventEntry *weentry; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); @@ -236,12 +230,6 @@ hash_entry_reset() hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } - hash_seq_init(&hash_seq, pgss_buckethash); - while ((objentry = hash_seq_search(&hash_seq)) != NULL) - { - hash_search(pgss_buckethash, &objentry->key, HASH_REMOVE, NULL); - } - hash_seq_init(&hash_seq, pgss_waiteventshash); while ((weentry = hash_seq_search(&hash_seq)) != NULL) { @@ -324,6 +312,9 @@ hash_create_query_entry(unsigned int queryid, bool IsHashInitialize(void) { - return (pgss || pgss_hash || pgss_object_hash || pgss_buckethash || pgss_waiteventshash); + return (pgss != NULL && + pgss_hash != NULL && + pgss_object_hash !=NULL && + pgss_waiteventshash != NULL); } diff --git a/pg_stat_monitor--1.0.sql b/pg_stat_monitor--1.0.sql index 3933460..cc00c72 100644 --- a/pg_stat_monitor--1.0.sql +++ b/pg_stat_monitor--1.0.sql @@ -22,6 +22,9 @@ CREATE FUNCTION pg_stat_monitor(IN showtext boolean, OUT queryid text, OUT query text, + OUT elevel int, + OUT sqlcode int, + OUT message text, OUT bucket_start_time timestamptz, OUT plans int8, @@ -103,6 +106,9 @@ CREATE VIEW pg_stat_monitor AS SELECT '0.0.0.0'::inet + client_ip AS client_ip, queryid, query, + elevel, + sqlcode, + message, plans, round( CAST(plan_total_time as numeric), 2)::float8 as plan_total_time, round( CAST(plan_min_time as numeric), 2)::float8 as plan_min_timei, @@ -134,6 +140,27 @@ CREATE VIEW pg_stat_monitor AS SELECT (string_to_array(tables_names, ',')) tables_names FROM pg_stat_monitor(TRUE); +CREATE FUNCTION decode_error_level(elevel int) +RETURNS text +AS +$$ +SELECT + CASE + WHEN elevel = 0 THEN '' + WHEN elevel = 10 THEN 'DEBUG5' + WHEN elevel = 11 THEN 'DEBUG4' + WHEN elevel = 12 THEN 'DEBUG3' + WHEN elevel = 13 THEN 'DEBUG2' + WHEN elevel = 14 THEN 'DEBUG1' + WHEN elevel = 15 THEN 'LOG' + WHEN elevel = 16 THEN 'LOG_SERVER_ONLY' + WHEN elevel = 17 THEN 'INFO' + WHEN elevel = 18 THEN 'NOTICE' + WHEN elevel = 19 THEN 'WARNING' + WHEN elevel = 20 THEN 'ERROR' + END +$$ +LANGUAGE SQL PARALLEL SAFE; -- Register a view on the function for ease of use. CREATE VIEW pg_stat_wait_events AS SELECT diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 6769c19..7d29bd2 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -32,12 +32,16 @@ static int plan_nested_level = 0; static int exec_nested_level = 0; #endif +static bool system_init = false; static struct rusage rusage_start; static struct rusage rusage_end; static volatile sig_atomic_t sigterm = false; static void handle_sigterm(SIGNAL_ARGS); static unsigned char *pgss_qbuf[MAX_BUCKETS]; + +static bool IsSystemInitialized(void); + /* Saved hook values in case of unload */ static planner_hook_type planner_hook_next = NULL; static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL; @@ -46,6 +50,8 @@ static ExecutorRun_hook_type prev_ExecutorRun = NULL; static ExecutorFinish_hook_type prev_ExecutorFinish = NULL; static ExecutorEnd_hook_type prev_ExecutorEnd = NULL; static ProcessUtility_hook_type prev_ProcessUtility = NULL; +static emit_log_hook_type prev_emit_log_hook = NULL; +void pgsm_emit_log_hook(ErrorData *edata); static shmem_startup_hook_type prev_shmem_startup_hook = NULL; PG_FUNCTION_INFO_V1(pg_stat_monitor_version); @@ -88,8 +94,13 @@ static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, #endif static uint64 pgss_hash_string(const char *str, int len); -static void pgss_store(const char *query, uint64 queryId, - int query_location, int query_len, +static void pgss_store(uint64 queryId, + const char *query, + uint64 elevel, + uint64 sqlerrcode, + const char *message, + int query_location, + int query_len, pgssStoreKind kind, double total_time, uint64 rows, const BufferUsage *bufusage, @@ -178,6 +189,9 @@ _PG_init(void) ProcessUtility_hook = pgss_ProcessUtility; planner_hook_next = planner_hook; planner_hook = pgss_planner_hook; + emit_log_hook = pgsm_emit_log_hook; + + system_init = true; } /* @@ -213,7 +227,6 @@ pgss_shmem_startup(void) pgss_startup(); } - /* * Select the version of pg_stat_monitor. */ @@ -223,7 +236,7 @@ pg_stat_monitor_version(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(cstring_to_text(BUILD_VERSION)); } -#define PG_STAT_STATEMENTS_COLS 38 /* maximum of above */ +#define PG_STAT_STATEMENTS_COLS 40 /* maximum of above */ /* * Post-parse-analysis hook: mark query with a queryId @@ -238,7 +251,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) prev_post_parse_analyze_hook(pstate, query); /* Safety check... */ - if (!IsHashInitialize()) + if (!IsSystemInitialized()) return; /* @@ -299,8 +312,11 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) if (PGSM_ENABLED == 1) if (jstate.clocations_count > 0) - pgss_store(pstate->p_sourcetext, - query->queryId, + pgss_store(query->queryId, + pstate->p_sourcetext, + 0, /* error elevel */ + 0, /* error sqlcode */ + NULL, /* error message */ query->stmt_location, query->stmt_len, PGSS_INVALID, @@ -420,20 +436,23 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) stime = TIMEVAL_DIFF(rusage_start.ru_stime, rusage_end.ru_stime); if (PGSM_ENABLED == 1) - pgss_store(queryDesc->sourceText, - queryId, - queryDesc->plannedstmt->stmt_location, - queryDesc->plannedstmt->stmt_len, - PGSS_EXEC, - queryDesc->totaltime->total * 1000.0, /* convert to msec */ - queryDesc->estate->es_processed, - &queryDesc->totaltime->bufusage, + pgss_store(queryId, + queryDesc->sourceText, + 0, /* error elevel */ + 0, /* error sqlcode */ + NULL, /* error message */ + queryDesc->plannedstmt->stmt_location, + queryDesc->plannedstmt->stmt_len, + PGSS_EXEC, + queryDesc->totaltime->total * 1000.0, /* convert to msec */ + queryDesc->estate->es_processed, + &queryDesc->totaltime->bufusage, #if PG_VERSION_NUM >= 130000 - &queryDesc->totaltime->walusage, + &queryDesc->totaltime->walusage, #endif - NULL, - utime, - stime); + NULL, + utime, + stime); } if (prev_ExecutorEnd) @@ -553,20 +572,23 @@ static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, memset(&bufusage, 0, sizeof(BufferUsage)); BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); if (PGSM_ENABLED == 1) - pgss_store(queryString, - 0, /* signal that it's a utility stmt */ - pstmt->stmt_location, - pstmt->stmt_len, - PGSS_EXEC, - INSTR_TIME_GET_MILLISEC(duration), - rows, - &bufusage, + pgss_store(0, /* query id, passing 0 to signal that it's a utility stmt */ + queryString, /* query text */ + 0, /* error elevel */ + 0, /* error sqlcode */ + NULL, /* error message */ + pstmt->stmt_location, + pstmt->stmt_len, + PGSS_EXEC, + INSTR_TIME_GET_MILLISEC(duration), + rows, + &bufusage, #if PG_VERSION_NUM >= 130000 - &walusage, + &walusage, #endif - NULL, - 0, - 0); + NULL, + 0, + 0); } else { @@ -671,16 +693,23 @@ pg_get_client_addr(void) * we have no statistics as yet; we just want to record the normalized * query string. total_time, rows, bufusage are ignored in this case. */ -static void pgss_store(const char *query, uint64 queryId, - int query_location, int query_len, - pgssStoreKind kind, - double total_time, uint64 rows, - const BufferUsage *bufusage, +static void pgss_store(uint64 queryId, + const char *query, + uint64 elevel, + uint64 sqlcode, + const char *message, + int query_location, + int query_len, + pgssStoreKind kind, + double total_time, + uint64 rows, + const BufferUsage *bufusage, #if PG_VERSION_NUM >= 130000 - const WalUsage *walusage, + const WalUsage *walusage, #endif - pgssJumbleState *jstate, - float utime, float stime) + pgssJumbleState *jstate, + float utime, + float stime) { pgssHashKey key; int bucket_id; @@ -693,12 +722,13 @@ static void pgss_store(const char *query, uint64 queryId, int len; pgssSharedState *pgss = pgsm_get_ss(); HTAB *pgss_hash = pgsm_get_hash(); + int message_len = message ? strlen(message) : 0; Assert(query != NULL); Assert(PGSM_ENABLED); /* Safety check... */ - if (!IsHashInitialize() || !pgss_qbuf[pgss->current_wbucket]) + if (!IsSystemInitialized() || !pgss_qbuf[pgss->current_wbucket]) return; /* @@ -755,7 +785,10 @@ static void pgss_store(const char *query, uint64 queryId, /* Set up key for hashtable search */ key.bucket_id = bucket_id; - key.userid = GetUserId(); + if (elevel == 0) + key.userid = GetUserId(); + else + key.userid = 1; key.dbid = MyDatabaseId; key.queryid = queryId; key.ip = pg_get_client_addr(); @@ -858,6 +891,10 @@ static void pgss_store(const char *query, uint64 queryId, e->counters.resp_calls[MAX_RESPONSE_BUCKET - 1]++; } + e->counters.error.elevel = elevel; + e->counters.error.sqlcode = sqlcode; + for(i = 0; i < message_len; i++) + e->counters.error.message[i] = message[i]; e->counters.calls[kind].rows += rows; e->counters.blocks.shared_blks_hit += bufusage->shared_blks_hit; e->counters.blocks.shared_blks_read += bufusage->shared_blks_read; @@ -897,7 +934,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) { pgssSharedState *pgss = pgsm_get_ss(); /* Safety check... */ - if (!IsHashInitialize()) + if (!IsSystemInitialized()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); @@ -907,7 +944,6 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } -#define PG_STAT_STATEMENTS_COLS 38 /* maximum of above */ Datum pg_stat_wait_events(PG_FUNCTION_ARGS) @@ -925,7 +961,7 @@ pg_stat_wait_events(PG_FUNCTION_ARGS) HTAB *pgss_waiteventshash = pgsm_get_wait_event_hash(); /* Safety check... */ - if (!IsHashInitialize()) + if (!IsSystemInitialized()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); @@ -1039,7 +1075,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); /* Safety check... */ - if (!IsHashInitialize()) + if (!IsSystemInitialized()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries"))); @@ -1087,14 +1123,18 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, memset(nulls, 0, sizeof(nulls)); if(locate_query(entry->key.bucket_id, queryid, query_txt) == 0) - sprintf(query_txt, "%s", ""); + query_txt = NULL; - sprintf(queryid_txt, "%08lX", queryid); + if (query_txt) + sprintf(queryid_txt, "%08lX", queryid); + else + sprintf(queryid_txt, "%08lX", (long unsigned int)0); values[i++] = ObjectIdGetDatum(entry->key.bucket_id); values[i++] = ObjectIdGetDatum(entry->key.userid); values[i++] = ObjectIdGetDatum(entry->key.dbid); values[i++] = Int64GetDatumFast(entry->key.ip); + /* copy counters to a local variable to keep locking time short */ { volatile pgssEntry *e = (volatile pgssEntry *) entry; @@ -1108,10 +1148,17 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (showtext) { char *enc; - enc = pg_any_to_server(query_txt, strlen(query_txt), entry->encoding); - values[i++] = CStringGetTextDatum(enc); - if (enc != query_txt) - pfree(enc); + if (query_txt) + { + enc = pg_any_to_server(query_txt, strlen(query_txt), entry->encoding); + values[i++] = CStringGetTextDatum(enc); + if (enc != query_txt) + pfree(enc); + } + else + { + nulls[i++] = true; + } } else { @@ -1130,6 +1177,12 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, else nulls[i++] = true; } + values[i++] = Int64GetDatumFast(tmp.error.elevel); + values[i++] = Int64GetDatumFast(tmp.error.sqlcode); + if (strlen(tmp.error.message) == 0) + nulls[i++] = true; + else + values[i++] = CStringGetTextDatum(tmp.error.message); values[i++] = TimestampGetDatum(pgss->bucket_start_time[entry->key.bucket_id]); @@ -2282,18 +2335,21 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param memset(&walusage, 0, sizeof(WalUsage)); WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start); if (PGSM_ENABLED == 1) - pgss_store(query_string, - parse->queryId, - parse->stmt_location, - parse->stmt_len, - PGSS_PLAN, - INSTR_TIME_GET_MILLISEC(duration), - 0, - &bufusage, - &walusage, - NULL, - 0, - 0); + pgss_store(query_string, /* query text */ + parse->queryId, /* query id */ + 0, /* error elevel */ + 0, /* error sqlcode */ + NULL, /* error message */ + parse->stmt_location, + parse->stmt_len, + PGSS_PLAN, + INSTR_TIME_GET_MILLISEC(duration), + 0, + &bufusage, + &walusage, + NULL, + 0, + 0); } else { @@ -2458,3 +2514,46 @@ set_qbuf(int i, unsigned char *buf) { pgss_qbuf[i] = buf; } + +void +pgsm_emit_log_hook(ErrorData *edata) +{ + BufferUsage bufusage; +#if PG_VERSION_NUM >= 130000 + WalUsage walusage; +#endif + if (PGSM_ENABLED == 1 && IsSystemInitialized()) + { + uint64 queryid = 0; + + if (debug_query_string) + queryid = DatumGetUInt64(hash_any_extended((const unsigned char *)debug_query_string, strlen(debug_query_string), 0)); + + pgss_store(queryid, + debug_query_string ? debug_query_string : "", + edata->elevel, + edata->sqlerrcode, + edata->message, + 0, + debug_query_string ? strlen(debug_query_string) : 0, + PGSS_EXEC, + 0, + 0, + &bufusage, +#if PG_VERSION_NUM >= 130000 + &walusage, +#endif + NULL, + 0, + 0); + } + if (prev_emit_log_hook) + prev_emit_log_hook(edata); +} + +bool +IsSystemInitialized(void) +{ + return (system_init && IsHashInitialize()); +} + diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 39da015..1b13754 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -60,6 +60,7 @@ #define MAX_BUCKETS 10 #define MAX_OBJECT_CACHE 100 #define TEXT_LEN 255 +#define ERROR_MESSAGE_LEN 100 typedef struct GucVariables { @@ -144,9 +145,17 @@ typedef struct QueryInfo Oid userid; /* user OID */ Oid dbid; /* database OID */ uint host; /* client IP */ + int64 type; /* type of query, options are query, info, warning, error, fatal */ char tables_name[MAX_REL_LEN]; /* table names involved in the query */ } QueryInfo; +typedef struct ErrorInfo +{ + unsigned char elevel; /* error elevel */ + unsigned char sqlcode; /* error sqlcode */ + char message[ERROR_MESSAGE_LEN]; /* error message text */ +} ErrorInfo; + typedef struct Calls { int64 calls; /* # of times executed */ @@ -196,6 +205,7 @@ typedef struct Counters CallTime time[PGSS_NUMKIND]; Blocks blocks; SysInfo sysinfo; + ErrorInfo error; int plans; int resp_calls[MAX_RESPONSE_BUCKET]; /* execution time's in msec */ } Counters; @@ -213,12 +223,6 @@ typedef struct pgssEntry slock_t mutex; /* protects the counters only */ } pgssEntry; -typedef struct QueryFifo -{ - int head; - int tail; -} QueryFifo; - /* * Global shared state */ @@ -233,7 +237,6 @@ typedef struct pgssSharedState uint64 prev_bucket_usec; uint64 bucket_overflow[MAX_BUCKETS]; uint64 bucket_entry[MAX_BUCKETS]; - QueryFifo query_fifo[MAX_BUCKETS]; int query_buf_size_bucket; Timestamp bucket_start_time[MAX_BUCKETS]; /* start time of the bucket */ } pgssSharedState; @@ -247,7 +250,6 @@ do { \ x->prev_bucket_usec = 0; \ memset(&x->bucket_overflow, 0, MAX_BUCKETS * sizeof(uint64)); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ - memset(&x->query_fifo, 0, MAX_BUCKETS * sizeof(uint64)); \ } while(0) @@ -314,6 +316,8 @@ pgssEntry* hash_create_query_entry(unsigned int queryid, unsigned int userid, un void pgss_startup(void); void set_qbuf(int i, unsigned char *); +/* hash_query.c */ +void pgss_startup(void); /*---- GUC variables ----*/ #define PGSM_MAX get_conf(0)->guc_variable #define PGSM_QUERY_MAX_LEN get_conf(1)->guc_variable