From 971e57fd9352e84825c8815887e520ac8a3bef65 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Tue, 24 Nov 2020 18:02:25 +0000 Subject: [PATCH] Issue - (#62): Logging CMD Type like SELECT, UPDATE, INSERT and DELETE. PG-150 --- pg_stat_monitor.c | 92 ++++++++++++++++++++++++++++++++++------------- pg_stat_monitor.h | 8 +++-- 2 files changed, 73 insertions(+), 27 deletions(-) diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 240cba1..34c3c8c 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -62,7 +62,8 @@ PG_FUNCTION_INFO_V1(pg_stat_monitor); PG_FUNCTION_INFO_V1(pg_stat_monitor_settings); static uint pg_get_client_addr(void); -static Datum array_get_datum(int32 arr[], int len); +static Datum textarray_get_datum(char arr[][CMD_LEN], int len); +static Datum intarray_get_datum(int32 arr[], int len); #if PG_VERSION_NUM >= 130000 static PlannedStmt * pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); @@ -184,7 +185,7 @@ _PG_init(void) planner_hook = pgss_planner_hook; emit_log_hook = pgsm_emit_log_hook; prev_ExecutorCheckPerms_hook = ExecutorCheckPerms_hook; - ExecutorCheckPerms_hook = pgss_ExecutorCheckPerms; + ExecutorCheckPerms_hook = pgss_ExecutorCheckPerms; system_init = true; } @@ -331,6 +332,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query) static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags) { + getrusage(RUSAGE_SELF, &rusage_start); if (prev_ExecutorStart) @@ -459,30 +461,26 @@ pgss_ExecutorEnd(QueryDesc *queryDesc) static bool pgss_ExecutorCheckPerms(List *rt, bool abort) { - ListCell *lr; + ListCell *lr; pgssSharedState *pgss = pgsm_get_ss(); - int i; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - for (i = 0; i < CMD_LST; i++) - pgss->cmdTag[i] = 0; + memset(pgss->cmdTag, 0x0, sizeof(pgss->cmdTag)); foreach(lr, rt) { RangeTblEntry *rte = lfirst(lr); if (rte->rtekind != RTE_RELATION) continue; - - if (rte->requiredPerms & ACL_INSERT) pgss->cmdTag[0] = true; - else if (rte->requiredPerms & ACL_UPDATE) pgss->cmdTag[1] = true; - else if (rte->requiredPerms & ACL_DELETE) pgss->cmdTag[2] = true; - else if (rte->requiredPerms & ACL_SELECT) pgss->cmdTag[3] = true; - else if (rte->requiredPerms & ACL_TRUNCATE) pgss->cmdTag[4] = true; - else if (rte->requiredPerms & ACL_REFERENCES) pgss->cmdTag[5] = true; - else if (rte->requiredPerms & ACL_TRIGGER) pgss->cmdTag[6] = true; - else if (rte->requiredPerms & ACL_EXECUTE) pgss->cmdTag[7] = true; - else if (rte->requiredPerms & ACL_CREATE) pgss->cmdTag[8] = true; - else pgss->cmdTag[9] = true; + if (rte->requiredPerms & ACL_INSERT) snprintf(pgss->cmdTag[0],CMD_LEN,"%s", "INSERT"); + if (rte->requiredPerms & ACL_UPDATE) snprintf(pgss->cmdTag[1],CMD_LEN,"%s", "UPDATE"); + if (rte->requiredPerms & ACL_DELETE) snprintf(pgss->cmdTag[2],CMD_LEN,"%s", "DELETE"); + if (rte->requiredPerms & ACL_SELECT) snprintf(pgss->cmdTag[3],CMD_LEN,"%s", "SELECT"); + if (rte->requiredPerms & ACL_TRUNCATE) snprintf(pgss->cmdTag[4],CMD_LEN,"%s", "TRUNCATE"); + if (rte->requiredPerms & ACL_REFERENCES) snprintf(pgss->cmdTag[5],CMD_LEN,"%s", "REFERENCES"); + if (rte->requiredPerms & ACL_TRIGGER) snprintf(pgss->cmdTag[6],CMD_LEN,"%s", "TRIGGER"); + if (rte->requiredPerms & ACL_EXECUTE) snprintf(pgss->cmdTag[7],CMD_LEN,"%s", "EXECUTE"); + if (rte->requiredPerms & ACL_CREATE) snprintf(pgss->cmdTag[8],CMD_LEN,"%s", "CREATE"); } LWLockRelease(pgss->lock); @@ -748,12 +746,14 @@ static void pgss_store(uint64 queryId, char *norm_query = NULL; int encoding = GetDatabaseEncoding(); bool reset = false; - int i; + bool found = false; + int i,j; char tables_name[MAX_REL_LEN] = {0}; int len; pgssSharedState *pgss = pgsm_get_ss(); HTAB *pgss_hash = pgsm_get_hash(); int message_len = message ? strlen(message) : 0; + int cmd_len[CMD_LST]; Assert(query != NULL); Assert(PGSM_ENABLED); @@ -803,6 +803,9 @@ static void pgss_store(uint64 queryId, hash_dealloc_object_entry(queryId, tables_name); len = strlen(tables_name); + for (i = 0; i < CMD_LST; i++) + cmd_len[i] = strlen(pgss->cmdTag[i]); + bucket_id = get_next_wbucket(pgss); if (bucket_id != pgss->current_wbucket) @@ -921,9 +924,24 @@ static void pgss_store(uint64 queryId, if (total_time > PGSM_RESPOSE_TIME_LOWER_BOUND + (PGSM_RESPOSE_TIME_STEP * MAX_RESPONSE_BUCKET)) e->counters.resp_calls[MAX_RESPONSE_BUCKET - 1]++; } - for (i = 0; i < CMD_LST; i++) - e->counters.info.cmd_type[i] = pgss->cmdTag[i]; + /* This is bit ugly hack to check we already updated the counter or not */ + for (i = 0; i < CMD_LST; i++) + if (e->counters.info.cmd_type[i][0] != 0) + found = true; + + /* Don't update the counter, if already updated */ + if (!found) + { + for (i = 0; i < CMD_LST; i++) + { + for (j = 0; j < CMD_LEN; j++) + { + e->counters.info.cmd_type[i][j] = (cmd_len[i] <= CMD_LEN ? pgss->cmdTag[i][j] : 0); + pgss->cmdTag[i][j] = 0; + } + } + } e->counters.error.elevel = elevel; e->counters.error.sqlcode = sqlcode; for(i = 0; i < message_len; i++) @@ -1112,7 +1130,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, else nulls[i++] = true; } - values[i++] = ArrayGetTextDatum(tmp.info.cmd_type, CMD_LST); + values[i++] = TextArrayGetTextDatum(tmp.info.cmd_type, CMD_LST); values[i++] = Int64GetDatumFast(tmp.error.elevel); values[i++] = Int64GetDatumFast(tmp.error.sqlcode); if (strlen(tmp.error.message) == 0) @@ -1148,7 +1166,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, values[i++] = Int64GetDatumFast(tmp.blocks.temp_blks_written); values[i++] = Float8GetDatumFast(tmp.blocks.blk_read_time); values[i++] = Float8GetDatumFast(tmp.blocks.blk_write_time); - values[i++] = ArrayGetTextDatum(tmp.resp_calls, 10); + values[i++] = IntArrayGetTextDatum(tmp.resp_calls, 10); values[i++] = Float8GetDatumFast(tmp.sysinfo.utime); values[i++] = Float8GetDatumFast(tmp.sysinfo.stime); if (strlen(tmp.info.tables_name) == 0) @@ -2099,17 +2117,43 @@ comp_location(const void *a, const void *b) else return 0; } +/* Convert array into Text dataum */ +static Datum +textarray_get_datum(char arr[][CMD_LEN], int len) +{ + int j; + char str[1024]; + bool first = true; + + memset(str, 0, sizeof(str)); + + /* Need to calculate the actual size, and avoid unnessary memory usage */ + for (j = 0; j < len; j++) + { + if (strlen(arr[j]) <= 0) + continue; + if (first) + { + snprintf(str, CMD_LEN, "%s", arr[j]); + first = false; + continue; + } + snprintf(str, CMD_LEN, "%s,%s", str, arr[j]); + } + return CStringGetTextDatum(str); + +} /* Convert array into Text dataum */ static Datum -array_get_datum(int32 arr[], int len) +intarray_get_datum(int32 arr[], int len) { int j; char str[1024]; char tmp[10]; bool first = true; - memset(str, 0, 1023); + memset(str, 0, sizeof(str)); /* Need to calculate the actual size, and avoid unnessary memory usage */ for (j = 0; j < len; j++) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 4720a88..7735af7 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -42,7 +42,8 @@ #define TIMEVAL_DIFF(start, end) (((double) end.tv_sec + (double) end.tv_usec / 1000000.0) \ - ((double) start.tv_sec + (double) start.tv_usec / 1000000.0)) * 1000 -#define ArrayGetTextDatum(x,y) array_get_datum(x,y) +#define TextArrayGetTextDatum(x,y) textarray_get_datum(x,y) +#define IntArrayGetTextDatum(x,y) intarray_get_datum(x,y) /* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? */ #define USAGE_EXEC(duration) (1.0) @@ -62,6 +63,7 @@ #define TEXT_LEN 255 #define ERROR_MESSAGE_LEN 100 #define CMD_LST 10 +#define CMD_LEN 20 typedef struct GucVariables { @@ -131,7 +133,7 @@ typedef struct QueryInfo Oid dbid; /* database OID */ uint host; /* client IP */ int64 type; /* type of query, options are query, info, warning, error, fatal */ - int32 cmd_type[CMD_LST]; /* query command type SELECT/UPDATE/DELETE/INSERT */ + char cmd_type[CMD_LST][CMD_LEN]; /* query command type SELECT/UPDATE/DELETE/INSERT */ char tables_name[MAX_REL_LEN]; /* table names involved in the query */ } QueryInfo; @@ -224,7 +226,7 @@ typedef struct pgssSharedState uint64 bucket_overflow[MAX_BUCKETS]; uint64 bucket_entry[MAX_BUCKETS]; int query_buf_size_bucket; - int32 cmdTag[CMD_LST]; + char cmdTag[CMD_LST][20]; Timestamp bucket_start_time[MAX_BUCKETS]; /* start time of the bucket */ } pgssSharedState;