From 8b956b8279f5de180a2599df9deffe908ea129a4 Mon Sep 17 00:00:00 2001 From: Artem Gavrilov Date: Thu, 18 Sep 2025 19:04:42 +0200 Subject: [PATCH] PG-1907 Add columns to track parallel worker activity These are the same counters as were intorduced in pg_stat_statements in commit cf54a2c. --- Makefile | 3 ++- meson.build | 1 + pg_stat_monitor--2.2--2.3.sql | 10 ++++++-- pg_stat_monitor.c | 36 ++++++++++++++++++++++++--- pg_stat_monitor.h | 4 +++ regression/expected/parallel.out | 6 +++++ regression/expected/parallel_1.out | 39 ++++++++++++++++++++++++++++++ regression/sql/parallel.sql | 32 ++++++++++++++++++++++++ t/018_column_names.pl | 1 + t/025_compare_pgss.pl | 32 ++++++++++++++++++++++++ 10 files changed, 157 insertions(+), 7 deletions(-) create mode 100644 regression/expected/parallel.out create mode 100644 regression/expected/parallel_1.out create mode 100644 regression/sql/parallel.sql diff --git a/Makefile b/Makefile index 0e57e5d..c60f10b 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,8 @@ REGRESS = basic \ tags \ user \ level_tracking \ - decode_error_level + decode_error_level \ + parallel # Disabled because these tests require "shared_preload_libraries=pg_stat_statements", # which typical installcheck users do not have (e.g. buildfarm clients). diff --git a/meson.build b/meson.build index afca8bb..136eb22 100644 --- a/meson.build +++ b/meson.build @@ -41,6 +41,7 @@ tests += { 'guc', 'histogram', 'level_tracking' + 'parallel', 'pgsqm_query_id', 'relations', 'rows', diff --git a/pg_stat_monitor--2.2--2.3.sql b/pg_stat_monitor--2.2--2.3.sql index 1647b27..af87539 100644 --- a/pg_stat_monitor--2.2--2.3.sql +++ b/pg_stat_monitor--2.2--2.3.sql @@ -87,10 +87,13 @@ CREATE FUNCTION pg_stat_monitor_internal( OUT jit_deform_count int8, OUT jit_deform_time float8, - OUT stats_since timestamp with time zone, -- 67 + OUT parallel_workers_to_launch int, -- 67 + OUT parallel_workers_launched int, + + OUT stats_since timestamp with time zone, -- 69 OUT minmax_stats_since timestamp with time zone, - OUT toplevel BOOLEAN, -- 69 + OUT toplevel BOOLEAN, -- 71 OUT bucket_done BOOLEAN ) RETURNS SETOF record @@ -175,6 +178,9 @@ CREATE VIEW pg_stat_monitor AS SELECT jit_deform_count, jit_deform_time, + parallel_workers_to_launch, + parallel_workers_launched, + stats_since, minmax_stats_since diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 74e5102..4bc2c1f 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -49,7 +49,7 @@ PG_MODULE_MAGIC; #define PG_STAT_MONITOR_COLS_V1_0 52 #define PG_STAT_MONITOR_COLS_V2_0 64 #define PG_STAT_MONITOR_COLS_V2_1 70 -#define PG_STAT_MONITOR_COLS_V2_3 71 +#define PG_STAT_MONITOR_COLS_V2_3 73 #define PG_STAT_MONITOR_COLS PG_STAT_MONITOR_COLS_V2_3 /* maximum of above */ #define PGSM_TEXT_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "pg_stat_monitor_query" @@ -230,6 +230,8 @@ static void pgsm_update_entry(pgsmEntry *entry, BufferUsage *bufusage, WalUsage *walusage, const struct JitInstrumentation *jitusage, + int parallel_workers_to_launch, + int parallel_workers_launched, bool reset, pgsmStoreKind kind); static void pgsm_store(pgsmEntry *entry); @@ -783,6 +785,13 @@ pgsm_ExecutorEnd(QueryDesc *queryDesc) queryDesc->estate->es_jit ? &queryDesc->estate->es_jit->instr : NULL, /* jitusage */ #else NULL, +#endif +#if PG_VERSION_NUM >= 180000 + queryDesc->estate->es_parallel_workers_to_launch, /* parallel_workers_to_launch */ + queryDesc->estate->es_parallel_workers_launched, /* parallel_workers_launched */ +#else + 0, /* parallel_workers_to_launch */ + 0, /* parallel_workers_launched */ #endif false, /* reset */ PGSM_EXEC); /* kind */ @@ -964,6 +973,8 @@ pgsm_planner_hook(Query *parse, const char *query_string, int cursorOptions, Par &bufusage, /* bufusage */ &walusage, /* walusage */ NULL, /* jitusage */ + 0, /* parallel_workers_to_launch */ + 0, /* parallel_workers_launched */ false, /* reset */ PGSM_PLAN); /* kind */ } @@ -1189,6 +1200,8 @@ pgsm_ProcessUtility(PlannedStmt *pstmt, const char *queryString, &bufusage, /* bufusage */ &walusage, /* walusage */ NULL, /* jitusage */ + 0, /* parallel_workers_to_launch */ + 0, /* parallel_workers_launched */ false, /* reset */ PGSM_EXEC); /* kind */ @@ -1373,6 +1386,8 @@ pgsm_update_entry(pgsmEntry *entry, BufferUsage *bufusage, WalUsage *walusage, const struct JitInstrumentation *jitusage, + int parallel_workers_to_launch, + int parallel_workers_launched, bool reset, pgsmStoreKind kind) { @@ -1628,6 +1643,10 @@ pgsm_update_entry(pgsmEntry *entry, } } + /* parallel worker counters */ + entry->counters.parallel_workers_to_launch += parallel_workers_to_launch; + entry->counters.parallel_workers_launched += parallel_workers_launched; + if (kind == PGSM_STORE) SpinLockRelease(&entry->mutex); } @@ -2018,6 +2037,8 @@ pgsm_store(pgsmEntry *entry) &bufusage, /* bufusage */ &walusage, /* walusage */ &jitusage, /* jitusage */ + entry->counters.parallel_workers_to_launch, /* parallel_workers_to_launch */ + entry->counters.parallel_workers_launched, /* parallel_workers_launched */ reset, /* reset */ PGSM_STORE); @@ -2548,17 +2569,24 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, } } - if (api_version >= PGSM_V2_1) + if (api_version >= PGSM_V2_3) { /* at column number 67 */ + values[i++] = Int64GetDatumFast(tmp.parallel_workers_to_launch); + values[i++] = Int64GetDatumFast(tmp.parallel_workers_launched); + } + + if (api_version >= PGSM_V2_1) + { + /* at column number 69 */ values[i++] = TimestampTzGetDatum(entry->stats_since); values[i++] = TimestampTzGetDatum(entry->minmax_stats_since); } - /* toplevel at column number 69 */ + /* toplevel at column number 71 */ values[i++] = BoolGetDatum(toplevel); - /* bucket_done at column number 70 */ + /* bucket_done at column number 72 */ values[i++] = BoolGetDatum(pg_atomic_read_u64(&pgsm->current_wbucket) != bucketid); /* clean up and return the tuplestore */ diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 0e6c7bc..da18296 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -328,6 +328,10 @@ typedef struct Counters Wal_Usage walusage; int resp_calls[MAX_RESPONSE_BUCKET]; /* execution time's in * msec */ + int64 parallel_workers_to_launch; /* # of parallel workers planned + * to be launched */ + int64 parallel_workers_launched; /* # of parallel workers actually + * launched */ } Counters; /* Some global structure to get the cpu usage, really don't like the idea of global variable */ diff --git a/regression/expected/parallel.out b/regression/expected/parallel.out new file mode 100644 index 0000000..f42bb89 --- /dev/null +++ b/regression/expected/parallel.out @@ -0,0 +1,6 @@ +-- +-- Tests for parallel statistics +-- +SELECT setting::integer < 180000 AS skip_test FROM pg_settings where name = 'server_version_num' \gset +\if :skip_test +\quit diff --git a/regression/expected/parallel_1.out b/regression/expected/parallel_1.out new file mode 100644 index 0000000..981156e --- /dev/null +++ b/regression/expected/parallel_1.out @@ -0,0 +1,39 @@ +-- +-- Tests for parallel statistics +-- +SELECT setting::integer < 180000 AS skip_test FROM pg_settings where name = 'server_version_num' \gset +\if :skip_test +\quit +\endif +CREATE EXTENSION pg_stat_monitor; +SET pgsm.track_utility = FALSE; +-- encourage use of parallel plans +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET max_parallel_workers_per_gather = 2; +CREATE TABLE pgsm_parallel_tab (a int); +SELECT pg_stat_monitor_reset() IS NOT NULL AS t; + t +--- + t +(1 row) + +SELECT count(*) FROM pgsm_parallel_tab; + count +------- + 0 +(1 row) + +SELECT query, + parallel_workers_to_launch > 0 AS has_workers_to_launch, + parallel_workers_launched > 0 AS has_workers_launched + FROM pg_stat_monitor + WHERE query ~ 'SELECT count' + ORDER BY query COLLATE "C"; + query | has_workers_to_launch | has_workers_launched +----------------------------------------+-----------------------+---------------------- + SELECT count(*) FROM pgsm_parallel_tab | t | t +(1 row) + +DROP TABLE pgsm_parallel_tab; diff --git a/regression/sql/parallel.sql b/regression/sql/parallel.sql new file mode 100644 index 0000000..7862f88 --- /dev/null +++ b/regression/sql/parallel.sql @@ -0,0 +1,32 @@ +-- +-- Tests for parallel statistics +-- + +SELECT setting::integer < 180000 AS skip_test FROM pg_settings where name = 'server_version_num' \gset +\if :skip_test +\quit +\endif + +CREATE EXTENSION pg_stat_monitor; +SET pgsm.track_utility = FALSE; + +-- encourage use of parallel plans +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET max_parallel_workers_per_gather = 2; + +CREATE TABLE pgsm_parallel_tab (a int); + +SELECT pg_stat_monitor_reset() IS NOT NULL AS t; + +SELECT count(*) FROM pgsm_parallel_tab; + +SELECT query, + parallel_workers_to_launch > 0 AS has_workers_to_launch, + parallel_workers_launched > 0 AS has_workers_launched + FROM pg_stat_monitor + WHERE query ~ 'SELECT count' + ORDER BY query COLLATE "C"; + +DROP TABLE pgsm_parallel_tab; diff --git a/t/018_column_names.pl b/t/018_column_names.pl index 8c5cbb3..032b1a4 100644 --- a/t/018_column_names.pl +++ b/t/018_column_names.pl @@ -31,6 +31,7 @@ my %pg_versions_pgsm_columns = ( 18 => "application_name,". "local_blk_read_time,local_blk_write_time,local_blks_dirtied,local_blks_hit,". "local_blks_read,local_blks_written,max_exec_time,max_plan_time,mean_exec_time," . "mean_plan_time,message,min_exec_time,min_plan_time,minmax_stats_since," . + "parallel_workers_launched,parallel_workers_to_launch," . "pgsm_query_id,planid,plans,query,query_plan,queryid,relations,resp_calls,rows," . "shared_blk_read_time,shared_blk_write_time,shared_blks_dirtied," . "shared_blks_hit,shared_blks_read,shared_blks_written,sqlcode,stats_since," . diff --git a/t/025_compare_pgss.pl b/t/025_compare_pgss.pl index 37cdb59..a629e45 100644 --- a/t/025_compare_pgss.pl +++ b/t/025_compare_pgss.pl @@ -145,6 +145,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18) ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%DELETE FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); trim($stdout); is($stdout,'t',"Compare: wal_buffers_full are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%DELETE FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_to_launch are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%DELETE FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_launched are equal."); } # Compare values for query 'INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP)' @@ -201,6 +209,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18) ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%INSERT INTO pgbench_history%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); trim($stdout); is($stdout,'t',"Compare: wal_buffers_full are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%INSERT INTO pgbench_history%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_to_launch are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%INSERT INTO pgbench_history%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_launched are equal."); } # Compare values for query 'SELECT abalance FROM pgbench_accounts WHERE aid = $1' @@ -257,6 +273,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18) ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%SELECT abalance FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); trim($stdout); is($stdout,'t',"Compare: wal_buffers_full are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%SELECT abalance FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_to_launch are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%SELECT abalance FROM pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_launched are equal."); } # Compare values for query 'UPDATE pgbench_accounts SET abalance = abalance + $1 WHERE aid = $2' @@ -309,6 +333,14 @@ if ($PGSM::PG_MAJOR_VERSION >= 18) ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.wal_buffers_full) = SUM(PGSS.wal_buffers_full) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%UPDATE pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); trim($stdout); is($stdout,'t',"Compare: wal_buffers_full are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_to_launch) = SUM(PGSS.parallel_workers_to_launch) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%UPDATE pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_to_launch are equal."); + + ($cmdret, $stdout, $stderr) = $node->psql('postgres', 'SELECT SUM(PGSM.parallel_workers_launched) = SUM(PGSS.parallel_workers_launched) FROM pg_stat_monitor AS PGSM INNER JOIN pg_stat_statements AS PGSS ON PGSS.query = PGSM.query WHERE PGSM.query LIKE \'%UPDATE pgbench_accounts%\' GROUP BY PGSM.query;', extra_params => ['-Pformat=unaligned','-Ptuples_only=on']); + trim($stdout); + is($stdout,'t',"Compare: parallel_workers_launched are equal."); } # DROP EXTENSION