Issue - (#34): PostgreSQL Version 13 support added.

pull/36/head
Ibrar Ahmed 2020-06-23 17:24:21 +00:00
parent 5536041539
commit 355436c4dc
5 changed files with 326 additions and 160 deletions

View File

@ -99,13 +99,13 @@ SELECT query, calls, rows FROM pg_stat_monitor ORDER BY query COLLATE "C";
query | calls | rows query | calls | rows
---------------------------------------------------+-------+------ ---------------------------------------------------+-------+------
PREPARE pgss_test (int) AS SELECT $1, $2 LIMIT $3 | 1 | 1 PREPARE pgss_test (int) AS SELECT $1, $2 LIMIT $3 | 1 | 1
SELECT $1 | 2 | 2
SELECT $1 +| 4 | 4 SELECT $1 +| 4 | 4
+| | +| |
AS "text" | | AS "text" | |
SELECT $1 + $2 | 2 | 2 SELECT $1 + $2 | 2 | 2
SELECT $1 + $2 + $3 AS "add" | 3 | 3 SELECT $1 + $2 + $3 AS "add" | 3 | 3
SELECT $1 AS "float" | 1 | 1 SELECT $1 AS "float" | 1 | 1
SELECT $1 AS "int" | 2 | 2
SELECT $1 AS i UNION SELECT $2 ORDER BY i | 1 | 2 SELECT $1 AS i UNION SELECT $2 ORDER BY i | 1 | 2
SELECT $1 || $2 | 1 | 1 SELECT $1 || $2 | 1 | 1
SELECT pg_stat_monitor_reset() | 1 | 1 SELECT pg_stat_monitor_reset() | 1 | 1
@ -233,8 +233,10 @@ SELECT 1 + 1 AS "two";
SELECT query, calls, rows FROM pg_stat_monitor ORDER BY query COLLATE "C"; SELECT query, calls, rows FROM pg_stat_monitor ORDER BY query COLLATE "C";
query | calls | rows query | calls | rows
-------+-------+------ ----------------+-------+------
(0 rows) SELECT $1 | 0 | 0
SELECT $1 + $2 | 0 | 0
(2 rows)
-- --
-- pg_stat_monitor.track = top -- pg_stat_monitor.track = top
@ -289,14 +291,16 @@ SELECT PLUS_ONE(10);
SELECT query, calls, rows FROM pg_stat_monitor ORDER BY query COLLATE "C"; SELECT query, calls, rows FROM pg_stat_monitor ORDER BY query COLLATE "C";
query | calls | rows query | calls | rows
--------------------------------+-------+------ -----------------------------------+-------+------
SELECT $1 +| 1 | 1 SELECT $1 +| 1 | 1
+| | +| |
AS "text" | | AS "text" | |
SELECT (i + $2 + $3)::INTEGER | 0 | 0
SELECT (i + $2)::INTEGER LIMIT $3 | 0 | 0
SELECT PLUS_ONE($1) | 2 | 2 SELECT PLUS_ONE($1) | 2 | 2
SELECT PLUS_TWO($1) | 2 | 2 SELECT PLUS_TWO($1) | 2 | 2
SELECT pg_stat_monitor_reset() | 1 | 1 SELECT pg_stat_monitor_reset() | 1 | 1
(4 rows) (6 rows)
-- --
-- pg_stat_monitor.track = all -- pg_stat_monitor.track = all
@ -393,7 +397,7 @@ SELECT query, calls, rows FROM pg_stat_monitor ORDER BY query COLLATE "C";
DROP FUNCTION PLUS_TWO(INTEGER) | 1 | 0 DROP FUNCTION PLUS_TWO(INTEGER) | 1 | 0
DROP TABLE IF EXISTS test | 3 | 0 DROP TABLE IF EXISTS test | 3 | 0
DROP TABLE test | 1 | 0 DROP TABLE test | 1 | 0
SELECT $1 AS "int" | 1 | 1 SELECT $1 | 1 | 1
SELECT pg_stat_monitor_reset() | 1 | 1 SELECT pg_stat_monitor_reset() | 1 | 1
(8 rows) (8 rows)

25
guc.c
View File

@ -53,6 +53,7 @@ init_guc(void)
.guc_max = 0, .guc_max = 0,
.guc_restart = false .guc_restart = false
}; };
conf[i++] = (GucVariable) { conf[i++] = (GucVariable) {
.guc_name = "pg_stat_monitor.pgsm_normalized_query", .guc_name = "pg_stat_monitor.pgsm_normalized_query",
.guc_desc = "Selects whether save query in normalized format.", .guc_desc = "Selects whether save query in normalized format.",
@ -112,6 +113,16 @@ init_guc(void)
.guc_max = INT_MAX, .guc_max = INT_MAX,
.guc_restart = true .guc_restart = true
}; };
#if PG_VERSION_NUM >= 130000
conf[i++] = (GucVariable) {
.guc_name = "pg_stat_monitor.pgsm_track_planning",
.guc_desc = "Selects whether planning statistics are tracked.",
.guc_default = 1,
.guc_min = 0,
.guc_max = 0,
.guc_restart = false
};
#endif
DefineCustomIntVariable("pg_stat_monitor.max", DefineCustomIntVariable("pg_stat_monitor.max",
"Sets the maximum number of statements tracked by pg_stat_monitor.", "Sets the maximum number of statements tracked by pg_stat_monitor.",
@ -173,7 +184,7 @@ init_guc(void)
NULL, NULL,
NULL); NULL);
DefineCustomIntVariable("pg_stat_monitor.PGSM_MAX_buckets ", DefineCustomIntVariable("pg_stat_monitor.pgsm_max_buckets ",
"Sets the maximum number of buckets.", "Sets the maximum number of buckets.",
NULL, NULL,
&PGSM_MAX_BUCKETS, &PGSM_MAX_BUCKETS,
@ -239,7 +250,7 @@ init_guc(void)
NULL, NULL,
NULL); NULL);
DefineCustomIntVariable("pg_stat_monitor.query_shared_buffer", DefineCustomIntVariable("pg_stat_monitor.pgsm_query_shared_buffer.",
"Sets the shared_buffer size", "Sets the shared_buffer size",
NULL, NULL,
&PGSM_QUERY_BUF_SIZE, &PGSM_QUERY_BUF_SIZE,
@ -251,6 +262,16 @@ init_guc(void)
NULL, NULL,
NULL, NULL,
NULL); NULL);
DefineCustomBoolVariable("pg_stat_monitor.pgsm_track_planning",
"Selects whether track planning statistics.",
NULL,
(bool*)&PGSM_TRACK_PLANNING,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
} }

View File

@ -17,6 +17,16 @@ CREATE FUNCTION pg_stat_monitor(IN showtext boolean,
OUT queryid text, OUT queryid text,
OUT query text, OUT query text,
OUT bucket_start_time timestamptz, OUT bucket_start_time timestamptz,
OUT plan_calls int8,
OUT plan_total_time float8,
OUT plan_min_time float8,
OUT plan_max_time float8,
OUT plan_mean_time float8,
OUT plan_stddev_time float8,
OUT plan_rows int8,
OUT calls int8, OUT calls int8,
OUT total_time float8, OUT total_time float8,
OUT min_time float8, OUT min_time float8,
@ -24,6 +34,7 @@ CREATE FUNCTION pg_stat_monitor(IN showtext boolean,
OUT mean_time float8, OUT mean_time float8,
OUT stddev_time float8, OUT stddev_time float8,
OUT rows int8, OUT rows int8,
OUT shared_blks_hit int8, OUT shared_blks_hit int8,
OUT shared_blks_read int8, OUT shared_blks_read int8,
OUT shared_blks_dirtied int8, OUT shared_blks_dirtied int8,
@ -96,12 +107,19 @@ CREATE VIEW pg_stat_monitor AS SELECT
dbid, dbid,
m.queryid, m.queryid,
query, query,
plan_calls,
round( CAST(plan_total_time as numeric), 2) as plan_total_time,
round( CAST(plan_min_time as numeric), 2) as plan_min_timei,
round( CAST(plan_max_time as numeric), 2) as plan_max_time,
round( CAST(plan_mean_time as numeric), 2) as plan_mean_time,
round( CAST(plan_stddev_time as numeric), 2) as plan_stddev_time,
plan_rows,
calls, calls,
total_time, round( CAST(total_time as numeric), 2)as total_time,
min_time, round( CAST(min_time as numeric), 2)as min_time,
max_time, round( CAST(max_time as numeric), 2)as max_time,
mean_time, round( CAST(mean_time as numeric), 2)as mean_time,
stddev_time, round( CAST(stddev_time as numeric), 2)as stddev_time,
rows, rows,
shared_blks_hit, shared_blks_hit,
shared_blks_read, shared_blks_read,

View File

@ -25,6 +25,10 @@ void _PG_fini(void);
/* Current nesting depth of ExecutorRun+ProcessUtility calls */ /* Current nesting depth of ExecutorRun+ProcessUtility calls */
static int nested_level = 0; static int nested_level = 0;
#if PG_VERSION_NUM >= 130000
static int plan_nested_level = 0;
static int exec_nested_level = 0;
#endif
static struct rusage rusage_start; static struct rusage rusage_start;
static struct rusage rusage_end; static struct rusage rusage_end;
static volatile sig_atomic_t sigterm = false; static volatile sig_atomic_t sigterm = false;
@ -77,10 +81,10 @@ static Datum array_get_datum(int arr[]);
static void update_agg_counters(uint64 bucket_id, uint64 queryid, uint64 id, AGG_KEY type); static void update_agg_counters(uint64 bucket_id, uint64 queryid, uint64 id, AGG_KEY type);
static pgssAggEntry *agg_entry_alloc(pgssAggHashKey *key); static pgssAggEntry *agg_entry_alloc(pgssAggHashKey *key);
void add_object_entry(uint64 queryid, char *objects); void add_object_entry(uint64 queryid, char *objects);
#if PG_VERSION_NUM >= 130000 #if PG_VERSION_NUM >= 130000
static PlannedStmt * pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); static PlannedStmt * pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams);
#else #else
static void BufferUsageAccumDiff(BufferUsage* bufusage, BufferUsage* pgBufferUsage, BufferUsage* bufusage_start);
static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param); static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param);
#endif #endif
@ -91,21 +95,34 @@ static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once);
static void pgss_ExecutorFinish(QueryDesc *queryDesc); static void pgss_ExecutorFinish(QueryDesc *queryDesc);
static void pgss_ExecutorEnd(QueryDesc *queryDesc); static void pgss_ExecutorEnd(QueryDesc *queryDesc);
#if PG_VERSION_NUM >= 130000
static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context,
ParamListInfo params, QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc
);
#else
static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context, ParamListInfo params, ProcessUtilityContext context, ParamListInfo params,
QueryEnvironment *queryEnv, QueryEnvironment *queryEnv,
DestReceiver *dest, DestReceiver *dest,
#if PG_VERSION_NUM >= 130000
QueryCompletion *qc);
#else
char *completionTag); char *completionTag);
#endif #endif
static uint64 pgss_hash_string(const char *str, int len); static uint64 pgss_hash_string(const char *str, int len);
static void pgss_store(const char *query, uint64 queryId, static void pgss_store(const char *query, uint64 queryId,
int query_location, int query_len, int query_location, int query_len,
bool kind,
double total_time, uint64 rows, double total_time, uint64 rows,
const BufferUsage *bufusage, float utime, float stime, const BufferUsage *bufusage,
pgssJumbleState *jstate); #if PG_VERSION_NUM >= 130000
const WalUsage *walusage,
#endif
pgssJumbleState *jstate,
float utime, float stime);
static void pg_stat_monitor_internal(FunctionCallInfo fcinfo, static void pg_stat_monitor_internal(FunctionCallInfo fcinfo,
bool showtext); bool showtext);
static Size pgss_memsize(void); static Size pgss_memsize(void);
@ -232,10 +249,6 @@ pgss_shmem_startup(void)
bool found = false; bool found = false;
int32 i; int32 i;
elog(WARNING, "pg_stat_monitor: %s()", __FUNCTION__);
Assert(IsHashInitialize());
if (prev_shmem_startup_hook) if (prev_shmem_startup_hook)
prev_shmem_startup_hook(); prev_shmem_startup_hook();
@ -290,7 +303,7 @@ pgss_shmem_startup(void)
sizeof(pgssAggEntry), sizeof(pgssAggEntry),
PGSM_MAX * 3); PGSM_MAX * 3);
Assert(!IsHashInitialize()); Assert(IsHashInitialize());
pgssWaitEventEntries = malloc(sizeof (pgssWaitEventEntry) * MAX_BACKEND_PROCESES); pgssWaitEventEntries = malloc(sizeof (pgssWaitEventEntry) * MAX_BACKEND_PROCESES);
for (i = 0; i < MAX_BACKEND_PROCESES; i++) for (i = 0; i < MAX_BACKEND_PROCESES; i++)
@ -372,7 +385,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
Assert(query->queryId == UINT64CONST(0)); Assert(query->queryId == UINT64CONST(0));
/* Safety check... */ /* Safety check... */
if (IsHashInitialize()) if (!IsHashInitialize())
return; return;
/* /*
@ -438,12 +451,16 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
query->queryId, query->queryId,
query->stmt_location, query->stmt_location,
query->stmt_len, query->stmt_len,
PGSS_INVALID,
0, 0,
0, 0,
NULL, NULL,
0, #if PG_VERSION_NUM >= 130000
0, NULL,
&jstate); #endif
&jstate,
0.0,
0.0);
} }
/* /*
@ -535,9 +552,9 @@ pgss_ExecutorFinish(QueryDesc *queryDesc)
static void static void
pgss_ExecutorEnd(QueryDesc *queryDesc) pgss_ExecutorEnd(QueryDesc *queryDesc)
{ {
uint64 queryId = queryDesc->plannedstmt->queryId;
float utime; float utime;
float stime; float stime;
uint64 queryId = queryDesc->plannedstmt->queryId;
if (queryId != UINT64CONST(0) && queryDesc->totaltime && PGSS_ENABLED()) if (queryId != UINT64CONST(0) && queryDesc->totaltime && PGSS_ENABLED())
{ {
@ -549,16 +566,21 @@ pgss_ExecutorEnd(QueryDesc *queryDesc)
getrusage(RUSAGE_SELF, &rusage_end); getrusage(RUSAGE_SELF, &rusage_end);
utime = TIMEVAL_DIFF(rusage_start.ru_utime, rusage_end.ru_utime); utime = TIMEVAL_DIFF(rusage_start.ru_utime, rusage_end.ru_utime);
stime = TIMEVAL_DIFF(rusage_start.ru_stime, rusage_end.ru_stime); stime = TIMEVAL_DIFF(rusage_start.ru_stime, rusage_end.ru_stime);
pgss_store(queryDesc->sourceText, pgss_store(queryDesc->sourceText,
queryId, queryId,
queryDesc->plannedstmt->stmt_location, queryDesc->plannedstmt->stmt_location,
queryDesc->plannedstmt->stmt_len, queryDesc->plannedstmt->stmt_len,
PGSS_EXEC,
queryDesc->totaltime->total * 1000.0, /* convert to msec */ queryDesc->totaltime->total * 1000.0, /* convert to msec */
queryDesc->estate->es_processed, queryDesc->estate->es_processed,
&queryDesc->totaltime->bufusage, &queryDesc->totaltime->bufusage,
#if PG_VERSION_NUM >= 130000
&queryDesc->totaltime->walusage,
#endif
NULL,
utime, utime,
stime, stime);
NULL);
} }
if (prev_ExecutorEnd) if (prev_ExecutorEnd)
@ -571,14 +593,17 @@ pgss_ExecutorEnd(QueryDesc *queryDesc)
/* /*
* ProcessUtility hook * ProcessUtility hook
*/ */
static void #if PG_VERSION_NUM >= 130000
pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString, static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context, ProcessUtilityContext context,
ParamListInfo params, QueryEnvironment *queryEnv, ParamListInfo params, QueryEnvironment *queryEnv,
DestReceiver *dest, DestReceiver *dest,
#if PG_VERSION_NUM >= 130000
QueryCompletion *qc) QueryCompletion *qc)
#else #else
static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context, ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
char *completionTag) char *completionTag)
#endif #endif
{ {
@ -598,7 +623,7 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
* *
* Likewise, we don't track execution of DEALLOCATE. * Likewise, we don't track execution of DEALLOCATE.
*/ */
if (PGSM_TRACK_UTILITY && PGSS_ENABLED() && if (PGSM_TRACK_UTILITY &&
!IsA(parsetree, ExecuteStmt) && !IsA(parsetree, ExecuteStmt) &&
!IsA(parsetree, PrepareStmt) && !IsA(parsetree, PrepareStmt) &&
!IsA(parsetree, DeallocateStmt)) !IsA(parsetree, DeallocateStmt))
@ -608,42 +633,61 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
uint64 rows; uint64 rows;
BufferUsage bufusage_start, BufferUsage bufusage_start,
bufusage; bufusage;
#if PG_VERSION_NUM >= 130000
WalUsage walusage_start,
walusage;
walusage_start = pgWalUsage;
exec_nested_level++;
#endif
bufusage_start = pgBufferUsage; bufusage_start = pgBufferUsage;
INSTR_TIME_SET_CURRENT(start); INSTR_TIME_SET_CURRENT(start);
nested_level++;
PG_TRY(); PG_TRY();
{ {
if (prev_ProcessUtility) if (prev_ProcessUtility)
prev_ProcessUtility(pstmt, queryString, prev_ProcessUtility(pstmt, queryString,
context, params, queryEnv, context, params, queryEnv,
dest
#if PG_VERSION_NUM >= 130000 #if PG_VERSION_NUM >= 130000
dest, qc); ,qc
#else #else
dest, completionTag); ,completionTag
#endif #endif
);
else else
standard_ProcessUtility(pstmt, queryString, standard_ProcessUtility(pstmt, queryString,
context, params, queryEnv, context, params, queryEnv,
dest
#if PG_VERSION_NUM >= 130000 #if PG_VERSION_NUM >= 130000
dest, qc); ,qc
#else #else
dest, completionTag); ,completionTag
#endif #endif
nested_level--; );
} }
#if PG_VERSION_NUM >= 130000
PG_FINALLY();
{
exec_nested_level--;
}
#else
PG_CATCH(); PG_CATCH();
{ {
nested_level--; nested_level--;
PG_RE_THROW(); PG_RE_THROW();
}
PG_END_TRY();
}
#endif
PG_END_TRY();
INSTR_TIME_SET_CURRENT(duration); INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start); INSTR_TIME_SUBTRACT(duration, start);
#if PG_VERSION_NUM >= 130000 #if PG_VERSION_NUM >= 130000
rows = (qc && qc->commandTag == CMDTAG_COPY) ? qc->nprocessed : 0; rows = (qc && qc->commandTag == CMDTAG_COPY) ? qc->nprocessed : 0;
/* calc differences of WAL counters. */
memset(&walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);
#else #else
/* parse command tag to retrieve the number of affected rows. */ /* parse command tag to retrieve the number of affected rows. */
if (completionTag && strncmp(completionTag, "COPY ", 5) == 0) if (completionTag && strncmp(completionTag, "COPY ", 5) == 0)
@ -653,64 +697,69 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
#endif #endif
/* calc differences of buffer counters. */ /* calc differences of buffer counters. */
bufusage.shared_blks_hit = memset(&bufusage, 0, sizeof(BufferUsage));
pgBufferUsage.shared_blks_hit - bufusage_start.shared_blks_hit; BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
bufusage.shared_blks_read =
pgBufferUsage.shared_blks_read - bufusage_start.shared_blks_read;
bufusage.shared_blks_dirtied =
pgBufferUsage.shared_blks_dirtied - bufusage_start.shared_blks_dirtied;
bufusage.shared_blks_written =
pgBufferUsage.shared_blks_written - bufusage_start.shared_blks_written;
bufusage.local_blks_hit =
pgBufferUsage.local_blks_hit - bufusage_start.local_blks_hit;
bufusage.local_blks_read =
pgBufferUsage.local_blks_read - bufusage_start.local_blks_read;
bufusage.local_blks_dirtied =
pgBufferUsage.local_blks_dirtied - bufusage_start.local_blks_dirtied;
bufusage.local_blks_written =
pgBufferUsage.local_blks_written - bufusage_start.local_blks_written;
bufusage.temp_blks_read =
pgBufferUsage.temp_blks_read - bufusage_start.temp_blks_read;
bufusage.temp_blks_written =
pgBufferUsage.temp_blks_written - bufusage_start.temp_blks_written;
bufusage.blk_read_time = pgBufferUsage.blk_read_time;
INSTR_TIME_SUBTRACT(bufusage.blk_read_time, bufusage_start.blk_read_time);
bufusage.blk_write_time = pgBufferUsage.blk_write_time;
INSTR_TIME_SUBTRACT(bufusage.blk_write_time, bufusage_start.blk_write_time);
pgss_store(queryString, pgss_store(queryString,
0, /* signal that it's a utility stmt */ 0, /* signal that it's a utility stmt */
pstmt->stmt_location, pstmt->stmt_location,
pstmt->stmt_len, pstmt->stmt_len,
PGSS_EXEC,
INSTR_TIME_GET_MILLISEC(duration), INSTR_TIME_GET_MILLISEC(duration),
rows, rows,
&bufusage, &bufusage,
#if PG_VERSION_NUM >= 130000
&walusage,
#endif
NULL,
0, 0,
0, 0);
NULL);
} }
else else
{ {
if (prev_ProcessUtility) if (prev_ProcessUtility)
prev_ProcessUtility(pstmt, queryString, prev_ProcessUtility(pstmt, queryString,
context, params, queryEnv, context, params, queryEnv,
dest
#if PG_VERSION_NUM >= 130000 #if PG_VERSION_NUM >= 130000
dest, qc); ,qc
#else #else
dest, completionTag); ,completionTag
#endif #endif
);
else
standard_ProcessUtility(pstmt, queryString, standard_ProcessUtility(pstmt, queryString,
context, params, queryEnv, context, params, queryEnv,
dest
#if PG_VERSION_NUM >= 130000 #if PG_VERSION_NUM >= 130000
dest, qc); ,qc
#else #else
dest, completionTag); ,completionTag
#endif #endif
);
} }
} }
#if PG_VERSION_NUM < 130000
static void
BufferUsageAccumDiff(BufferUsage* bufusage, BufferUsage* pgBufferUsage, BufferUsage* bufusage_start)
{
/* calc differences of buffer counters. */
bufusage->shared_blks_hit = pgBufferUsage->shared_blks_hit - bufusage_start->shared_blks_hit;
bufusage->shared_blks_read = pgBufferUsage->shared_blks_read - bufusage_start->shared_blks_read;
bufusage->shared_blks_dirtied = pgBufferUsage->shared_blks_dirtied - bufusage_start->shared_blks_dirtied;
bufusage->shared_blks_written = pgBufferUsage->shared_blks_written - bufusage_start->shared_blks_written;
bufusage->local_blks_hit = pgBufferUsage->local_blks_hit - bufusage_start->local_blks_hit;
bufusage->local_blks_read = pgBufferUsage->local_blks_read - bufusage_start->local_blks_read;
bufusage->local_blks_dirtied = pgBufferUsage->local_blks_dirtied - bufusage_start->local_blks_dirtied;
bufusage->local_blks_written = pgBufferUsage->local_blks_written - bufusage_start->local_blks_written;
bufusage->temp_blks_read = pgBufferUsage->temp_blks_read - bufusage_start->temp_blks_read;
bufusage->temp_blks_written = pgBufferUsage->temp_blks_written - bufusage_start->temp_blks_written;
bufusage->blk_read_time = pgBufferUsage->blk_read_time;
INSTR_TIME_SUBTRACT(bufusage->blk_read_time, bufusage_start->blk_read_time);
bufusage->blk_write_time = pgBufferUsage->blk_write_time;
INSTR_TIME_SUBTRACT(bufusage->blk_write_time, bufusage_start->blk_write_time);
}
#endif
/* /*
* Given an arbitrarily long query string, produce a hash for the purposes of * Given an arbitrarily long query string, produce a hash for the purposes of
* identifying the query, without normalizing constants. Used when hashing * identifying the query, without normalizing constants. Used when hashing
@ -769,12 +818,16 @@ pg_get_client_addr(void)
* we have no statistics as yet; we just want to record the normalized * we have no statistics as yet; we just want to record the normalized
* query string. total_time, rows, bufusage are ignored in this case. * query string. total_time, rows, bufusage are ignored in this case.
*/ */
static void static void pgss_store(const char *query, uint64 queryId,
pgss_store(const char *query, uint64 queryId,
int query_location, int query_len, int query_location, int query_len,
bool kind,
double total_time, uint64 rows, double total_time, uint64 rows,
const BufferUsage *bufusage, const BufferUsage *bufusage,
float utime, float stime, pgssJumbleState *jstate) #if PG_VERSION_NUM >= 130000
const WalUsage *walusage,
#endif
pgssJumbleState *jstate,
float utime, float stime)
{ {
pgssHashKey key; pgssHashKey key;
pgssEntry *entry; pgssEntry *entry;
@ -787,7 +840,7 @@ pgss_store(const char *query, uint64 queryId,
Assert(query != NULL); Assert(query != NULL);
/* Safety check... */ /* Safety check... */
if (IsHashInitialize() || !pgss_qbuf[pgss->current_wbucket]) if (!IsHashInitialize() || !pgss_qbuf[pgss->current_wbucket])
return; return;
/* /*
@ -917,16 +970,16 @@ pgss_store(const char *query, uint64 queryId,
update_agg_counters(entry->key.bucket_id, key.queryid, pg_get_client_addr(), AGG_KEY_HOST); update_agg_counters(entry->key.bucket_id, key.queryid, pg_get_client_addr(), AGG_KEY_HOST);
/* "Unstick" entry if it was previously sticky */ /* "Unstick" entry if it was previously sticky */
if (e->counters.calls.calls == 0) if (e->counters.calls[kind].calls == 0)
e->counters.calls.usage = USAGE_INIT; e->counters.calls[kind].usage = USAGE_INIT;
e->counters.calls[kind].calls += 1;
e->counters.time[kind].total_time += total_time;
e->counters.calls.calls += 1; if (e->counters.calls[kind].calls == 1)
e->counters.time.total_time += total_time;
if (e->counters.calls.calls == 1)
{ {
e->counters.time.min_time = total_time; e->counters.time[kind].min_time = total_time;
e->counters.time.max_time = total_time; e->counters.time[kind].max_time = total_time;
e->counters.time.mean_time = total_time; e->counters.time[kind].mean_time = total_time;
} }
else else
{ {
@ -934,18 +987,18 @@ pgss_store(const char *query, uint64 queryId,
* Welford's method for accurately computing variance. See * Welford's method for accurately computing variance. See
* <http://www.johndcook.com/blog/standard_deviation/> * <http://www.johndcook.com/blog/standard_deviation/>
*/ */
double old_mean = e->counters.time.mean_time; double old_mean = e->counters.time[kind].mean_time;
e->counters.time.mean_time += e->counters.time[kind].mean_time +=
(total_time - old_mean) / e->counters.calls.calls; (total_time - old_mean) / e->counters.calls[kind].calls;
e->counters.time.sum_var_time += e->counters.time[kind].sum_var_time +=
(total_time - old_mean) * (total_time - e->counters.time.mean_time); (total_time - old_mean) * (total_time - e->counters.time[kind].mean_time);
/* calculate min and max time */ /* calculate min and max time */
if (e->counters.time.min_time > total_time) if (e->counters.time[kind].min_time > total_time)
e->counters.time.min_time = total_time; e->counters.time[kind].min_time = total_time;
if (e->counters.time.max_time < total_time) if (e->counters.time[kind].max_time < total_time)
e->counters.time.max_time = total_time; e->counters.time[kind].max_time = total_time;
} }
for (i = 0; i < MAX_RESPONSE_BUCKET - 1; i++) for (i = 0; i < MAX_RESPONSE_BUCKET - 1; i++)
@ -959,7 +1012,7 @@ pgss_store(const char *query, uint64 queryId,
if (total_time > PGSM_RESPOSE_TIME_LOWER_BOUND + (PGSM_RESPOSE_TIME_STEP * MAX_RESPONSE_BUCKET)) if (total_time > PGSM_RESPOSE_TIME_LOWER_BOUND + (PGSM_RESPOSE_TIME_STEP * MAX_RESPONSE_BUCKET))
pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls[MAX_RESPONSE_BUCKET - 1]++; pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls[MAX_RESPONSE_BUCKET - 1]++;
e->counters.calls.rows += rows; e->counters.calls[kind].rows += rows;
e->counters.blocks.shared_blks_hit += bufusage->shared_blks_hit; e->counters.blocks.shared_blks_hit += bufusage->shared_blks_hit;
e->counters.blocks.shared_blks_read += bufusage->shared_blks_read; e->counters.blocks.shared_blks_read += bufusage->shared_blks_read;
e->counters.blocks.shared_blks_dirtied += bufusage->shared_blks_dirtied; e->counters.blocks.shared_blks_dirtied += bufusage->shared_blks_dirtied;
@ -972,7 +1025,7 @@ pgss_store(const char *query, uint64 queryId,
e->counters.blocks.temp_blks_written += bufusage->temp_blks_written; e->counters.blocks.temp_blks_written += bufusage->temp_blks_written;
e->counters.blocks.blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_read_time); e->counters.blocks.blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_read_time);
e->counters.blocks.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time); e->counters.blocks.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time);
e->counters.calls.usage += USAGE_EXEC(total_time); e->counters.calls[kind].usage += USAGE_EXEC(total_time);
e->counters.info.host = pg_get_client_addr(); e->counters.info.host = pg_get_client_addr();
e->counters.sysinfo.utime = utime; e->counters.sysinfo.utime = utime;
e->counters.sysinfo.stime = stime; e->counters.sysinfo.stime = stime;
@ -1004,7 +1057,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
#define PG_STAT_STATEMENTS_COLS 31 /* maximum of above */ #define PG_STAT_STATEMENTS_COLS 38 /* maximum of above */
Datum Datum
pg_stat_wait_events(PG_FUNCTION_ARGS) pg_stat_wait_events(PG_FUNCTION_ARGS)
@ -1221,30 +1274,22 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
nulls[i++] = true; nulls[i++] = true;
} }
/* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
if (tmp.calls.calls == 0)
continue;
values[i++] = TimestampGetDatum(pgssBucketEntries[entry->key.bucket_id]->counters.current_time); values[i++] = TimestampGetDatum(pgssBucketEntries[entry->key.bucket_id]->counters.current_time);
values[i++] = Int64GetDatumFast(tmp.calls.calls);
values[i++] = Float8GetDatumFast(tmp.time.total_time);
values[i++] = Float8GetDatumFast(tmp.time.min_time);
values[i++] = Float8GetDatumFast(tmp.time.max_time);
values[i++] = Float8GetDatumFast(tmp.time.mean_time);
/* for (int kind = 0; kind < PGSS_NUMKIND; kind++)
* Note we are calculating the population variance here, not the {
* sample variance, as we have data for the whole population, so values[i++] = Int64GetDatumFast(tmp.calls[kind].calls);
* Bessel's correction is not used, and we don't divide by values[i++] = Float8GetDatumFast(tmp.time[kind].total_time);
* tmp.calls - 1. values[i++] = Float8GetDatumFast(tmp.time[kind].min_time);
*/ values[i++] = Float8GetDatumFast(tmp.time[kind].max_time);
if (tmp.calls.calls > 1) values[i++] = Float8GetDatumFast(tmp.time[kind].mean_time);
stddev = sqrt(tmp.time.sum_var_time / tmp.calls.calls); if (tmp.calls[kind].calls > 1)
stddev = sqrt(tmp.time[kind].sum_var_time / tmp.calls[kind].calls);
else else
stddev = 0.0; stddev = 0.0;
values[i++] = Float8GetDatumFast(stddev); values[i++] = Float8GetDatumFast(stddev);
values[i++] = Int64GetDatumFast(tmp.calls.rows); values[i++] = Int64GetDatumFast(tmp.calls[kind].rows);
}
values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_hit); values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_hit);
values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_read); values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_read);
values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_dirtied); values[i++] = Int64GetDatumFast(tmp.blocks.shared_blks_dirtied);
@ -1332,7 +1377,7 @@ entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int quer
/* reset the statistics */ /* reset the statistics */
memset(&entry->counters, 0, sizeof(Counters)); memset(&entry->counters, 0, sizeof(Counters));
/* set the appropriate initial usage count */ /* set the appropriate initial usage count */
entry->counters.calls.usage = sticky ? pgss->cur_median_usage : USAGE_INIT; entry->counters.calls[0].usage = sticky ? pgss->cur_median_usage : USAGE_INIT;
/* re-initialize the mutex each time ... we assume no one using it */ /* re-initialize the mutex each time ... we assume no one using it */
SpinLockInit(&entry->mutex); SpinLockInit(&entry->mutex);
/* ... and don't forget the query text metadata */ /* ... and don't forget the query text metadata */
@ -2618,6 +2663,7 @@ static PlannedStmt * pgss_planner_hook(Query *parse, const char *query_string, i
static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param) static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param)
#endif #endif
{ {
PlannedStmt *result;
if (MyProc) if (MyProc)
{ {
int i = MyProc - ProcGlobal->allProcs; int i = MyProc - ProcGlobal->allProcs;
@ -2625,14 +2671,75 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param
pgssWaitEventEntries[i]->key.queryid = parse->queryId; pgssWaitEventEntries[i]->key.queryid = parse->queryId;
} }
#if PG_VERSION_NUM >= 130000 #if PG_VERSION_NUM >= 130000
if (PGSM_TRACK_PLANNING && query_string
&& parse->queryId != UINT64CONST(0))
{
instr_time start;
instr_time duration;
BufferUsage bufusage_start,
bufusage;
WalUsage walusage_start,
walusage;
/* We need to track buffer usage as the planner can access them. */
bufusage_start = pgBufferUsage;
/*
* Similarly the planner could write some WAL records in some cases
* (e.g. setting a hint bit with those being WAL-logged)
*/
walusage_start = pgWalUsage;
INSTR_TIME_SET_CURRENT(start);
plan_nested_level++;
PG_TRY();
{
if (planner_hook_next) if (planner_hook_next)
return planner_hook_next(parse, query_string, cursorOptions, boundParams); result = planner_hook_next(parse, query_string, cursorOptions, boundParams);
return standard_planner(parse, query_string, cursorOptions, boundParams); result = standard_planner(parse, query_string, cursorOptions, boundParams);
}
PG_FINALLY();
{
plan_nested_level--;
}
PG_END_TRY();
INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start);
/* calc differences of buffer counters. */
memset(&bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
/* calc differences of WAL counters. */
memset(&walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);
pgss_store(query_string,
parse->queryId,
parse->stmt_location,
parse->stmt_len,
PGSS_PLAN,
INSTR_TIME_GET_MILLISEC(duration),
0,
&bufusage,
&walusage,
NULL,
0,
0);
}
else
{
if (planner_hook_next)
result = planner_hook_next(parse, query_string, cursorOptions, boundParams);
result = standard_planner(parse, query_string, cursorOptions, boundParams);
}
#else #else
if (planner_hook_next) if (planner_hook_next)
return planner_hook_next(parse, opt, param); result = planner_hook_next(parse, opt, param);
return standard_planner(parse, opt, param); result = standard_planner(parse, opt, param);
#endif #endif
return result;
} }
static void static void

View File

@ -36,7 +36,7 @@
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/guc.h" #include "utils/guc.h"
#define IsHashInitialize() (!pgss || !pgss_hash || !pgss_object_hash || !pgss_agghash || !pgss_buckethash || !pgss_waiteventshash) #define IsHashInitialize() (pgss || pgss_hash || pgss_object_hash || pgss_agghash || pgss_buckethash || pgss_waiteventshash)
#define MAX_BACKEND_PROCESES (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) #define MAX_BACKEND_PROCESES (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts)
@ -73,6 +73,21 @@ typedef struct GucVariables
int guc_max; int guc_max;
bool guc_restart; bool guc_restart;
} GucVariable; } GucVariable;
typedef enum pgssStoreKind
{
PGSS_INVALID = -1,
/*
* PGSS_PLAN and PGSS_EXEC must be respectively 0 and 1 as they're used to
* reference the underlying values in the arrays in the Counters struct,
* and this order is required in pg_stat_statements_internal().
*/
PGSS_PLAN = 0,
PGSS_EXEC,
PGSS_NUMKIND /* Must be last value of this enum */
} pgssStoreKind;
/* /*
* Type of aggregate keys * Type of aggregate keys
@ -217,9 +232,9 @@ typedef struct SysInfo
typedef struct Counters typedef struct Counters
{ {
uint64 bucket_id; /* bucket id */ uint64 bucket_id; /* bucket id */
Calls calls; Calls calls[PGSS_NUMKIND];
QueryInfo info; QueryInfo info;
CallTime time; CallTime time[PGSS_NUMKIND];
Blocks blocks; Blocks blocks;
SysInfo sysinfo; SysInfo sysinfo;
} Counters; } Counters;
@ -344,6 +359,7 @@ void init_guc(void);
#define PGSM_OBJECT_CACHE conf[8].guc_variable #define PGSM_OBJECT_CACHE conf[8].guc_variable
#define PGSM_RESPOSE_TIME_LOWER_BOUND conf[9].guc_variable #define PGSM_RESPOSE_TIME_LOWER_BOUND conf[9].guc_variable
#define PGSM_RESPOSE_TIME_STEP conf[10].guc_variable #define PGSM_RESPOSE_TIME_STEP conf[10].guc_variable
#define PGSM_TRACK_PLANNING conf[11].guc_variable
GucVariable conf[11]; GucVariable conf[12];
#endif #endif