parent
95dc0b575f
commit
3999e28c00
14
README.md
14
README.md
|
@ -490,17 +490,3 @@ postgres=# select tables_names, query from pg_stat_monitor;
|
|||
```
|
||||
|
||||
|
||||
**`wait_event`**: Current state of the query
|
||||
|
||||
```
|
||||
postgres=# select wait_event_type, query from pg_stat_monitor;
|
||||
wait_event | query
|
||||
-----------------+-------------------------------------------------------------------------------
|
||||
| select client_ip, query from pg_stat_monitor
|
||||
ClientRead | select wait_event_type, query from pg_stat_monitor
|
||||
| select * from pg_stat_monitor_reset()
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
50
hash_query.c
50
hash_query.c
|
@ -17,9 +17,7 @@
|
|||
static pgssSharedState *pgss;
|
||||
static HTAB *pgss_hash;
|
||||
static HTAB *pgss_object_hash;
|
||||
static HTAB *pgss_waiteventshash = NULL;
|
||||
|
||||
static pgssWaitEventEntry **pgssWaitEventEntries = NULL;
|
||||
static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
|
||||
|
||||
static HTAB*
|
||||
|
@ -39,10 +37,10 @@ pgss_startup(void)
|
|||
int32 i;
|
||||
|
||||
/* reset in case this is a restart within the postmaster */
|
||||
|
||||
pgss = NULL;
|
||||
pgss_hash = NULL;
|
||||
pgss_object_hash = NULL;
|
||||
pgss_waiteventshash = NULL;
|
||||
|
||||
/*
|
||||
* Create or attach to the shared memory state, including hash table
|
||||
|
@ -68,36 +66,14 @@ pgss_startup(void)
|
|||
}
|
||||
|
||||
pgss_hash = hash_init("pg_stat_monitor: Queries hashtable", sizeof(pgssHashKey), sizeof(pgssEntry),PGSM_MAX);
|
||||
|
||||
pgss_waiteventshash = hash_init("pg_stat_monitor: Wait Event hashtable", sizeof(pgssWaitEventKey), sizeof(pgssWaitEventEntry), 100);
|
||||
|
||||
pgss_object_hash = hash_init("pg_stat_monitor: Object hashtable", sizeof(pgssObjectHashKey), sizeof(pgssObjectEntry), PGSM_OBJECT_CACHE);
|
||||
|
||||
Assert(IsHashInitialize());
|
||||
|
||||
pgssWaitEventEntries = malloc(sizeof (pgssWaitEventEntry) * MAX_BACKEND_PROCESES);
|
||||
for (i = 0; i < MAX_BACKEND_PROCESES; i++)
|
||||
{
|
||||
pgssWaitEventKey key;
|
||||
pgssWaitEventEntry *entry = NULL;
|
||||
bool found = false;
|
||||
|
||||
key.processid = i;
|
||||
entry = (pgssWaitEventEntry *) hash_search(pgss_waiteventshash, &key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
{
|
||||
SpinLockInit(&entry->mutex);
|
||||
pgssWaitEventEntries[i] = entry;
|
||||
}
|
||||
}
|
||||
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
/*
|
||||
* If we're in the postmaster (or a standalone backend...), set up a shmem
|
||||
* exit hook to dump the statistics to disk.
|
||||
*/
|
||||
if (!IsUnderPostmaster)
|
||||
on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
|
||||
}
|
||||
|
||||
|
@ -117,16 +93,6 @@ HTAB* pgsm_get_hash(void)
|
|||
return pgss_hash;
|
||||
}
|
||||
|
||||
HTAB* pgsm_get_wait_event_hash(void)
|
||||
{
|
||||
return pgss_waiteventshash;
|
||||
}
|
||||
|
||||
pgssWaitEventEntry** pgsm_get_wait_event_entry(void)
|
||||
{
|
||||
return pgssWaitEventEntries;
|
||||
}
|
||||
|
||||
/*
|
||||
* shmem_shutdown hook: Dump statistics into file.
|
||||
*
|
||||
|
@ -136,12 +102,13 @@ pgssWaitEventEntry** pgsm_get_wait_event_entry(void)
|
|||
void
|
||||
pgss_shmem_shutdown(int code, Datum arg)
|
||||
{
|
||||
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
|
||||
printf("--%s", __FUNCTION__);
|
||||
|
||||
/* Don't try to dump during a crash. */
|
||||
if (code)
|
||||
return;
|
||||
|
||||
pgss = NULL;
|
||||
/* Safety check ... shouldn't get here unless shmem is set up. */
|
||||
if (!IsHashInitialize())
|
||||
return;
|
||||
|
@ -220,7 +187,6 @@ hash_entry_reset()
|
|||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgssEntry *entry;
|
||||
pgssWaitEventEntry *weentry;
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
|
||||
|
@ -229,14 +195,7 @@ hash_entry_reset()
|
|||
{
|
||||
hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
|
||||
hash_seq_init(&hash_seq, pgss_waiteventshash);
|
||||
while ((weentry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
hash_search(pgss_waiteventshash, &weentry->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
pgss->current_wbucket = 0;
|
||||
free(pgssWaitEventEntries);
|
||||
LWLockRelease(pgss->lock);
|
||||
}
|
||||
|
||||
|
@ -314,7 +273,6 @@ IsHashInitialize(void)
|
|||
{
|
||||
return (pgss != NULL &&
|
||||
pgss_hash != NULL &&
|
||||
pgss_object_hash !=NULL &&
|
||||
pgss_waiteventshash != NULL);
|
||||
pgss_object_hash !=NULL);
|
||||
}
|
||||
|
||||
|
|
|
@ -64,16 +64,6 @@ RETURNS SETOF record
|
|||
AS 'MODULE_PATHNAME', 'pg_stat_monitor'
|
||||
LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
|
||||
|
||||
CREATE FUNCTION pg_stat_wait_events(
|
||||
OUT queryid text,
|
||||
OUT pid bigint,
|
||||
OUT wait_event text,
|
||||
OUT wait_event_type text
|
||||
)
|
||||
RETURNS SETOF record
|
||||
AS 'MODULE_PATHNAME', 'pg_stat_wait_events'
|
||||
LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
|
||||
|
||||
CREATE FUNCTION pg_stat_monitor_settings(
|
||||
OUT name text,
|
||||
OUT value INTEGER,
|
||||
|
@ -162,32 +152,6 @@ SELECT
|
|||
$$
|
||||
LANGUAGE SQL PARALLEL SAFE;
|
||||
|
||||
-- Register a view on the function for ease of use.
|
||||
CREATE VIEW pg_stat_wait_events AS SELECT
|
||||
m.queryid,
|
||||
query,
|
||||
wait_event,
|
||||
wait_event_type
|
||||
FROM pg_stat_monitor(true) m, pg_stat_wait_events() w WHERE w.queryid = m.queryid;
|
||||
|
||||
/*CREATE VIEW pg_stat_monitor_db AS
|
||||
SELECT
|
||||
*
|
||||
FROM pg_stat_monitor GROUP BY dbid;
|
||||
|
||||
CREATE VIEW pg_stat_monitor_user AS
|
||||
SELECT
|
||||
*
|
||||
FROM pg_stat_monitor GROUP BY userid;
|
||||
|
||||
CREATE VIEW pg_stat_monitor_ip AS
|
||||
SELECT
|
||||
*
|
||||
FROM pg_stat_monitor GROUP BY client_ip;
|
||||
|
||||
GRANT SELECT ON pg_stat_agg_user TO PUBLIC;
|
||||
GRANT SELECT ON pg_stat_agg_ip TO PUBLIC;
|
||||
GRANT SELECT ON pg_stat_agg_database TO PUBLIC;
|
||||
GRANT SELECT ON pg_stat_monitor_settings TO PUBLIC;
|
||||
*/
|
||||
-- Don't want this to be available to non-superusers.
|
||||
|
|
|
@ -35,8 +35,6 @@ static int exec_nested_level = 0;
|
|||
static bool system_init = false;
|
||||
static struct rusage rusage_start;
|
||||
static struct rusage rusage_end;
|
||||
static volatile sig_atomic_t sigterm = false;
|
||||
static void handle_sigterm(SIGNAL_ARGS);
|
||||
static unsigned char *pgss_qbuf[MAX_BUCKETS];
|
||||
|
||||
|
||||
|
@ -59,7 +57,6 @@ PG_FUNCTION_INFO_V1(pg_stat_monitor_reset);
|
|||
PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2);
|
||||
PG_FUNCTION_INFO_V1(pg_stat_monitor_1_3);
|
||||
PG_FUNCTION_INFO_V1(pg_stat_monitor);
|
||||
PG_FUNCTION_INFO_V1(pg_stat_wait_events);
|
||||
PG_FUNCTION_INFO_V1(pg_stat_monitor_settings);
|
||||
|
||||
static uint pg_get_client_addr(void);
|
||||
|
@ -130,10 +127,6 @@ static uint64 get_next_wbucket(pgssSharedState *pgss);
|
|||
static void store_query(uint64 queryid, const char *query, uint64 query_len);
|
||||
static uint64 locate_query(uint64 bucket_id, uint64 queryid, char * query);
|
||||
|
||||
/* Wait Event Local Functions */
|
||||
static void register_wait_event(void);
|
||||
void wait_event_main(Datum main_arg);
|
||||
static void update_wait_event(void);
|
||||
static uint64 get_query_id(pgssJumbleState *jstate, Query *query);
|
||||
|
||||
/*
|
||||
|
@ -167,9 +160,6 @@ _PG_init(void)
|
|||
RequestAddinShmemSpace(hash_memsize());
|
||||
RequestNamedLWLockTranche("pg_stat_monitor", 1);
|
||||
|
||||
/* Register Wait events */
|
||||
register_wait_event();
|
||||
|
||||
/*
|
||||
* Install hooks.
|
||||
*/
|
||||
|
@ -200,8 +190,7 @@ _PG_init(void)
|
|||
void
|
||||
_PG_fini(void)
|
||||
{
|
||||
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
|
||||
|
||||
system_init = false;
|
||||
shmem_startup_hook = prev_shmem_startup_hook;
|
||||
post_parse_analyze_hook = prev_post_parse_analyze_hook;
|
||||
ExecutorStart_hook = prev_ExecutorStart;
|
||||
|
@ -944,105 +933,6 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
|||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
pg_stat_wait_events(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
TupleDesc tupdesc;
|
||||
Tuplestorestate *tupstore;
|
||||
MemoryContext per_query_ctx;
|
||||
MemoryContext oldcontext;
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgssWaitEventEntry *entry;
|
||||
char *query_txt;
|
||||
char queryid_txt[64];
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
HTAB *pgss_waiteventshash = pgsm_get_wait_event_hash();
|
||||
|
||||
/* Safety check... */
|
||||
if (!IsSystemInitialized())
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("pg_stat_monitor: set-valued function called in context that cannot accept a set")));
|
||||
if (!(rsinfo->allowedModes & SFRM_Materialize))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("pg_stat_monitor: materialize mode required, but it is not " \
|
||||
"allowed in this context")));
|
||||
|
||||
query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN);
|
||||
|
||||
/* Switch into long-lived context to construct returned data structures */
|
||||
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
|
||||
oldcontext = MemoryContextSwitchTo(per_query_ctx);
|
||||
|
||||
/* Build a tuple descriptor for our result type */
|
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
||||
elog(ERROR, "pg_stat_monitor: return type must be a row type");
|
||||
|
||||
tupstore = tuplestore_begin_heap(true, false, work_mem);
|
||||
rsinfo->returnMode = SFRM_Materialize;
|
||||
rsinfo->setResult = tupstore;
|
||||
rsinfo->setDesc = tupdesc;
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
LWLockAcquire(pgss->lock, LW_SHARED);
|
||||
|
||||
hash_seq_init(&hash_seq, pgss_waiteventshash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
Datum values[4];
|
||||
bool nulls[4] = {true};
|
||||
int i = 0;
|
||||
int64 queryid = entry->key.queryid;
|
||||
|
||||
if (queryid == 0)
|
||||
continue;
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
|
||||
sprintf(queryid_txt, "%08lX", queryid);
|
||||
|
||||
values[i++] = ObjectIdGetDatum(cstring_to_text(queryid_txt));
|
||||
values[i++] = ObjectIdGetDatum(entry->pid);
|
||||
if (entry->wait_event_info != 0)
|
||||
{
|
||||
const char *event_type = pgstat_get_wait_event_type(entry->wait_event_info);
|
||||
const char *event = pgstat_get_wait_event(entry->wait_event_info);
|
||||
if (event_type)
|
||||
values[i++] = PointerGetDatum(cstring_to_text(event_type));
|
||||
else
|
||||
nulls[i++] = true;
|
||||
if (event)
|
||||
values[i++] = PointerGetDatum(cstring_to_text(event));
|
||||
else
|
||||
nulls[i++] = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
nulls[i++] = true;
|
||||
nulls[i++] = true;
|
||||
}
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
||||
}
|
||||
free(query_txt);
|
||||
|
||||
/* clean up and return the tuplestore */
|
||||
LWLockRelease(pgss->lock);
|
||||
|
||||
tuplestore_donestoring(tupstore);
|
||||
return (Datum) 0;
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
pg_stat_monitor(PG_FUNCTION_ARGS)
|
||||
{
|
||||
|
@ -2282,14 +2172,6 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param
|
|||
#endif
|
||||
{
|
||||
PlannedStmt *result;
|
||||
pgssWaitEventEntry **pgssWaitEventEntries = pgsm_get_wait_event_entry();
|
||||
|
||||
if (MyProc)
|
||||
{
|
||||
int i = MyProc - ProcGlobal->allProcs;
|
||||
if (pgssWaitEventEntries[i]->key.queryid != parse->queryId)
|
||||
pgssWaitEventEntries[i]->key.queryid = parse->queryId;
|
||||
}
|
||||
#if PG_VERSION_NUM >= 130000
|
||||
if (PGSM_TRACK_PLANNING && query_string
|
||||
&& parse->queryId != UINT64CONST(0))
|
||||
|
@ -2353,7 +2235,6 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param
|
|||
}
|
||||
else
|
||||
{
|
||||
|
||||
if (planner_hook_next)
|
||||
result = planner_hook_next(parse, query_string, cursorOptions, boundParams);
|
||||
result = standard_planner(parse, query_string, cursorOptions, boundParams);
|
||||
|
@ -2366,75 +2247,6 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param
|
|||
return result;
|
||||
}
|
||||
|
||||
static void
|
||||
update_wait_event(void)
|
||||
{
|
||||
PGPROC *proc = NULL;
|
||||
int i;
|
||||
pgssWaitEventEntry **pgssWaitEventEntries = pgsm_get_wait_event_entry();
|
||||
|
||||
LWLockAcquire(ProcArrayLock, LW_SHARED);
|
||||
for (i = 0; i < ProcGlobal->allProcCount; i++)
|
||||
{
|
||||
proc = &ProcGlobal->allProcs[i];
|
||||
if (proc->pid == 0)
|
||||
continue;
|
||||
|
||||
pgssWaitEventEntries[i]->wait_event_info = proc->wait_event_info;
|
||||
pgssWaitEventEntries[i]->pid = proc->pid;
|
||||
}
|
||||
LWLockRelease(ProcArrayLock);
|
||||
}
|
||||
|
||||
static void
|
||||
handle_sigterm(SIGNAL_ARGS)
|
||||
{
|
||||
sigterm = true;
|
||||
}
|
||||
|
||||
static void
|
||||
register_wait_event(void)
|
||||
{
|
||||
BackgroundWorker worker;
|
||||
|
||||
memset(&worker, 0, sizeof(worker));
|
||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||
worker.bgw_restart_time = 0;
|
||||
worker.bgw_notify_pid = 0;
|
||||
snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_stat_monitor");
|
||||
snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(wait_event_main));
|
||||
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_stat_monitor collector");
|
||||
worker.bgw_main_arg = (Datum) 0;
|
||||
RegisterBackgroundWorker(&worker);
|
||||
}
|
||||
|
||||
void
|
||||
wait_event_main(Datum main_arg)
|
||||
{
|
||||
int rc;
|
||||
|
||||
InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL, false);
|
||||
SetProcessingMode(NormalProcessing);
|
||||
pqsignal(SIGTERM, handle_sigterm);
|
||||
BackgroundWorkerUnblockSignals();
|
||||
while (1)
|
||||
{
|
||||
sleep(1);
|
||||
if (sigterm)
|
||||
break;
|
||||
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1, PG_WAIT_EXTENSION);
|
||||
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
proc_exit(1);
|
||||
|
||||
ResetLatch(&MyProc->procLatch);
|
||||
|
||||
update_wait_event();
|
||||
}
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
static uint64
|
||||
get_query_id(pgssJumbleState *jstate, Query *query)
|
||||
{
|
||||
|
@ -2522,7 +2334,10 @@ pgsm_emit_log_hook(ErrorData *edata)
|
|||
#if PG_VERSION_NUM >= 130000
|
||||
WalUsage walusage;
|
||||
#endif
|
||||
if (PGSM_ENABLED == 1 && IsSystemInitialized())
|
||||
|
||||
if (PGSM_ENABLED == 1 &&
|
||||
IsSystemInitialized() &&
|
||||
(edata->elevel == ERROR || edata->elevel == WARNING || edata->elevel == INFO || edata->elevel == DEBUG1) )
|
||||
{
|
||||
uint64 queryid = 0;
|
||||
|
||||
|
|
|
@ -111,23 +111,7 @@ typedef struct pgssObjectEntry
|
|||
slock_t mutex; /* protects the counters only */
|
||||
} pgssObjectEntry;
|
||||
|
||||
typedef struct pgssWaitEventKey
|
||||
{
|
||||
uint64 queryid;
|
||||
uint64 processid;
|
||||
} pgssWaitEventKey;
|
||||
|
||||
#define MAX_QUERY_LEN 1024
|
||||
typedef struct pgssWaitEventEntry
|
||||
{
|
||||
pgssWaitEventKey key; /* hash key of entry - MUST BE FIRST */
|
||||
uint64 queryid;
|
||||
uint64 pid;
|
||||
uint32 wait_event_info;
|
||||
char query[MAX_QUERY_LEN];
|
||||
slock_t mutex; /* protects the counters only */
|
||||
} pgssWaitEventEntry;
|
||||
|
||||
|
||||
/* shared nenory storage for the query */
|
||||
typedef struct pgssHashKey
|
||||
|
@ -303,9 +287,7 @@ void pgss_shmem_startup(void);
|
|||
void pgss_shmem_shutdown(int code, Datum arg);
|
||||
int pgsm_get_bucket_size(void);
|
||||
pgssSharedState* pgsm_get_ss(void);
|
||||
HTAB* pgsm_get_wait_event_hash(void);
|
||||
HTAB* pgsm_get_hash(void);
|
||||
pgssWaitEventEntry** pgsm_get_wait_event_entry(void);
|
||||
void hash_entry_reset(void);
|
||||
void hash_entry_dealloc(int bucket);
|
||||
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
||||
|
|
Loading…
Reference in New Issue