From bd8c54476f61048599b27c88c52a32e39f880dfa Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Wed, 10 Feb 2021 14:55:40 +0000 Subject: [PATCH] PG-150: Logging CMD Type like SELECT, UPDATE, INSERT, DELETE. --- hash_query.c | 2 +- pg_stat_monitor--1.0.sql | 26 ++++++++++++----- pg_stat_monitor.c | 60 +++++++++++++++------------------------- pg_stat_monitor.h | 7 +++-- 4 files changed, 46 insertions(+), 49 deletions(-) diff --git a/hash_query.c b/hash_query.c index a3a3bb6..c204c4b 100644 --- a/hash_query.c +++ b/hash_query.c @@ -132,7 +132,7 @@ hash_memsize(void) Size size; size = MAXALIGN(sizeof(pgssSharedState)); - size = MAXALIGN(MAX_QUERY_BUF); + size += MAXALIGN(MAX_QUERY_BUF); size = add_size(size, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry))); size = add_size(size, hash_estimate_size(500000, sizeof(pgssQueryEntry))); diff --git a/pg_stat_monitor--1.0.sql b/pg_stat_monitor--1.0.sql index 5b663db..f5ed116 100644 --- a/pg_stat_monitor--1.0.sql +++ b/pg_stat_monitor--1.0.sql @@ -34,7 +34,7 @@ CREATE FUNCTION pg_stat_monitor(IN showtext boolean, OUT query text, OUT application_name text, OUT relations text, - OUT cmd_type text, + OUT cmd_type int, OUT elevel int, OUT sqlcode TEXT, OUT message text, @@ -79,6 +79,21 @@ RETURNS SETOF record AS 'MODULE_PATHNAME', 'pg_stat_monitor' LANGUAGE C STRICT VOLATILE PARALLEL SAFE; +CREATE or REPLACE FUNCTION get_cmd_type (cmd_type INTEGER) RETURNS TEXT AS +$$ +SELECT + CASE + WHEN cmd_type = 0 THEN '' + WHEN cmd_type = 1 THEN 'SELECT' + WHEN cmd_type = 2 THEN 'UPDATE' + WHEN cmd_type = 3 THEN 'INSERT' + WHEN cmd_type = 4 THEN 'DELETE' + WHEN cmd_type = 5 THEN 'UTILITY' + WHEN cmd_type = 6 THEN 'NOTHING' + END +$$ +LANGUAGE SQL PARALLEL SAFE; + CREATE FUNCTION pg_stat_monitor_settings( OUT name text, OUT value INTEGER, @@ -113,11 +128,8 @@ CREATE VIEW pg_stat_monitor AS SELECT query, application_name, (string_to_array(relations, ','))::oid[]::regclass[] AS relations, - CASE - WHEN query like 'BEGIN' THEN '' - WHEN query like 'END' THEN '' - ELSE (string_to_array(cmd_type, ','))[1] - END AS cmd_type, + cmd_type, + get_cmd_type(cmd_type) AS cmd_type_text, elevel, sqlcode, message, @@ -152,7 +164,7 @@ CREATE VIEW pg_stat_monitor AS SELECT wal_records, wal_fpi, wal_bytes -FROM pg_stat_monitor(TRUE), pg_database WHERE dbid = oid +FROM pg_stat_monitor(TRUE) p, pg_database d WHERE dbid = oid ORDER BY bucket_start_time; CREATE FUNCTION decode_error_level(elevel int) diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 0e2e2ce..947013d 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -83,7 +83,7 @@ PG_FUNCTION_INFO_V1(get_histogram_timings); static uint pg_get_client_addr(void); static int pg_get_application_name(char* application_name); static PgBackendStatus *pg_get_backend_status(void); -static Datum textarray_get_datum(char arr[][1024], int len, int str_len); +static Datum textarray_get_datum(char **arr, int arr_len, int str_len); static Datum intarray_get_datum(int32 arr[], int len); #if PG_VERSION_NUM >= 130000 @@ -120,6 +120,7 @@ char *unpack_sql_state(int sql_state); static void pgss_store(uint64 queryId, const char *query, + CmdType cmd_type, uint64 elevel, char *sqlerrcode, const char *message, @@ -306,6 +307,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) if (jstate.clocations_count > 0) pgss_store(query->queryId, pstate->p_sourcetext, + query->commandType, 0, /* error elevel */ "", /* error sqlcode */ NULL, /* error message */ @@ -433,6 +435,7 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) if (PGSM_ENABLED == 1) pgss_store(queryId, queryDesc->sourceText, + queryDesc->operation, 0, /* error elevel */ "", /* error sqlcode */ NULL, /* error message */ @@ -455,7 +458,6 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) else standard_ExecutorEnd(queryDesc); memset(pgss->relations, 0x0, sizeof(pgss->relations)); - memset(pgss->cmdTag, 0x0, sizeof(pgss->cmdTag)); } static bool @@ -467,7 +469,6 @@ pgss_ExecutorCheckPerms(List *rt, bool abort) int j = 0; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - memset(pgss->cmdTag, 0x0, sizeof(pgss->cmdTag)); memset(pgss->relations, 0x0, sizeof(pgss->relations)); foreach(lr, rt) @@ -487,15 +488,6 @@ pgss_ExecutorCheckPerms(List *rt, bool abort) if (!found) pgss->relations[i++] = rte->relid; } - if (rte->requiredPerms & ACL_INSERT) snprintf(pgss->cmdTag[0],CMD_LEN,"%s", "INSERT"); - else if (rte->requiredPerms & ACL_UPDATE) snprintf(pgss->cmdTag[1],CMD_LEN,"%s", "UPDATE"); - else if (rte->requiredPerms & ACL_DELETE) snprintf(pgss->cmdTag[2],CMD_LEN,"%s", "DELETE"); - else if (rte->requiredPerms & ACL_SELECT) snprintf(pgss->cmdTag[3],CMD_LEN,"%s", "SELECT"); - else if (rte->requiredPerms & ACL_TRUNCATE) snprintf(pgss->cmdTag[4],CMD_LEN,"%s", "TRUNCATE"); - else if (rte->requiredPerms & ACL_REFERENCES) snprintf(pgss->cmdTag[5],CMD_LEN,"%s", "REFERENCES"); - else if (rte->requiredPerms & ACL_TRIGGER) snprintf(pgss->cmdTag[6],CMD_LEN,"%s", "TRIGGER"); - else if (rte->requiredPerms & ACL_EXECUTE) snprintf(pgss->cmdTag[7],CMD_LEN,"%s", "EXECUTE"); - else if (rte->requiredPerms & ACL_CREATE) snprintf(pgss->cmdTag[8],CMD_LEN,"%s", "CREATE"); } LWLockRelease(pgss->lock); @@ -620,6 +612,7 @@ static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, if (PGSM_ENABLED == 1) pgss_store(0, /* query id, passing 0 to signal that it's a utility stmt */ queryString, /* query text */ + 0, 0, /* error elevel */ "", /* error sqlcode */ NULL, /* error message */ @@ -766,6 +759,7 @@ pg_get_client_addr(void) */ static void pgss_store(uint64 queryId, const char *query, + CmdType cmd_type, uint64 elevel, char *sqlcode, const char *message, @@ -793,7 +787,6 @@ static void pgss_store(uint64 queryId, pgssSharedState *pgss = pgsm_get_ss(); HTAB *pgss_hash = pgsm_get_hash(); int message_len = message ? strlen(message) : 0; - int cmd_len[CMD_LST]; char application_name[APPLICATIONNAME_LEN]; int application_name_len; int sqlcode_len = strlen(sqlcode); @@ -846,9 +839,6 @@ static void pgss_store(uint64 queryId, if (queryId == UINT64CONST(0)) queryId = pgss_hash_string(query, query_len); - for (i = 0; i < CMD_LST; i++) - cmd_len[i] = strlen(pgss->cmdTag[i]); - bucket_id = get_next_wbucket(pgss); if (bucket_id != pgss->current_wbucket) @@ -862,7 +852,7 @@ static void pgss_store(uint64 queryId, /* Set up key for hashtable search */ key.bucket_id = bucket_id; - key.userid = GetUserId(); + key.userid = (elevel == 0) ? GetUserId() : 0; key.dbid = MyDatabaseId; key.queryid = queryId; key.ip = pg_get_client_addr(); @@ -967,18 +957,7 @@ static void pgss_store(uint64 queryId, if (!found) _snprintf(e->counters.info.relations, pgss->relations, REL_LST, REL_LST); - found = false; - /* This is bit ugly hack to check we already updated the counter or not */ - for (i = 0; i < CMD_LST; i++) - if (e->counters.info.cmd_type[i][0] != 0) - found = true; - - /* Don't update the counter, if already updated */ - if (!found) - { - for (i = 0; i < CMD_LST; i++) - _snprintf(e->counters.info.cmd_type[i], pgss->cmdTag[i], cmd_len[i], CMD_LEN); - } + e->counters.info.cmd_type = cmd_type; e->counters.error.elevel = elevel; _snprintf(e->counters.error.sqlcode, sqlcode, sqlcode_len, SQLCODE_LEN); @@ -1220,7 +1199,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, else values[i++] = IntArrayGetTextDatum(tmp.info.relations, len); - values[i++] = TextArrayGetTextDatum((char (*)[1024])tmp.info.cmd_type, CMD_LST, CMD_LEN); + values[i++] = Int64GetDatumFast(tmp.info.cmd_type); values[i++] = Int64GetDatumFast(tmp.error.elevel); if (strlen(tmp.error.sqlcode) <= 0) values[i++] = CStringGetTextDatum("0"); @@ -2247,30 +2226,33 @@ comp_location(const void *a, const void *b) return 0; } +#define MAX_STRING_LEN 1024 /* Convert array into Text dataum */ static Datum -textarray_get_datum(char arr[][1024], int len, int str_len) +textarray_get_datum(char **arr, int arr_len, int str_len) { int j; - char str[1024]; + char *text_str = palloc0(MAX_STRING_LEN); bool first = true; - memset(str, 0, sizeof(str)); + /* Sanity check */ + if (arr == NULL || str_len >= MAX_STRING_LEN) + return 0; /* Need to calculate the actual size, and avoid unnessary memory usage */ - for (j = 0; j < len; j++) + for (j = 0; j < arr_len; j++) { if (arr[j] == NULL || strlen(arr[j]) <= 0) continue; if (first) { - snprintf(str, str_len, "%s", arr[j]); + snprintf(text_str, MAX_STRING_LEN, "%s", arr[j]); first = false; continue; } - snprintf(str, str_len, "%s,%s", str, arr[j]); + snprintf(text_str, MAX_STRING_LEN, "%s,%s", text_str, arr[j]); } - return CStringGetTextDatum(str); + return CStringGetTextDatum(text_str); } @@ -2442,6 +2424,7 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param if (PGSM_ENABLED == 1) pgss_store(parse->queryId, /* query id */ query_string, /* query text */ + parse->commandType, 0, /* error elevel */ "", /* error sqlcode */ NULL, /* error message */ @@ -2573,6 +2556,7 @@ pgsm_emit_log_hook(ErrorData *edata) pgss_store(queryid, debug_query_string ? debug_query_string : "", + 0, edata->elevel, unpack_sql_state(edata->sqlerrcode), edata->message, @@ -2752,6 +2736,6 @@ get_histogram_timings(PG_FUNCTION_ARGS) int64 b_end = exp(bucket_size * index); sprintf(range[index-1], "(%ld - %ld)}", b_start, b_end); } - return TextArrayGetTextDatum(range, b_count, 1024); + return TextArrayGetTextDatum((char**)range, b_count, 1023); } diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index cfbf714..384a578 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -80,6 +80,8 @@ #define APPLICATIONNAME_LEN 100 #define PGSM_OVER_FLOW_MAX 10 +/* the assumption of query max nested level */ +#define DEFAULT_MAX_NESTED_LEVEL 10 #define MAX_QUERY_BUF (PGSM_QUERY_BUF_SIZE * 1024 * 1024) #define MAX_BUCKETS_MEM (PGSM_MAX * 1024 * 1024) @@ -160,8 +162,8 @@ typedef struct QueryInfo uint host; /* client IP */ int64 type; /* type of query, options are query, info, warning, error, fatal */ char application_name[APPLICATIONNAME_LEN]; - int32 relations[REL_LST]; - char cmd_type[CMD_LST][CMD_LEN]; /* query command type SELECT/UPDATE/DELETE/INSERT */ + int32 relations[REL_LST]; /* List of relation involved in the query */ + CmdType cmd_type; /* query command type SELECT/UPDATE/DELETE/INSERT */ } QueryInfo; typedef struct ErrorInfo @@ -260,7 +262,6 @@ typedef struct pgssSharedState uint64 bucket_entry[MAX_BUCKETS]; int64 query_buf_size_bucket; int32 relations[REL_LST]; - char cmdTag[CMD_LST][CMD_LEN]; char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ } pgssSharedState;