diff --git a/Makefile b/Makefile index 5ddf0c5..a8a28c9 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ # contrib/pg_stat_monitor/Makefile MODULE_big = pg_stat_monitor -OBJS = hash_query.o guc.o pg_stat_monitor.o $(WIN32RES) +OBJS = hash_query.o guc.o pg_stat_monitor.o pgsm_table_export.o $(WIN32RES) EXTENSION = pg_stat_monitor -DATA = pg_stat_monitor--2.0.sql pg_stat_monitor--1.0--2.0.sql pg_stat_monitor--2.0--2.1.sql pg_stat_monitor--2.1--2.2.sql pg_stat_monitor--2.2--2.3.sql +DATA = pg_stat_monitor--2.0.sql pg_stat_monitor--1.0--2.0.sql pg_stat_monitor--2.0--2.1.sql pg_stat_monitor--2.1--2.2.sql pg_stat_monitor--2.2--2.3.sql pg_stat_monitor--2.3--2.4.sql PGFILEDESC = "pg_stat_monitor - execution statistics of SQL statements" diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 0000000..d0e9d04 --- /dev/null +++ b/PLAN.md @@ -0,0 +1,474 @@ +# pg_stat_monitor Low-Memory Fork: Phased Implementation Plan + +## 🎯 **Objective** +Create a fork that stores data in partitioned tables with materialized views, ultimately reducing memory from 276MB → 1-3MB through phased, reviewable changes. + +## 📊 **Memory Reduction Stages** + +### **Current State** +- Default: 256MB + 20MB = **276MB total** + +### **Phase 1: Configuration Only** +- `pgsm_max`: 256MB → 10MB (minimum allowed) +- `pgsm_max_buckets`: 10 → 2 +- `pgsm_query_shared_buffer`: 20MB → 1MB +- **Result**: 11MB (96% reduction) + +### **Phase 3: Clear After Export** +- Keep only current bucket in memory +- Clear immediately after export to table +- **Target**: 1-3MB (99% reduction) + +## 🏗️ **Phased Implementation Plan** + +--- + +# **PHASE 1: Table Storage (2 hours)** +*Git diff: ~150 lines in new file + 2 line hook* + +## **Objective** +Export statistics to partitioned table on bucket rotation without changing memory usage. 
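+
+Before starting Phase 1 it is worth capturing a baseline (and the same check can confirm the reduction later). A minimal sketch using the `pg_shmem_allocations` view (PostgreSQL 13+, also used in the Phase 3 testing section below); the exact allocation names can vary by pg_stat_monitor version:
+
+```sql
+-- Current memory-related settings
+SHOW pg_stat_monitor.pgsm_max;
+SHOW pg_stat_monitor.pgsm_max_buckets;
+SHOW pg_stat_monitor.pgsm_query_shared_buffer;
+
+-- Shared memory actually allocated by the extension (PostgreSQL 13+)
+SELECT name, pg_size_pretty(size) AS size
+FROM pg_shmem_allocations
+WHERE name ILIKE '%pg_stat_monitor%';
+```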
+ +## **Changes** + +### **1.1: New File** (`pgsm_table_export.c`) + +```c +/* + * pgsm_table_export.c - Export pg_stat_monitor data to partitioned tables + * Phase 1: Basic table export functionality + */ +#include "postgres.h" +#include "pg_stat_monitor.h" +#include "executor/spi.h" +#include "utils/builtins.h" +#include "utils/timestamp.h" + +/* GUC variable */ +bool pgsm_enable_table_export = true; + +/* External references */ +extern HTAB *pgsm_hash; + +/* + * Export bucket data to partitioned table + * Called from get_next_wbucket() before bucket cleanup + */ +void +pgsm_export_bucket_to_table(uint64 bucket_id) +{ + HASH_SEQ_STATUS hash_seq; + pgsmEntry *entry; + StringInfoData sql; + int ret, exported = 0; + bool first = true; + + /* Quick exit if disabled */ + if (!pgsm_enable_table_export) + return; + + /* Check table exists (will be created by SQL migration) */ + SPI_connect(); + ret = SPI_execute("SELECT 1 FROM pg_tables WHERE tablename = 'pg_stat_monitor_data'", + true, 1); + if (ret != SPI_OK_SELECT || SPI_processed == 0) { + SPI_finish(); + return; /* Table doesn't exist yet */ + } + SPI_finish(); + + /* Build batch INSERT */ + initStringInfo(&sql); + appendStringInfo(&sql, + "INSERT INTO pg_stat_monitor_data " + "(bucket_id, queryid, dbid, userid, calls, rows, " + "total_time, mean_time, min_time, max_time, " + "shared_blks_hit, shared_blks_read) VALUES "); + + /* Export all entries from this bucket */ + hash_seq_init(&hash_seq, pgsm_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + if (entry->key.bucket_id != bucket_id) + continue; + + if (!first) + appendStringInfoChar(&sql, ','); + first = false; + + /* Add entry data */ + appendStringInfo(&sql, + "(%lu, %lu, %u, %u, %ld, %ld, " + "%.3f, %.3f, %.3f, %.3f, %ld, %ld)", + bucket_id, + entry->pgsm_query_id, + entry->key.dbid, + entry->key.userid, + entry->counters.calls.calls, + entry->counters.calls.rows, + entry->counters.time.total_time, + entry->counters.time.mean_time, + entry->counters.time.min_time, + entry->counters.time.max_time, + entry->counters.blocks.shared_blks_hit, + entry->counters.blocks.shared_blks_read); + + exported++; + } + + /* Execute if we have data */ + if (exported > 0) + { + SPI_connect(); + ret = SPI_execute(sql.data, false, 0); + SPI_finish(); + + elog(DEBUG1, "pg_stat_monitor: exported %d entries from bucket %lu", + exported, bucket_id); + } + + pfree(sql.data); +} +``` + +### **1.2: Hook Addition** (`pg_stat_monitor.c`) + +```diff ++ /* External function from pgsm_table_export.c */ ++ extern void pgsm_export_bucket_to_table(uint64 bucket_id); + +static uint64 +get_next_wbucket(pgsmSharedState *pgsm) +{ + // ... line ~2570 ... 
+ if (update_bucket)
+ {
+ new_bucket_id = (tv.tv_sec / pgsm_bucket_time) % pgsm_max_buckets;
+ prev_bucket_id = pg_atomic_exchange_u64(&pgsm->current_wbucket, new_bucket_id);
+
+ pgsm_lock_aquire(pgsm, LW_EXCLUSIVE);
+
++ /* Export bucket data before deallocation */
++ pgsm_export_bucket_to_table(new_bucket_id);
+
+ hash_entry_dealloc(new_bucket_id, prev_bucket_id, NULL);
+ pgsm_lock_release(pgsm);
+ }
+```
+
+### **1.3: SQL Migration** (`pg_stat_monitor--2.0--2.1.sql`)
+
+```sql
+-- Phase 1: Create partitioned table for data export
+CREATE TABLE IF NOT EXISTS pg_stat_monitor_data (
+ bucket_id bigint,
+ queryid bigint,
+ dbid oid,
+ userid oid,
+ calls bigint,
+ rows bigint,
+ total_time double precision,
+ mean_time double precision,
+ min_time double precision,
+ max_time double precision,
+ shared_blks_hit bigint,
+ shared_blks_read bigint,
+ exported_at timestamptz DEFAULT now()
+) PARTITION BY RANGE (exported_at);
+
+-- Create initial partition for today
+CREATE TABLE pg_stat_monitor_data_default
+PARTITION OF pg_stat_monitor_data DEFAULT;
+
+-- Create indexes
+CREATE INDEX ON pg_stat_monitor_data (queryid);
+CREATE INDEX ON pg_stat_monitor_data (exported_at);
+
+-- Reduce memory usage via configuration.
+-- NOTE: ALTER SYSTEM cannot run inside an extension script (the script executes
+-- in a transaction block), so apply these separately, e.g. from psql:
+--   ALTER SYSTEM SET pg_stat_monitor.pgsm_max = '10MB';
+--   ALTER SYSTEM SET pg_stat_monitor.pgsm_max_buckets = 2;
+--   ALTER SYSTEM SET pg_stat_monitor.pgsm_query_shared_buffer = '1MB';
+```
+
+### **1.4: Makefile Update**
+
+```diff
+- OBJS = hash_query.o guc.o pg_stat_monitor.o $(WIN32RES)
++ OBJS = hash_query.o guc.o pg_stat_monitor.o pgsm_table_export.o $(WIN32RES)
+```
+
+### **1.5: GUC Addition** (`guc.c`)
+
+```diff
++ /* Declared in pgsm_table_export.c */
++ extern bool pgsm_enable_table_export;
+
+void
+init_guc(void)
+{
+ // ... existing GUCs ...
+
++ DefineCustomBoolVariable("pg_stat_monitor.pgsm_enable_table_export",
++ "Enable export to partitioned tables",
++ NULL,
++ &pgsm_enable_table_export,
++ true,
++ PGC_SIGHUP,
++ 0,
++ NULL, NULL, NULL);
+}
+```
+
+## **Phase 1 Testing**
+
+```sql
+-- Verify export is working
+SELECT count(*) FROM pg_stat_monitor_data;
+
+-- Check data contents
+SELECT queryid, calls, total_time FROM pg_stat_monitor_data LIMIT 10;
+```
+
+---
+
+# **PHASE 2: Materialized View with Bucket-Synced Refresh (1 hour)**
+*Git diff: ~30 lines added to existing file*
+
+## **Objective**
+Replace the existing view with a materialized view that refreshes after each bucket export (synchronized with `pgsm_bucket_time`).
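+
+A quick way to confirm that exports (and therefore refreshes) keep pace with the bucket interval is to compare the age of the newest exported row against `pgsm_bucket_time`. A small sketch against the Phase 1 `pg_stat_monitor_data` table; under normal operation the age should stay below roughly one bucket interval:
+
+```sql
+-- Age of the most recent export
+SELECT now() - max(exported_at) AS export_age
+FROM pg_stat_monitor_data;
+
+-- The bucket interval the refresh is synchronized with
+SHOW pg_stat_monitor.pgsm_bucket_time;
+```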
+
+## **Changes**
+
+### **2.1: Add Refresh Function** (`pgsm_table_export.c`)
+
+```c
+/*
+ * Refresh materialized view after bucket export
+ * This keeps the view in sync with the data export schedule
+ */
+void
+pgsm_refresh_materialized_view(void)
+{
+ int ret;
+
+ /* Skip if table export disabled */
+ if (!pgsm_enable_table_export)
+ return;
+
+ /* Check if materialized view exists */
+ SPI_connect();
+ ret = SPI_execute("SELECT 1 FROM pg_matviews WHERE matviewname = 'pg_stat_monitor'",
+ true, 1);
+ if (ret != SPI_OK_SELECT || SPI_processed == 0) {
+ SPI_finish();
+ return; /* Materialized view doesn't exist yet */
+ }
+
+ /* Refresh the view (CONCURRENTLY to avoid blocking) */
+ ret = SPI_execute("REFRESH MATERIALIZED VIEW CONCURRENTLY pg_stat_monitor",
+ false, 0);
+ if (ret == SPI_OK_UTILITY)
+ {
+ elog(DEBUG1, "pg_stat_monitor: refreshed materialized view");
+ }
+ SPI_finish();
+}
+```
+
+### **2.2: Hook After Export** (`pg_stat_monitor.c`)
+
+```diff
++ extern void pgsm_refresh_materialized_view(void);
+
+static uint64
+get_next_wbucket(pgsmSharedState *pgsm)
+{
+ if (update_bucket)
+ {
+ new_bucket_id = (tv.tv_sec / pgsm_bucket_time) % pgsm_max_buckets;
+ prev_bucket_id = pg_atomic_exchange_u64(&pgsm->current_wbucket, new_bucket_id);
+
+ pgsm_lock_aquire(pgsm, LW_EXCLUSIVE);
+
+ /* Export bucket data before deallocation */
+ pgsm_export_bucket_to_table(new_bucket_id);
+
++ /* Refresh materialized view after export (same timing as bucket) */
++ pgsm_refresh_materialized_view();
+
+ hash_entry_dealloc(new_bucket_id, prev_bucket_id, NULL);
+ pgsm_lock_release(pgsm);
+ }
+}
+```
+
+### **2.3: SQL Migration** (`pg_stat_monitor--2.1--2.2.sql`)
+
+```sql
+-- Phase 2: Replace view with materialized view
+
+-- Keep the existing view available under a new name. (Creating a dependent
+-- "SELECT * FROM pg_stat_monitor" view here would block dropping the original.)
+ALTER VIEW pg_stat_monitor RENAME TO pg_stat_monitor_old;
+
+-- Create materialized view from table data
+CREATE MATERIALIZED VIEW pg_stat_monitor AS
+SELECT
+ bucket_id,
+ queryid,
+ dbid,
+ userid,
+ calls,
+ rows,
+ total_time,
+ mean_time,
+ min_time,
+ max_time,
+ shared_blks_hit,
+ shared_blks_read,
+ exported_at
+FROM pg_stat_monitor_data
+WHERE exported_at > now() - interval '24 hours';
+
+-- Create unique index for CONCURRENT refresh; exported_at is included so the
+-- same query appearing in several exports does not violate uniqueness
+CREATE UNIQUE INDEX ON pg_stat_monitor (bucket_id, queryid, dbid, userid, exported_at);
+
+-- Initial population
+REFRESH MATERIALIZED VIEW pg_stat_monitor;
+```
+
+## **Phase 2 Testing**
+
+```sql
+-- Query the materialized view
+SELECT * FROM pg_stat_monitor LIMIT 10;
+
+-- Verify refresh timing matches bucket rotation
+-- With default pgsm_bucket_time=60, view should refresh every 60 seconds
+-- Check PostgreSQL logs for: "pg_stat_monitor: refreshed materialized view"
+
+-- Test with different bucket timing
+ALTER SYSTEM SET pg_stat_monitor.pgsm_bucket_time = 30; -- 30-second buckets
+SELECT pg_reload_conf();
+-- Now view should refresh every 30 seconds
+```
+
+---
+
+# **PHASE 3: Memory Reduction to 1-3MB (2 hours)**
+*Git diff: ~100 lines, mostly in isolated functions*
+
+## **Objective**
+Clear buckets immediately after export to achieve 1-3MB memory usage.
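+
+Once history lives on disk instead of shared memory, retention is governed by the partitioned table rather than by `pgsm_max_buckets`. The migrations above only create a DEFAULT partition, so any time-based retention would need an external job; a minimal sketch with hypothetical partition names and dates:
+
+```sql
+-- Create the next day's partition ahead of time (hypothetical name/date)
+CREATE TABLE IF NOT EXISTS pg_stat_monitor_data_20250102
+    PARTITION OF pg_stat_monitor_data
+    FOR VALUES FROM ('2025-01-02') TO ('2025-01-03');
+
+-- Drop a partition that has aged out of the retention window (hypothetical name)
+DROP TABLE IF EXISTS pg_stat_monitor_data_20241226;
+```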
+ +## **Changes** + +### **3.1: Immediate Clear After Export** (`pgsm_table_export.c`) + +```c +/* + * Clear all entries from exported bucket + * This allows us to keep only current data in memory + */ +void +pgsm_clear_bucket_after_export(uint64 bucket_id) +{ + HASH_SEQ_STATUS hash_seq; + pgsmEntry *entry; + int cleared = 0; + + /* Iterate and remove entries from exported bucket */ + hash_seq_init(&hash_seq, pgsm_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + if (entry->key.bucket_id == bucket_id) + { + /* Remove from hash table */ + hash_search(pgsm_hash, &entry->key, HASH_REMOVE, NULL); + cleared++; + } + } + + elog(DEBUG1, "pg_stat_monitor: cleared %d entries from bucket %lu", + cleared, bucket_id); +} +``` + +### **3.2: Modify Export Flow** (`pg_stat_monitor.c`) + +```diff +static uint64 +get_next_wbucket(pgsmSharedState *pgsm) +{ + if (update_bucket) + { + new_bucket_id = (tv.tv_sec / pgsm_bucket_time) % pgsm_max_buckets; + prev_bucket_id = pg_atomic_exchange_u64(&pgsm->current_wbucket, new_bucket_id); + + pgsm_lock_aquire(pgsm, LW_EXCLUSIVE); + + /* Export bucket data before deallocation */ + pgsm_export_bucket_to_table(new_bucket_id); + ++ /* Clear exported bucket immediately to free memory */ ++ extern void pgsm_clear_bucket_after_export(uint64); ++ pgsm_clear_bucket_after_export(new_bucket_id); + +- hash_entry_dealloc(new_bucket_id, prev_bucket_id, NULL); ++ /* Skip normal dealloc since we already cleared */ + + pgsm_lock_release(pgsm); + } +} +``` + +### **3.3: Single Bucket Mode** (`guc.c`) + +```diff +void +init_guc(void) +{ ++ /* Override to single bucket for minimal memory */ ++ if (pgsm_enable_table_export) ++ pgsm_max_buckets = 1; /* Only current bucket needed */ +``` + +### **3.4: Further Memory Reduction** + +```sql +-- Phase 3: Aggressive memory reduction +ALTER SYSTEM SET pg_stat_monitor.pgsm_max = '1MB'; -- Minimum possible +ALTER SYSTEM SET pg_stat_monitor.pgsm_max_buckets = 1; -- Single bucket +ALTER SYSTEM SET pg_stat_monitor.pgsm_query_shared_buffer = '100kB'; -- Tiny buffer +``` + +## **Phase 3 Testing** + +```sql +-- Check memory usage +SELECT name, size FROM pg_shmem_allocations +WHERE name LIKE '%pg_stat_monitor%'; + +-- Should show ~1-3MB instead of 276MB +``` + +--- + +## **Summary of Changes** + +| Phase | Files Changed | Lines Added | Memory Usage | Feature | +|-------|--------------|-------------|--------------|---------| +| **Phase 1** | 4 files | ~150 lines | 11MB | Table export | +| **Phase 2** | 2 files | ~30 lines | 11MB | Mat view + bucket-synced refresh | +| **Phase 3** | 3 files | ~100 lines | **1-3MB** | Clear after export | + +**Total Git Diff: ~300 lines across 5 files** + +## **Key Benefits** + +1. **Phased approach** - Each phase can be reviewed/tested independently +2. **Minimal core changes** - Hooks into existing bucket rotation +3. **No cron needed** - Uses existing function calls for timing +4. **Easy rollback** - Can disable with single GUC +5. **Rebase friendly** - Changes isolated in new file + minimal hooks \ No newline at end of file diff --git a/pg_stat_monitor--2.3--2.4.sql b/pg_stat_monitor--2.3--2.4.sql new file mode 100644 index 0000000..3a7f522 --- /dev/null +++ b/pg_stat_monitor--2.3--2.4.sql @@ -0,0 +1,128 @@ +/* contrib/pg_stat_monitor/pg_stat_monitor--2.3--2.4.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "ALTER EXTENSION pg_stat_monitor" to load this file. 
\quit + +-- Create partitioned table with exact pg_stat_monitor API structure +-- Table columns match the expected API exactly - direct table access +DROP VIEW IF EXISTS pg_stat_monitor CASCADE; + +CREATE TABLE pg_stat_monitor ( + bucket bigint, -- renamed from bucket_id + bucket_start_time timestamptz, + userid oid, + username text, + dbid oid, + datname text, + client_ip inet, + pgsm_query_id bigint, + queryid bigint, + toplevel boolean, + top_queryid bigint, + + -- Query texts limited to 1.5KB each + query text, + comments text, + planid bigint, + query_plan text, + top_query text, + + application_name text, + relations text[], -- array instead of comma-separated string + cmd_type int, + cmd_type_text text, -- computed at write time + elevel int, + sqlcode text, + message text, + + -- Execution stats + calls bigint, + total_exec_time double precision, + min_exec_time double precision, + max_exec_time double precision, + mean_exec_time double precision, + stddev_exec_time double precision, + rows bigint, + + -- Planning stats + plans bigint, + total_plan_time double precision, + min_plan_time double precision, + max_plan_time double precision, + mean_plan_time double precision, + stddev_plan_time double precision, + + -- Block stats + shared_blks_hit bigint, + shared_blks_read bigint, + shared_blks_dirtied bigint, + shared_blks_written bigint, + local_blks_hit bigint, + local_blks_read bigint, + local_blks_dirtied bigint, + local_blks_written bigint, + temp_blks_read bigint, + temp_blks_written bigint, + shared_blk_read_time double precision, + shared_blk_write_time double precision, + local_blk_read_time double precision, + local_blk_write_time double precision, + temp_blk_read_time double precision, + temp_blk_write_time double precision, + + -- System stats + cpu_user_time double precision, + cpu_sys_time double precision, + + -- WAL stats + wal_records bigint, + wal_fpi bigint, + wal_bytes numeric, + + -- JIT stats + jit_functions bigint, + jit_generation_time double precision, + jit_inlining_count bigint, + jit_inlining_time double precision, + jit_optimization_count bigint, + jit_optimization_time double precision, + jit_emission_count bigint, + jit_emission_time double precision, + jit_deform_count bigint, + jit_deform_time double precision, + + -- Response time histogram + resp_calls text[], + + -- Metadata + stats_since timestamptz, + minmax_stats_since timestamptz, + bucket_done boolean DEFAULT false, + exported_at timestamptz DEFAULT now() +) PARTITION BY RANGE (exported_at); + +-- Create initial partition for today +CREATE TABLE pg_stat_monitor_default +PARTITION OF pg_stat_monitor DEFAULT; + +-- Create indexes for query performance +CREATE INDEX ON pg_stat_monitor (queryid); +CREATE INDEX ON pg_stat_monitor (exported_at); +CREATE INDEX ON pg_stat_monitor (bucket, queryid); -- Composite for time-series queries + +-- Configure memory and query text limits (set these manually in postgresql.conf): +-- pg_stat_monitor.pgsm_max = '10MB' +-- pg_stat_monitor.pgsm_max_buckets = 2 +-- pg_stat_monitor.pgsm_query_shared_buffer = '1MB' +-- pg_stat_monitor.pgsm_query_max_len = 1536 + +-- Create user-callable export function following PostgreSQL extension best practices +CREATE OR REPLACE FUNCTION pg_stat_monitor_export() +RETURNS int +AS 'MODULE_PATHNAME', 'pg_stat_monitor_export' +LANGUAGE C STRICT VOLATILE; + +-- Grant execute permission to public +GRANT EXECUTE ON FUNCTION pg_stat_monitor_export() TO PUBLIC; + +-- Table structure matches API exactly - implementation complete \ No 
newline at end of file diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 1dc622b..c5e8062 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -503,3 +503,6 @@ void pgsm_hash_seq_init(PGSM_HASH_SEQ_STATUS * hstat, PGSM_HASH_TABLE * shared_ void *pgsm_hash_seq_next(PGSM_HASH_SEQ_STATUS * hstat); void pgsm_hash_seq_term(PGSM_HASH_SEQ_STATUS * hstat); void pgsm_hash_delete_current(PGSM_HASH_SEQ_STATUS * hstat, PGSM_HASH_TABLE * shared_hash, void *key); + +/* Table export functions */ +bool pgsm_export_bucket_to_table(uint64 bucket_id); diff --git a/pgsm_table_export.c b/pgsm_table_export.c new file mode 100644 index 0000000..7e1f562 --- /dev/null +++ b/pgsm_table_export.c @@ -0,0 +1,723 @@ +/* + * pgsm_table_export.c - Export pg_stat_monitor data to partitioned tables + * Low-memory implementation with query text limits and top queries filtering + */ +#include "postgres.h" +#include "pg_stat_monitor.h" +#include "executor/spi.h" +#include "utils/builtins.h" +#include "utils/timestamp.h" +#include "catalog/pg_authid.h" +#include "lib/dshash.h" +#include "utils/dsa.h" + +/* Helper macro for repeated truncate and escape pattern */ +#define APPEND_ESCAPED_STRING(sql, str, max_len) \ + do { \ + if ((str) && strlen(str) > 0) { \ + char *escaped = truncate_and_escape_string((str), (max_len)); \ + appendStringInfo(&(sql), "'%s', ", escaped); \ + pfree(escaped); \ + } else { \ + appendStringInfo(&(sql), "NULL, "); \ + } \ + } while(0) + +/* Function declarations */ +bool pgsm_export_bucket_to_table(uint64 bucket_id); + +/* Configuration */ +#define MAX_QUERY_LENGTH 1536 /* 1.5KB limit for query text */ +#define MAX_QUERIES_PER_BUCKET 300 /* Top 300 queries per bucket */ + +/* External references */ +extern pgsmSharedState *pgsm_get_ss(void); +extern dsa_area *get_dsa_area_for_query_text(void); + +/* Structure for sorting entries by total_time */ +typedef struct +{ + pgsmEntry *entry; + double total_time; +} EntryWithTime; + + +/* + * Helper function to truncate and escape strings for SQL + */ +static char * +truncate_and_escape_string(const char *str, int max_len) +{ + int len; + bool truncated; + char *result; + int src_len; + int j = 0; + + if (!str || *str == '\0') + return pstrdup(""); + + len = strlen(str); + truncated = (len > max_len); + + /* Simple approach: allocate generous buffer */ + src_len = truncated ? 
max_len : len; + result = palloc(src_len * 2 + 10); + + /* Copy and escape quotes */ + for (int i = 0; i < src_len; i++) + { + if (str[i] == '\'') + { + result[j++] = '\''; + result[j++] = '\''; + } + else + { + result[j++] = str[i]; + } + } + + /* Add truncation marker if needed */ + if (truncated) + { + result[j++] = '.'; + result[j++] = '.'; + result[j++] = '.'; + } + + result[j] = '\0'; + return result; +} + +/* + * Helper to convert response calls array to PostgreSQL array format + */ +static char * +resp_calls_to_array(int *resp_calls, int count) +{ + StringInfoData str; + char *result; + + + initStringInfo(&str); + + appendStringInfo(&str, "ARRAY["); + for (int i = 0; i < count; i++) + { + if (i > 0) + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "%d", resp_calls[i]); + } + appendStringInfo(&str, "]"); + + /* Create a copy in the calling context and free the StringInfo */ + result = pstrdup(str.data); + + pfree(str.data); + + return result; +} + +/* + * Comparison function for sorting entries by total_time (descending) + */ +static int +compare_entries_by_time(const void *a, const void *b) +{ + EntryWithTime *ea = (EntryWithTime *)a; + EntryWithTime *eb = (EntryWithTime *)b; + + if (ea->total_time > eb->total_time) + return -1; + else if (ea->total_time < eb->total_time) + return 1; + else + return 0; +} + + +/* + * Helper function to collect and sort entries from a bucket + */ +static EntryWithTime * +collect_and_sort_entries(uint64 bucket_id, int *entry_count) +{ + PGSM_HASH_SEQ_STATUS hstat; + pgsmEntry *entry; + EntryWithTime *entries_array = NULL; + int array_size = 1000; /* Initial allocation */ + + *entry_count = 0; + + /* Allocate array for sorting entries */ + entries_array = palloc(array_size * sizeof(EntryWithTime)); + + /* First pass: collect all entries from this bucket */ + pgsm_hash_seq_init(&hstat, get_pgsmHash(), false); + while ((entry = pgsm_hash_seq_next(&hstat)) != NULL) + { + if (entry->key.bucket_id != bucket_id) + continue; + + /* Expand array if needed */ + if (*entry_count >= array_size) + { + array_size *= 2; + entries_array = repalloc(entries_array, array_size * sizeof(EntryWithTime)); + } + + entries_array[*entry_count].entry = entry; + entries_array[*entry_count].total_time = entry->counters.time.total_time; + (*entry_count)++; + } + pgsm_hash_seq_term(&hstat); + + elog(LOG, "pg_stat_monitor: Found %d entries for bucket %lu", *entry_count, (unsigned long)bucket_id); + + /* Sort entries by total_time descending */ + qsort(entries_array, *entry_count, sizeof(EntryWithTime), compare_entries_by_time); + + return entries_array; +} + +/* + * Helper function to build INSERT SQL statement + */ +/* + * Build the INSERT statement header with all column names + */ +static void +build_insert_header(StringInfo sql) +{ + appendStringInfo(sql, + "INSERT INTO pg_stat_monitor " + "(bucket, bucket_start_time, userid, username, dbid, datname, " + "client_ip, pgsm_query_id, queryid, toplevel, top_queryid, " + "query, comments, planid, query_plan, top_query, application_name, relations, " + "cmd_type, cmd_type_text, elevel, sqlcode, message, " + "calls, total_exec_time, min_exec_time, max_exec_time, mean_exec_time, stddev_exec_time, rows, " + "plans, total_plan_time, min_plan_time, max_plan_time, mean_plan_time, stddev_plan_time, " + "shared_blks_hit, shared_blks_read, shared_blks_dirtied, shared_blks_written, " + "local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, " + "temp_blks_read, temp_blks_written, " + "shared_blk_read_time, 
shared_blk_write_time, local_blk_read_time, local_blk_write_time, " + "temp_blk_read_time, temp_blk_write_time, " + "cpu_user_time, cpu_sys_time, " + "wal_records, wal_fpi, wal_bytes, " + "jit_functions, jit_generation_time, jit_inlining_count, jit_inlining_time, " + "jit_optimization_count, jit_optimization_time, jit_emission_count, jit_emission_time, " + "jit_deform_count, jit_deform_time, " + "resp_calls, stats_since, minmax_stats_since, bucket_done, exported_at" + ") VALUES "); +} + +static char * +build_insert_statement(EntryWithTime *entries_array, int queries_to_export, + uint64 bucket_id, pgsmSharedState *pgsm, dsa_area *query_dsa_area) +{ + StringInfoData sql; + bool first = true; + + /* Build batch INSERT for top queries */ + initStringInfo(&sql); + build_insert_header(&sql); + + /* Export top N entries using original incremental approach */ + for (int i = 0; i < queries_to_export; i++) + { + char *query_txt = NULL; + char *parent_query_txt = NULL; + char *query_plan_txt = NULL; + char *relations_str = NULL; + double exec_stddev = 0.0; + double plan_stddev = 0.0; + double plan_mean __attribute__((unused)) = 0.0; + char *resp_calls_str; + pgsmEntry *entry; + + entry = entries_array[i].entry; + + if (!first) + appendStringInfoChar(&sql, ','); + first = false; + + /* Get query text - truncated to MAX_QUERY_LENGTH */ + if (DsaPointerIsValid(entry->query_text.query_pos)) + { + char *query_ptr = dsa_get_address(query_dsa_area, entry->query_text.query_pos); + query_txt = truncate_and_escape_string(query_ptr, MAX_QUERY_LENGTH); + } + else + { + query_txt = pstrdup("Query string not available"); + } + + /* Get parent query text if exists - also truncated */ + if (entry->key.parentid != UINT64CONST(0) && + DsaPointerIsValid(entry->counters.info.parent_query)) + { + char *query_ptr = dsa_get_address(query_dsa_area, entry->counters.info.parent_query); + parent_query_txt = truncate_and_escape_string(query_ptr, MAX_QUERY_LENGTH); + } + + /* Get relations as comma-separated string */ + if (entry->counters.info.num_relations > 0) + { + StringInfoData rel_str; + initStringInfo(&rel_str); + for (int j = 0; j < entry->counters.info.num_relations; j++) + { + if (j > 0) + appendStringInfoChar(&rel_str, ','); + appendStringInfo(&rel_str, "%s", entry->counters.info.relations[j]); + } + /* Create a copy in the calling context and free the StringInfo */ + relations_str = pstrdup(rel_str.data); + pfree(rel_str.data); + } + + /* Calculate stddev for exec and plan times */ + if (entry->counters.calls.calls > 1) + exec_stddev = sqrt(entry->counters.time.sum_var_time / entry->counters.calls.calls); + if (entry->counters.plancalls.calls > 1) + { + plan_stddev = sqrt(entry->counters.plantime.sum_var_time / entry->counters.plancalls.calls); + plan_mean = entry->counters.plantime.mean_time; + } + + /* Convert response calls array to PostgreSQL array */ + resp_calls_str = resp_calls_to_array(entry->counters.resp_calls, MAX_RESPONSE_BUCKET); + + /* Get query plan if exists - also truncated */ + if (entry->key.planid && entry->counters.planinfo.plan_text[0]) + { + query_plan_txt = truncate_and_escape_string(entry->counters.planinfo.plan_text, MAX_QUERY_LENGTH); + } + + /* Build the VALUES clause for this entry */ + appendStringInfo(&sql, + "(" + "%ld, ", /* bucket_id */ + (int64)bucket_id); + + /* bucket_start_time */ + if (pgsm->bucket_start_time[bucket_id] != 0) + appendStringInfo(&sql, "to_timestamp(%f)::timestamptz, ", + (double)(pgsm->bucket_start_time[bucket_id] / 1000000.0)); + else + appendStringInfo(&sql, 
"NULL, "); + + { + char *username_escaped = truncate_and_escape_string(entry->username, NAMEDATALEN); + char *datname_escaped = truncate_and_escape_string(entry->datname, NAMEDATALEN); + + appendStringInfo(&sql, "%u, '%s', %u, '%s', ", + entry->key.userid, username_escaped, entry->key.dbid, datname_escaped); + + pfree(username_escaped); + pfree(datname_escaped); + } + + /* client_ip */ + if (entry->key.ip != 0) + appendStringInfo(&sql, "'0.0.0.0'::inet + %ld, ", (int64)entry->key.ip); + else + appendStringInfo(&sql, "NULL, "); + + /* pgsm_query_id, queryid */ + appendStringInfo(&sql, "%ld, %ld, ", + (int64)entry->pgsm_query_id, + (int64)entry->key.queryid); + + /* toplevel */ + appendStringInfo(&sql, "%s, ", entry->key.toplevel ? "true" : "false"); + + /* top_queryid */ + if (entry->key.parentid != UINT64CONST(0)) + appendStringInfo(&sql, "%ld, ", (int64)entry->key.parentid); + else + appendStringInfo(&sql, "NULL, "); + + /* query */ + appendStringInfo(&sql, "'%s', ", query_txt); + + /* comments */ + APPEND_ESCAPED_STRING(sql, entry->counters.info.comments, 256); + + /* planid */ + if (entry->key.planid) + appendStringInfo(&sql, "%ld, ", (int64)entry->key.planid); + else + appendStringInfo(&sql, "NULL, "); + + /* query_plan */ + if (query_plan_txt) + appendStringInfo(&sql, "'%s', ", query_plan_txt); + else + appendStringInfo(&sql, "NULL, "); + + /* top_query */ + if (parent_query_txt) + appendStringInfo(&sql, "'%s', ", parent_query_txt); + else + appendStringInfo(&sql, "NULL, "); + + /* application_name */ + APPEND_ESCAPED_STRING(sql, entry->counters.info.application_name, NAMEDATALEN); + + /* relations - convert to PostgreSQL array format */ + if (relations_str && strlen(relations_str) > 0) + { + char *relations_copy; + char *token; + bool first_rel = true; + + appendStringInfo(&sql, "ARRAY["); + relations_copy = pstrdup(relations_str); + token = strtok(relations_copy, ","); + while (token != NULL) + { + if (!first_rel) + appendStringInfoChar(&sql, ','); + appendStringInfo(&sql, "'%s'", token); + first_rel = false; + token = strtok(NULL, ","); + } + appendStringInfo(&sql, "], "); + pfree(relations_copy); + } + else + appendStringInfo(&sql, "NULL, "); + + /* cmd_type */ + if (entry->counters.info.cmd_type != CMD_NOTHING) + appendStringInfo(&sql, "%d, ", entry->counters.info.cmd_type); + else + appendStringInfo(&sql, "NULL, "); + + /* cmd_type_text - convert to text */ + switch (entry->counters.info.cmd_type) + { + case CMD_SELECT: + appendStringInfo(&sql, "'SELECT', "); + break; + case CMD_UPDATE: + appendStringInfo(&sql, "'UPDATE', "); + break; + case CMD_INSERT: + appendStringInfo(&sql, "'INSERT', "); + break; + case CMD_DELETE: + appendStringInfo(&sql, "'DELETE', "); + break; + case CMD_UTILITY: + appendStringInfo(&sql, "'UTILITY', "); + break; + case CMD_NOTHING: + default: + appendStringInfo(&sql, "NULL, "); + break; + } + + /* elevel, sqlcode, message */ + appendStringInfo(&sql, "%ld, ", (long)entry->counters.error.elevel); + + APPEND_ESCAPED_STRING(sql, entry->counters.error.sqlcode, 5); + APPEND_ESCAPED_STRING(sql, entry->counters.error.message, 256); + + /* Execution stats */ + appendStringInfo(&sql, + "%ld, %.3f, %.3f, %.3f, %.3f, %.3f, %ld, ", + entry->counters.calls.calls, + entry->counters.time.total_time, + entry->counters.time.min_time, + entry->counters.time.max_time, + entry->counters.time.mean_time, + exec_stddev, + entry->counters.calls.rows); + + /* Planning stats */ + appendStringInfo(&sql, + "%ld, %.3f, %.3f, %.3f, %.3f, %.3f, ", + entry->counters.plancalls.calls, + 
entry->counters.plantime.total_time, + entry->counters.plantime.min_time, + entry->counters.plantime.max_time, + entry->counters.plantime.mean_time, + plan_stddev); + + /* Block stats */ + appendStringInfo(&sql, + "%ld, %ld, %ld, %ld, %ld, %ld, %ld, %ld, %ld, %ld, " + "%.3f, %.3f, %.3f, %.3f, %.3f, %.3f, ", + entry->counters.blocks.shared_blks_hit, + entry->counters.blocks.shared_blks_read, + entry->counters.blocks.shared_blks_dirtied, + entry->counters.blocks.shared_blks_written, + entry->counters.blocks.local_blks_hit, + entry->counters.blocks.local_blks_read, + entry->counters.blocks.local_blks_dirtied, + entry->counters.blocks.local_blks_written, + entry->counters.blocks.temp_blks_read, + entry->counters.blocks.temp_blks_written, + entry->counters.blocks.shared_blk_read_time, + entry->counters.blocks.shared_blk_write_time, + entry->counters.blocks.local_blk_read_time, + entry->counters.blocks.local_blk_write_time, + entry->counters.blocks.temp_blk_read_time, + entry->counters.blocks.temp_blk_write_time); + + /* System stats */ + appendStringInfo(&sql, + "%.3f, %.3f, ", + entry->counters.sysinfo.utime, + entry->counters.sysinfo.stime); + + /* WAL stats */ + appendStringInfo(&sql, + "%ld, %ld, %ld, ", + entry->counters.walusage.wal_records, + entry->counters.walusage.wal_fpi, + (int64)entry->counters.walusage.wal_bytes); + + /* JIT stats */ + appendStringInfo(&sql, + "%ld, %.3f, %ld, %.3f, %ld, %.3f, %ld, %.3f, %ld, %.3f, ", + entry->counters.jitinfo.jit_functions, + entry->counters.jitinfo.jit_generation_time, + entry->counters.jitinfo.jit_inlining_count, + entry->counters.jitinfo.jit_inlining_time, + entry->counters.jitinfo.jit_optimization_count, + entry->counters.jitinfo.jit_optimization_time, + entry->counters.jitinfo.jit_emission_count, + entry->counters.jitinfo.jit_emission_time, + entry->counters.jitinfo.jit_deform_count, + entry->counters.jitinfo.jit_deform_time); + + /* resp_calls */ + appendStringInfo(&sql, "%s, ", resp_calls_str); + + /* stats_since, minmax_stats_since */ + if (entry->stats_since != 0) + appendStringInfo(&sql, "to_timestamp(%f)::timestamptz, ", + (double)(entry->stats_since / 1000000.0)); + else + appendStringInfo(&sql, "NULL, "); + + if (entry->minmax_stats_since != 0) + appendStringInfo(&sql, "to_timestamp(%f)::timestamptz, ", + (double)(entry->minmax_stats_since / 1000000.0)); + else + appendStringInfo(&sql, "NULL, "); + + /* bucket_done - mark as false since bucket is still active */ + appendStringInfo(&sql, "false, "); + + /* exported_at - use current timestamp */ + appendStringInfo(&sql, "now())"); + + /* Clean up */ + if (query_txt) + pfree(query_txt); + if (parent_query_txt) + pfree(parent_query_txt); + if (query_plan_txt) + pfree(query_plan_txt); + if (relations_str) + pfree(relations_str); + if (resp_calls_str) + pfree(resp_calls_str); + } + + { + char *result = pstrdup(sql.data); + /* StringInfo manages its own memory - don't call pfree on sql.data */ + return result; + } +} + +/* + * Helper function to execute SQL statement + */ +static bool +execute_export_sql(const char *sql, uint64 bucket_id, int exported) +{ + int spi_result; + int ret; + bool export_successful = false; + bool spi_connected = false; + + elog(LOG, "pg_stat_monitor: About to export %d queries for bucket %lu", exported, (unsigned long)bucket_id); + + /* Attempt SPI execution with proper error handling */ + PG_TRY(); + { + /* Try to connect to SPI manager */ + spi_result = SPI_connect(); + if (spi_result != SPI_OK_CONNECT) + { + elog(DEBUG1, "pg_stat_monitor: Deferring export of 
bucket %lu (SPI connect failed: %d)", + (unsigned long)bucket_id, spi_result); + export_successful = false; + } + else + { + spi_connected = true; + + /* Execute the INSERT statement */ + elog(LOG, "pg_stat_monitor: Executing SQL for bucket %lu", (unsigned long)bucket_id); + elog(LOG, "pg_stat_monitor: FULL SQL: %s", sql); + + /* Add pre-execution debugging */ + elog(LOG, "pg_stat_monitor: About to call SPI_execute"); + elog(LOG, "pg_stat_monitor: SPI connection status: %d", SPI_result); + + ret = SPI_execute(sql, false, 0); + + elog(LOG, "pg_stat_monitor: SPI_execute returned: %d", ret); + elog(LOG, "pg_stat_monitor: SPI_OK_INSERT value is: %d", SPI_OK_INSERT); + elog(LOG, "pg_stat_monitor: SPI_processed: %lu", (unsigned long)SPI_processed); + + /* Check result */ + if (ret == SPI_OK_INSERT) + { + elog(LOG, "pg_stat_monitor: Successfully exported %d queries for bucket %lu", + exported, (unsigned long)bucket_id); + export_successful = true; + } + else + { + elog(WARNING, "pg_stat_monitor: Failed to insert bucket %lu data, SPI result: %d", + (unsigned long)bucket_id, ret); + export_successful = false; + } + + /* Always call SPI_finish() after successful SPI_connect() */ + SPI_finish(); + spi_connected = false; + } + } + PG_CATCH(); + { + /* Handle SPI context errors gracefully */ + ErrorData *errdata; + + /* Clean up SPI connection if it was established */ + if (spi_connected) + { + SPI_finish(); + } + + errdata = CopyErrorData(); + elog(DEBUG1, "pg_stat_monitor: Export deferred for bucket %lu: %s", + (unsigned long)bucket_id, errdata->message); + FreeErrorData(errdata); + FlushErrorState(); + export_successful = false; + } + PG_END_TRY(); + + return export_successful; +} + +/* + * Export bucket data to partitioned table + * Called from ExecutorEnd when a bucket needs export + * Returns true if export succeeded or no data to export, false if deferred + */ +bool +pgsm_export_bucket_to_table(uint64 bucket_id) +{ + EntryWithTime *entries_array = NULL; + char *sql = NULL; + pgsmSharedState *pgsm; + dsa_area *query_dsa_area; + int entry_count, queries_to_export; + bool export_successful = false; + + /* Log when export function is called */ + elog(DEBUG1, "pg_stat_monitor: Export function called for bucket %lu", (unsigned long)bucket_id); + + /* Get shared memory areas */ + pgsm = pgsm_get_ss(); + query_dsa_area = get_dsa_area_for_query_text(); + + /* Step 1: Collect and sort entries from this bucket */ + entries_array = collect_and_sort_entries(bucket_id, &entry_count); + + /* Step 2: Limit to top N queries */ + queries_to_export = (entry_count > MAX_QUERIES_PER_BUCKET) ? + MAX_QUERIES_PER_BUCKET : entry_count; + + if (queries_to_export == 0) + { + elog(DEBUG1, "pg_stat_monitor: No data to export for bucket %lu (found %d entries)", + (unsigned long)bucket_id, entry_count); + export_successful = true; /* No data to export is success */ + goto cleanup; + } + + /* Step 3: Build INSERT SQL statement */ + sql = build_insert_statement(entries_array, queries_to_export, bucket_id, pgsm, query_dsa_area); + + /* Step 4: Execute SQL statement */ + export_successful = execute_export_sql(sql, bucket_id, queries_to_export); + elog(LOG, "pg_stat_monitor: execute_export_sql returned %s for bucket %lu", + export_successful ? 
"true" : "false", (unsigned long)bucket_id); + +cleanup: + /* Single cleanup point to avoid double-free errors */ + if (sql) + pfree(sql); + if (entries_array) + pfree(entries_array); + + elog(LOG, "pg_stat_monitor: pgsm_export_bucket_to_table returning %s for bucket %lu", + export_successful ? "true" : "false", (unsigned long)bucket_id); + return export_successful; +} + +/* + * User-callable export function following PostgreSQL extension best practices + * This is the safe way to trigger exports - from a proper transaction context + * Similar to pg_stat_statements_reset() pattern + */ +PG_FUNCTION_INFO_V1(pg_stat_monitor_export); + +Datum +pg_stat_monitor_export(PG_FUNCTION_ARGS) +{ + pgsmSharedState *pgsm = pgsm_get_ss(); + uint64 bucket_to_export; + uint64 current_bucket_id; + int exported_buckets = 0; + + /* Check if user has appropriate permissions */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to export pg_stat_monitor data"))); + + /* Get current bucket ID */ + current_bucket_id = pg_atomic_read_u64(&pgsm->current_wbucket); + + /* Check for pending exports */ + bucket_to_export = pg_atomic_read_u64(&pgsm->bucket_to_export); + + /* Determine which bucket to export */ + { + uint64 target_bucket = (bucket_to_export == (uint64)-1) ? current_bucket_id : bucket_to_export; + + /* Export the target bucket */ + if (pgsm_export_bucket_to_table(target_bucket)) + { + exported_buckets = 1; + /* Clear export flag if it was a pending export */ + if (bucket_to_export != (uint64)-1) + pg_atomic_compare_exchange_u64(&pgsm->bucket_to_export, &bucket_to_export, (uint64)-1); + } + } + + PG_RETURN_INT32(exported_buckets); +} \ No newline at end of file