PG-1907 Add columns to track parallel worker activity

These are the same counters as were intorduced in pg_stat_statements in
commit cf54a2c.
pull/571/head
Artem Gavrilov 2025-09-18 19:04:42 +02:00 committed by Artem Gavrilov
parent 9496937c9f
commit bd0546b7c9
10 changed files with 157 additions and 7 deletions

View File

@ -32,7 +32,8 @@ REGRESS = basic \
tags \
user \
level_tracking \
decode_error_level
decode_error_level \
parallel
# Disabled because these tests require "shared_preload_libraries=pg_stat_statements",
# which typical installcheck users do not have (e.g. buildfarm clients).

View File

@ -41,6 +41,7 @@ tests += {
'guc',
'histogram',
'level_tracking'
'parallel',
'pgsqm_query_id',
'relations',
'rows',

View File

@ -87,10 +87,13 @@ CREATE FUNCTION pg_stat_monitor_internal(
OUT jit_deform_count int8,
OUT jit_deform_time float8,
OUT stats_since timestamp with time zone, -- 67
OUT parallel_workers_to_launch int, -- 67
OUT parallel_workers_launched int,
OUT stats_since timestamp with time zone, -- 69
OUT minmax_stats_since timestamp with time zone,
OUT toplevel BOOLEAN, -- 69
OUT toplevel BOOLEAN, -- 71
OUT bucket_done BOOLEAN
)
RETURNS SETOF record
@ -175,6 +178,9 @@ CREATE VIEW pg_stat_monitor AS SELECT
jit_deform_count,
jit_deform_time,
parallel_workers_to_launch,
parallel_workers_launched,
stats_since,
minmax_stats_since

View File

@ -49,7 +49,7 @@ PG_MODULE_MAGIC;
#define PG_STAT_MONITOR_COLS_V1_0 52
#define PG_STAT_MONITOR_COLS_V2_0 64
#define PG_STAT_MONITOR_COLS_V2_1 70
#define PG_STAT_MONITOR_COLS_V2_3 71
#define PG_STAT_MONITOR_COLS_V2_3 73
#define PG_STAT_MONITOR_COLS PG_STAT_MONITOR_COLS_V2_3 /* maximum of above */
#define PGSM_TEXT_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "pg_stat_monitor_query"
@ -230,6 +230,8 @@ static void pgsm_update_entry(pgsmEntry *entry,
BufferUsage *bufusage,
WalUsage *walusage,
const struct JitInstrumentation *jitusage,
int parallel_workers_to_launch,
int parallel_workers_launched,
bool reset,
pgsmStoreKind kind);
static void pgsm_store(pgsmEntry *entry);
@ -783,6 +785,13 @@ pgsm_ExecutorEnd(QueryDesc *queryDesc)
queryDesc->estate->es_jit ? &queryDesc->estate->es_jit->instr : NULL, /* jitusage */
#else
NULL,
#endif
#if PG_VERSION_NUM >= 180000
queryDesc->estate->es_parallel_workers_to_launch, /* parallel_workers_to_launch */
queryDesc->estate->es_parallel_workers_launched, /* parallel_workers_launched */
#else
0, /* parallel_workers_to_launch */
0, /* parallel_workers_launched */
#endif
false, /* reset */
PGSM_EXEC); /* kind */
@ -964,6 +973,8 @@ pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par
&bufusage, /* bufusage */
&walusage, /* walusage */
NULL, /* jitusage */
0, /* parallel_workers_to_launch */
0, /* parallel_workers_launched */
false, /* reset */
PGSM_PLAN); /* kind */
}
@ -1189,6 +1200,8 @@ pgsm_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
&bufusage, /* bufusage */
&walusage, /* walusage */
NULL, /* jitusage */
0, /* parallel_workers_to_launch */
0, /* parallel_workers_launched */
false, /* reset */
PGSM_EXEC); /* kind */
@ -1373,6 +1386,8 @@ pgsm_update_entry(pgsmEntry *entry,
BufferUsage *bufusage,
WalUsage *walusage,
const struct JitInstrumentation *jitusage,
int parallel_workers_to_launch,
int parallel_workers_launched,
bool reset,
pgsmStoreKind kind)
{
@ -1628,6 +1643,10 @@ pgsm_update_entry(pgsmEntry *entry,
}
}
/* parallel worker counters */
entry->counters.parallel_workers_to_launch += parallel_workers_to_launch;
entry->counters.parallel_workers_launched += parallel_workers_launched;
if (kind == PGSM_STORE)
SpinLockRelease(&entry->mutex);
}
@ -2018,6 +2037,8 @@ pgsm_store(pgsmEntry *entry)
&bufusage, /* bufusage */
&walusage, /* walusage */
&jitusage, /* jitusage */
entry->counters.parallel_workers_to_launch, /* parallel_workers_to_launch */
entry->counters.parallel_workers_launched, /* parallel_workers_launched */
reset, /* reset */
PGSM_STORE);
@ -2548,17 +2569,24 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
}
}
if (api_version >= PGSM_V2_1)
if (api_version >= PGSM_V2_3)
{
/* at column number 67 */
values[i++] = Int64GetDatumFast(tmp.parallel_workers_to_launch);
values[i++] = Int64GetDatumFast(tmp.parallel_workers_launched);
}
if (api_version >= PGSM_V2_1)
{
/* at column number 69 */
values[i++] = TimestampTzGetDatum(entry->stats_since);
values[i++] = TimestampTzGetDatum(entry->minmax_stats_since);
}
/* toplevel at column number 69 */
/* toplevel at column number 71 */
values[i++] = BoolGetDatum(toplevel);
/* bucket_done at column number 70 */
/* bucket_done at column number 72 */
values[i++] = BoolGetDatum(pg_atomic_read_u64(&pgsm->current_wbucket) != bucketid);
/* clean up and return the tuplestore */

View File

@ -328,6 +328,10 @@ typedef struct Counters
Wal_Usage walusage;
int resp_calls[MAX_RESPONSE_BUCKET]; /* execution time's in
* msec */
int64 parallel_workers_to_launch; /* # of parallel workers planned
* to be launched */
int64 parallel_workers_launched; /* # of parallel workers actually
* launched */
} Counters;
/* Some global structure to get the cpu usage, really don't like the idea of global variable */

View File

@ -0,0 +1,6 @@
--
-- Tests for parallel statistics
--
SELECT setting::integer < 180000 AS skip_test FROM pg_settings where name = 'server_version_num' \gset
\if :skip_test
\quit

View File

@ -0,0 +1,39 @@
--
-- Tests for parallel statistics
--
SELECT setting::integer < 180000 AS skip_test FROM pg_settings where name = 'server_version_num' \gset
\if :skip_test
\quit
\endif
CREATE EXTENSION pg_stat_monitor;
SET pgsm.track_utility = FALSE;
-- encourage use of parallel plans
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size = 0;
SET max_parallel_workers_per_gather = 2;
CREATE TABLE pgsm_parallel_tab (a int);
SELECT pg_stat_monitor_reset() IS NOT NULL AS t;
t
---
t
(1 row)
SELECT count(*) FROM pgsm_parallel_tab;
count
-------
0
(1 row)
SELECT query,
parallel_workers_to_launch > 0 AS has_workers_to_launch,
parallel_workers_launched > 0 AS has_workers_launched
FROM pg_stat_monitor
WHERE query ~ 'SELECT count'
ORDER BY query COLLATE "C";
query | has_workers_to_launch | has_workers_launched
----------------------------------------+-----------------------+----------------------
SELECT count(*) FROM pgsm_parallel_tab | t | t
(1 row)
DROP TABLE pgsm_parallel_tab;

View File

@ -0,0 +1,32 @@
--
-- Tests for parallel statistics
--
SELECT setting::integer < 180000 AS skip_test FROM pg_settings where name = 'server_version_num' \gset
\if :skip_test
\quit
\endif
CREATE EXTENSION pg_stat_monitor;
SET pgsm.track_utility = FALSE;
-- encourage use of parallel plans
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size = 0;
SET max_parallel_workers_per_gather = 2;
CREATE TABLE pgsm_parallel_tab (a int);
SELECT pg_stat_monitor_reset() IS NOT NULL AS t;
SELECT count(*) FROM pgsm_parallel_tab;
SELECT query,
parallel_workers_to_launch > 0 AS has_workers_to_launch,
parallel_workers_launched > 0 AS has_workers_launched
FROM pg_stat_monitor
WHERE query ~ 'SELECT count'
ORDER BY query COLLATE "C";
DROP TABLE pgsm_parallel_tab;

View File

@ -31,6 +31,7 @@ my %pg_versions_pgsm_columns = ( 18 => "application_name,".
"local_blk_read_time,local_blk_write_time,local_blks_dirtied,local_blks_hit,".
"local_blks_read,local_blks_written,max_exec_time,max_plan_time,mean_exec_time," .
"mean_plan_time,message,min_exec_time,min_plan_time,minmax_stats_since," .
"parallel_workers_launched,parallel_workers_to_launch," .
"pgsm_query_id,planid,plans,query,query_plan,queryid,relations,resp_calls,rows," .
"shared_blk_read_time,shared_blk_write_time,shared_blks_dirtied," .
"shared_blks_hit,shared_blks_read,shared_blks_written,sqlcode,stats_since," .

View File

@ -145,6 +145,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18)
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%DELETE FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: wal_buffers_full are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%DELETE FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_to_launch are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%DELETE FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_launched are equal.");
}
# Compare values for query 'INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP)'
@ -201,6 +209,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18)
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%INSERT INTO pgbench_history%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: wal_buffers_full are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%INSERT INTO pgbench_history%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_to_launch are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%INSERT INTO pgbench_history%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_launched are equal.");
}
# Compare values for query 'SELECT abalance FROM pgbench_accounts WHERE aid = $1'
@ -257,6 +273,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18)
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%SELECT abalance FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: wal_buffers_full are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%SELECT abalance FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_to_launch are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%SELECT abalance FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_launched are equal.");
}
# Compare values for query 'UPDATE pgbench_accounts SET abalance = abalance + $1 WHERE aid = $2'
@ -309,6 +333,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18)
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%UPDATE pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: wal_buffers_full are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%UPDATE pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_to_launch are equal.");
($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%UPDATE pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']);
trim($stdout);
is($stdout,'t',"Compare: parallel_workers_launched are equal.");
}
# DROP EXTENSION