mirror of
https://github.com/percona/pg_stat_monitor.git
synced 2026-02-04 05:56:21 +00:00
@@ -32,12 +32,16 @@ static int plan_nested_level = 0;
|
||||
static int exec_nested_level = 0;
|
||||
#endif
|
||||
|
||||
static bool system_init = false;
|
||||
static struct rusage rusage_start;
|
||||
static struct rusage rusage_end;
|
||||
static volatile sig_atomic_t sigterm = false;
|
||||
static void handle_sigterm(SIGNAL_ARGS);
|
||||
static unsigned char *pgss_qbuf[MAX_BUCKETS];
|
||||
|
||||
|
||||
static bool IsSystemInitialized(void);
|
||||
|
||||
/* Saved hook values in case of unload */
|
||||
static planner_hook_type planner_hook_next = NULL;
|
||||
static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL;
|
||||
@@ -46,6 +50,8 @@ static ExecutorRun_hook_type prev_ExecutorRun = NULL;
|
||||
static ExecutorFinish_hook_type prev_ExecutorFinish = NULL;
|
||||
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
||||
static ProcessUtility_hook_type prev_ProcessUtility = NULL;
|
||||
static emit_log_hook_type prev_emit_log_hook = NULL;
|
||||
void pgsm_emit_log_hook(ErrorData *edata);
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
|
||||
PG_FUNCTION_INFO_V1(pg_stat_monitor_version);
|
||||
@@ -88,8 +94,13 @@ static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
|
||||
#endif
|
||||
|
||||
static uint64 pgss_hash_string(const char *str, int len);
|
||||
static void pgss_store(const char *query, uint64 queryId,
|
||||
int query_location, int query_len,
|
||||
static void pgss_store(uint64 queryId,
|
||||
const char *query,
|
||||
uint64 elevel,
|
||||
uint64 sqlerrcode,
|
||||
const char *message,
|
||||
int query_location,
|
||||
int query_len,
|
||||
pgssStoreKind kind,
|
||||
double total_time, uint64 rows,
|
||||
const BufferUsage *bufusage,
|
||||
@@ -178,6 +189,9 @@ _PG_init(void)
|
||||
ProcessUtility_hook = pgss_ProcessUtility;
|
||||
planner_hook_next = planner_hook;
|
||||
planner_hook = pgss_planner_hook;
|
||||
emit_log_hook = pgsm_emit_log_hook;
|
||||
|
||||
system_init = true;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -213,7 +227,6 @@ pgss_shmem_startup(void)
|
||||
pgss_startup();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Select the version of pg_stat_monitor.
|
||||
*/
|
||||
@@ -223,7 +236,7 @@ pg_stat_monitor_version(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_TEXT_P(cstring_to_text(BUILD_VERSION));
|
||||
}
|
||||
|
||||
#define PG_STAT_STATEMENTS_COLS 38 /* maximum of above */
|
||||
#define PG_STAT_STATEMENTS_COLS 40 /* maximum of above */
|
||||
|
||||
/*
|
||||
* Post-parse-analysis hook: mark query with a queryId
|
||||
@@ -238,7 +251,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
|
||||
prev_post_parse_analyze_hook(pstate, query);
|
||||
|
||||
/* Safety check... */
|
||||
if (!IsHashInitialize())
|
||||
if (!IsSystemInitialized())
|
||||
return;
|
||||
|
||||
/*
|
||||
@@ -299,8 +312,11 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
|
||||
|
||||
if (PGSM_ENABLED == 1)
|
||||
if (jstate.clocations_count > 0)
|
||||
pgss_store(pstate->p_sourcetext,
|
||||
query->queryId,
|
||||
pgss_store(query->queryId,
|
||||
pstate->p_sourcetext,
|
||||
0, /* error elevel */
|
||||
0, /* error sqlcode */
|
||||
NULL, /* error message */
|
||||
query->stmt_location,
|
||||
query->stmt_len,
|
||||
PGSS_INVALID,
|
||||
@@ -420,20 +436,23 @@ pgss_ExecutorEnd(QueryDesc *queryDesc)
|
||||
stime = TIMEVAL_DIFF(rusage_start.ru_stime, rusage_end.ru_stime);
|
||||
|
||||
if (PGSM_ENABLED == 1)
|
||||
pgss_store(queryDesc->sourceText,
|
||||
queryId,
|
||||
queryDesc->plannedstmt->stmt_location,
|
||||
queryDesc->plannedstmt->stmt_len,
|
||||
PGSS_EXEC,
|
||||
queryDesc->totaltime->total * 1000.0, /* convert to msec */
|
||||
queryDesc->estate->es_processed,
|
||||
&queryDesc->totaltime->bufusage,
|
||||
pgss_store(queryId,
|
||||
queryDesc->sourceText,
|
||||
0, /* error elevel */
|
||||
0, /* error sqlcode */
|
||||
NULL, /* error message */
|
||||
queryDesc->plannedstmt->stmt_location,
|
||||
queryDesc->plannedstmt->stmt_len,
|
||||
PGSS_EXEC,
|
||||
queryDesc->totaltime->total * 1000.0, /* convert to msec */
|
||||
queryDesc->estate->es_processed,
|
||||
&queryDesc->totaltime->bufusage,
|
||||
#if PG_VERSION_NUM >= 130000
|
||||
&queryDesc->totaltime->walusage,
|
||||
&queryDesc->totaltime->walusage,
|
||||
#endif
|
||||
NULL,
|
||||
utime,
|
||||
stime);
|
||||
NULL,
|
||||
utime,
|
||||
stime);
|
||||
}
|
||||
|
||||
if (prev_ExecutorEnd)
|
||||
@@ -553,20 +572,23 @@ static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
|
||||
memset(&bufusage, 0, sizeof(BufferUsage));
|
||||
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
|
||||
if (PGSM_ENABLED == 1)
|
||||
pgss_store(queryString,
|
||||
0, /* signal that it's a utility stmt */
|
||||
pstmt->stmt_location,
|
||||
pstmt->stmt_len,
|
||||
PGSS_EXEC,
|
||||
INSTR_TIME_GET_MILLISEC(duration),
|
||||
rows,
|
||||
&bufusage,
|
||||
pgss_store(0, /* query id, passing 0 to signal that it's a utility stmt */
|
||||
queryString, /* query text */
|
||||
0, /* error elevel */
|
||||
0, /* error sqlcode */
|
||||
NULL, /* error message */
|
||||
pstmt->stmt_location,
|
||||
pstmt->stmt_len,
|
||||
PGSS_EXEC,
|
||||
INSTR_TIME_GET_MILLISEC(duration),
|
||||
rows,
|
||||
&bufusage,
|
||||
#if PG_VERSION_NUM >= 130000
|
||||
&walusage,
|
||||
&walusage,
|
||||
#endif
|
||||
NULL,
|
||||
0,
|
||||
0);
|
||||
NULL,
|
||||
0,
|
||||
0);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -671,16 +693,23 @@ pg_get_client_addr(void)
|
||||
* we have no statistics as yet; we just want to record the normalized
|
||||
* query string. total_time, rows, bufusage are ignored in this case.
|
||||
*/
|
||||
static void pgss_store(const char *query, uint64 queryId,
|
||||
int query_location, int query_len,
|
||||
pgssStoreKind kind,
|
||||
double total_time, uint64 rows,
|
||||
const BufferUsage *bufusage,
|
||||
static void pgss_store(uint64 queryId,
|
||||
const char *query,
|
||||
uint64 elevel,
|
||||
uint64 sqlcode,
|
||||
const char *message,
|
||||
int query_location,
|
||||
int query_len,
|
||||
pgssStoreKind kind,
|
||||
double total_time,
|
||||
uint64 rows,
|
||||
const BufferUsage *bufusage,
|
||||
#if PG_VERSION_NUM >= 130000
|
||||
const WalUsage *walusage,
|
||||
const WalUsage *walusage,
|
||||
#endif
|
||||
pgssJumbleState *jstate,
|
||||
float utime, float stime)
|
||||
pgssJumbleState *jstate,
|
||||
float utime,
|
||||
float stime)
|
||||
{
|
||||
pgssHashKey key;
|
||||
int bucket_id;
|
||||
@@ -693,12 +722,13 @@ static void pgss_store(const char *query, uint64 queryId,
|
||||
int len;
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
HTAB *pgss_hash = pgsm_get_hash();
|
||||
int message_len = message ? strlen(message) : 0;
|
||||
|
||||
Assert(query != NULL);
|
||||
Assert(PGSM_ENABLED);
|
||||
|
||||
/* Safety check... */
|
||||
if (!IsHashInitialize() || !pgss_qbuf[pgss->current_wbucket])
|
||||
if (!IsSystemInitialized() || !pgss_qbuf[pgss->current_wbucket])
|
||||
return;
|
||||
|
||||
/*
|
||||
@@ -755,7 +785,10 @@ static void pgss_store(const char *query, uint64 queryId,
|
||||
|
||||
/* Set up key for hashtable search */
|
||||
key.bucket_id = bucket_id;
|
||||
key.userid = GetUserId();
|
||||
if (elevel == 0)
|
||||
key.userid = GetUserId();
|
||||
else
|
||||
key.userid = 1;
|
||||
key.dbid = MyDatabaseId;
|
||||
key.queryid = queryId;
|
||||
key.ip = pg_get_client_addr();
|
||||
@@ -858,6 +891,10 @@ static void pgss_store(const char *query, uint64 queryId,
|
||||
e->counters.resp_calls[MAX_RESPONSE_BUCKET - 1]++;
|
||||
}
|
||||
|
||||
e->counters.error.elevel = elevel;
|
||||
e->counters.error.sqlcode = sqlcode;
|
||||
for(i = 0; i < message_len; i++)
|
||||
e->counters.error.message[i] = message[i];
|
||||
e->counters.calls[kind].rows += rows;
|
||||
e->counters.blocks.shared_blks_hit += bufusage->shared_blks_hit;
|
||||
e->counters.blocks.shared_blks_read += bufusage->shared_blks_read;
|
||||
@@ -897,7 +934,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
||||
{
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
/* Safety check... */
|
||||
if (!IsHashInitialize())
|
||||
if (!IsSystemInitialized())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||
@@ -907,7 +944,6 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
#define PG_STAT_STATEMENTS_COLS 38 /* maximum of above */
|
||||
|
||||
Datum
|
||||
pg_stat_wait_events(PG_FUNCTION_ARGS)
|
||||
@@ -925,7 +961,7 @@ pg_stat_wait_events(PG_FUNCTION_ARGS)
|
||||
HTAB *pgss_waiteventshash = pgsm_get_wait_event_hash();
|
||||
|
||||
/* Safety check... */
|
||||
if (!IsHashInitialize())
|
||||
if (!IsSystemInitialized())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||
@@ -1039,7 +1075,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
|
||||
|
||||
/* Safety check... */
|
||||
if (!IsHashInitialize())
|
||||
if (!IsSystemInitialized())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||
@@ -1087,14 +1123,18 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
|
||||
if(locate_query(entry->key.bucket_id, queryid, query_txt) == 0)
|
||||
sprintf(query_txt, "%s", "<invalid query text, probably no space left in shared buffer>");
|
||||
query_txt = NULL;
|
||||
|
||||
sprintf(queryid_txt, "%08lX", queryid);
|
||||
if (query_txt)
|
||||
sprintf(queryid_txt, "%08lX", queryid);
|
||||
else
|
||||
sprintf(queryid_txt, "%08lX", (long unsigned int)0);
|
||||
|
||||
values[i++] = ObjectIdGetDatum(entry->key.bucket_id);
|
||||
values[i++] = ObjectIdGetDatum(entry->key.userid);
|
||||
values[i++] = ObjectIdGetDatum(entry->key.dbid);
|
||||
values[i++] = Int64GetDatumFast(entry->key.ip);
|
||||
|
||||
/* copy counters to a local variable to keep locking time short */
|
||||
{
|
||||
volatile pgssEntry *e = (volatile pgssEntry *) entry;
|
||||
@@ -1108,10 +1148,17 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
if (showtext)
|
||||
{
|
||||
char *enc;
|
||||
enc = pg_any_to_server(query_txt, strlen(query_txt), entry->encoding);
|
||||
values[i++] = CStringGetTextDatum(enc);
|
||||
if (enc != query_txt)
|
||||
pfree(enc);
|
||||
if (query_txt)
|
||||
{
|
||||
enc = pg_any_to_server(query_txt, strlen(query_txt), entry->encoding);
|
||||
values[i++] = CStringGetTextDatum(enc);
|
||||
if (enc != query_txt)
|
||||
pfree(enc);
|
||||
}
|
||||
else
|
||||
{
|
||||
nulls[i++] = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1130,6 +1177,12 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
else
|
||||
nulls[i++] = true;
|
||||
}
|
||||
values[i++] = Int64GetDatumFast(tmp.error.elevel);
|
||||
values[i++] = Int64GetDatumFast(tmp.error.sqlcode);
|
||||
if (strlen(tmp.error.message) == 0)
|
||||
nulls[i++] = true;
|
||||
else
|
||||
values[i++] = CStringGetTextDatum(tmp.error.message);
|
||||
|
||||
values[i++] = TimestampGetDatum(pgss->bucket_start_time[entry->key.bucket_id]);
|
||||
|
||||
@@ -2282,18 +2335,21 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param
|
||||
memset(&walusage, 0, sizeof(WalUsage));
|
||||
WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);
|
||||
if (PGSM_ENABLED == 1)
|
||||
pgss_store(query_string,
|
||||
parse->queryId,
|
||||
parse->stmt_location,
|
||||
parse->stmt_len,
|
||||
PGSS_PLAN,
|
||||
INSTR_TIME_GET_MILLISEC(duration),
|
||||
0,
|
||||
&bufusage,
|
||||
&walusage,
|
||||
NULL,
|
||||
0,
|
||||
0);
|
||||
pgss_store(query_string, /* query text */
|
||||
parse->queryId, /* query id */
|
||||
0, /* error elevel */
|
||||
0, /* error sqlcode */
|
||||
NULL, /* error message */
|
||||
parse->stmt_location,
|
||||
parse->stmt_len,
|
||||
PGSS_PLAN,
|
||||
INSTR_TIME_GET_MILLISEC(duration),
|
||||
0,
|
||||
&bufusage,
|
||||
&walusage,
|
||||
NULL,
|
||||
0,
|
||||
0);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -2458,3 +2514,46 @@ set_qbuf(int i, unsigned char *buf)
|
||||
{
|
||||
pgss_qbuf[i] = buf;
|
||||
}
|
||||
|
||||
void
|
||||
pgsm_emit_log_hook(ErrorData *edata)
|
||||
{
|
||||
BufferUsage bufusage;
|
||||
#if PG_VERSION_NUM >= 130000
|
||||
WalUsage walusage;
|
||||
#endif
|
||||
if (PGSM_ENABLED == 1 && IsSystemInitialized())
|
||||
{
|
||||
uint64 queryid = 0;
|
||||
|
||||
if (debug_query_string)
|
||||
queryid = DatumGetUInt64(hash_any_extended((const unsigned char *)debug_query_string, strlen(debug_query_string), 0));
|
||||
|
||||
pgss_store(queryid,
|
||||
debug_query_string ? debug_query_string : "",
|
||||
edata->elevel,
|
||||
edata->sqlerrcode,
|
||||
edata->message,
|
||||
0,
|
||||
debug_query_string ? strlen(debug_query_string) : 0,
|
||||
PGSS_EXEC,
|
||||
0,
|
||||
0,
|
||||
&bufusage,
|
||||
#if PG_VERSION_NUM >= 130000
|
||||
&walusage,
|
||||
#endif
|
||||
NULL,
|
||||
0,
|
||||
0);
|
||||
}
|
||||
if (prev_emit_log_hook)
|
||||
prev_emit_log_hook(edata);
|
||||
}
|
||||
|
||||
bool
|
||||
IsSystemInitialized(void)
|
||||
{
|
||||
return (system_init && IsHashInitialize());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user