From eaa609f510fc8ae07aa10039e808bc3f0938667d Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Tue, 19 Aug 2025 23:17:13 +0300 Subject: [PATCH] Add citus_stats UDF (#8026) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DESCRIPTION: Add `citus_stats` view This view covers Citus tables and provides `null_frac`, `most_common_vals` and `most_common_freqs` for each column of a table, based on the definitions of these columns in the Postgres view `pg_stats`. **Aggregated Views: pg\_stats > citus\_stats** citus\_stats is a **view** intended for use in **Citus**, a distributed extension of PostgreSQL. It collects and returns **column-level** **statistics** for a distributed table—specifically, the **most common values**, their **frequencies**, and the **fraction of null values**, like the pg\_stats view does for regular Postgres tables. **Use Case** This view is useful when: - You need **column-level insights** on a distributed table. - You're performing **query optimization**, **cardinality estimation**, or **data profiling** across shards. **What It Returns** A **table** with: | Column Name | Data Type | Description | |---------------------|-----------|-----------------------------------------------------------------------------| | schemaname | text | Name of the schema containing the distributed table | | tablename | text | Name of the distributed table | | attname | text | Name of the column (attribute) | | null_frac | float4 | Estimated fraction of NULLs in the column across all shards | | most_common_vals | text[] | Array of most common values for the column | | most_common_freqs | float4[] | Array of corresponding frequencies (as fractions) of the most common values| **Caveats** - The view assumes that the most common values of a column are the same on every shard, so it simply adds up the per-shard occurrence counts; a value that is common on only some shards is underestimated. 
--- .../distributed/sql/citus--13.1-1--13.2-1.sql | 2 + .../sql/downgrades/citus--13.2-1--13.1-1.sql | 2 + .../sql/udfs/citus_stats/13.2-1.sql | 82 ++++++++++++ .../sql/udfs/citus_stats/latest.sql | 82 ++++++++++++ .../expected/citus_aggregated_stats.out | 122 ++++++++++++++++++ src/test/regress/expected/multi_extension.out | 3 +- .../expected/upgrade_list_citus_objects.out | 3 +- src/test/regress/multi_1_schedule | 1 + .../regress/sql/citus_aggregated_stats.sql | 87 +++++++++++++ 9 files changed, 382 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_stats/13.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_stats/latest.sql create mode 100644 src/test/regress/expected/citus_aggregated_stats.out create mode 100644 src/test/regress/sql/citus_aggregated_stats.sql diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index 610f5f2e4..2bf4f148c 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -14,6 +14,8 @@ #include "udfs/citus_finish_pg_upgrade/13.2-1.sql" +#include "udfs/citus_stats/13.2-1.sql" + DO $drop_leftover_old_columnar_objects$ BEGIN -- If old columnar exists, i.e., the columnar access method that we had before Citus 11.1, diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index 0be7c2869..0f29e236c 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -25,3 +25,5 @@ DROP FUNCTION IF EXISTS pg_catalog.get_snapshot_based_node_split_plan(text, inte -- -- If the user wants to create "citus_columnar" extension later, "citus_columnar" -- will anyway properly create them at the scope of that extension. 
+
+DROP VIEW IF EXISTS pg_catalog.citus_stats;
diff --git a/src/backend/distributed/sql/udfs/citus_stats/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_stats/13.2-1.sql
new file mode 100644
index 000000000..3b0f62bd4
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_stats/13.2-1.sql
@@ -0,0 +1,116 @@
+-- citus_stats: per-column statistics for Citus tables, aggregated over all shards.
+-- Modelled on pg_stats. Exposes null_frac, most_common_vals and most_common_freqs.
+--
+-- Every shard reports its pg_stats rows and its pg_class.reltuples estimate through
+-- run_command_on_shards(). Per-shard fractions are turned into row counts using the
+-- shard's reltuples, summed per table, and divided by the table-wide reltuples.
+--
+-- Caveats:
+--   * A value is counted only on shards where it appears in that shard's
+--     most_common_vals, so values that are common on only some shards are
+--     underestimated.
+--   * Columns without a most-common value on any shard are not listed.
+SET search_path = 'pg_catalog';
+DROP VIEW IF EXISTS pg_catalog.citus_stats;
+
+CREATE OR REPLACE VIEW citus.citus_stats AS
+
+-- One row per pg_dist_partition entry: a JSON array holding the
+-- run_command_on_shards() output for that table.
+--   * The table is passed to the shards as its OID rather than its name, so a name
+--     containing quotes or '%' cannot break the command text.
+--   * reltuples is clamped at 0 (-1 means "unknown") and cast to bigint on the
+--     shard: large float4 values can be printed in exponent notation (1e+06),
+--     which the bigint casts below would reject.
+--   * Joining pg_stats on the relation name alone is enough: shard names embed a
+--     unique shard id, so two shard names can never be the same.
+WITH most_common_vals_double_json AS (
+    SELECT ( SELECT json_agg(row_to_json(f)) FROM ( SELECT * FROM run_command_on_shards(logicalrelid,
+            $$ SELECT json_agg(row_to_json(shard_stats)) FROM (
+                    SELECT '$$ || logicalrelid::oid || $$' AS citus_table, attname, s.null_frac,
+                    most_common_vals, most_common_freqs,
+                    GREATEST(c.reltuples, 0)::bigint AS reltuples
+                    FROM pg_class c LEFT JOIN pg_stats s ON (s.tablename = c.relname)
+                    WHERE c.oid = '%s'::regclass) shard_stats $$ ))f) AS shard_results
+    FROM pg_dist_partition),
+
+-- One row per shard: the shard id and the JSON text that shard returned.
+most_common_vals_json AS (
+    SELECT (json_array_elements(shard_results)->>'result') AS result,
+           (json_array_elements(shard_results)->>'shardid') AS shardid
+    FROM most_common_vals_double_json),
+
+-- One row per shard with its row estimate. A shard returns one element per column,
+-- each carrying the same reltuples, hence the DISTINCT.
+table_reltuples_json AS (
+    SELECT DISTINCT shardid,
+           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
+           (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
+    FROM most_common_vals_json),
+
+-- Table-wide row estimate: the sum of the shard estimates.
+table_reltuples AS (
+    SELECT citus_table, sum(shard_reltuples) AS table_reltuples
+    FROM table_reltuples_json
+    GROUP BY citus_table),
+
+-- One row per shard and column with the shard-local null fraction.
+null_frac_json AS (
+    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
+           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
+           (json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
+           (json_array_elements(result::json)->>'attname')::text AS attname
+    FROM most_common_vals_json
+),
+
+-- Estimated number of NULLs per column, summed over the shards.
+null_occurrences AS (
+    SELECT citus_table, attname, sum(null_frac * shard_reltuples)::bigint AS null_occurrences
+    FROM null_frac_json
+    GROUP BY citus_table, attname
+),
+
+-- One row per shard, column and most-common value with its shard-local frequency.
+-- The two json_array_elements_text() calls advance in lockstep, pairing each
+-- value with its frequency.
+most_common_vals AS (
+    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
+           (json_array_elements(result::json)->>'attname')::text AS attname,
+           json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
+           json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
+           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
+    FROM most_common_vals_json),
+
+-- Estimated number of rows holding each most-common value, summed over the shards.
+common_val_occurrence AS (
+    SELECT citus_table, attname, common_val,
+           sum(common_freq * shard_reltuples)::bigint AS occurrence
+    FROM most_common_vals
+    GROUP BY citus_table, attname, common_val)
+
+-- A CASE cannot prevent evaluation of an aggregate contained in it, so the
+-- divisions are guarded with NULLIF instead of relying on the CASE condition.
+-- Both arrays share one explicit ordering (most frequent first) so that
+-- most_common_vals[i] always corresponds to most_common_freqs[i].
+SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
+
+       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN 0
+            ELSE max(n.null_occurrences / NULLIF(t.table_reltuples, 0))::float4 END AS null_frac,
+
+       array_agg(c.common_val ORDER BY c.occurrence DESC, c.common_val) AS most_common_vals,
+
+       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN NULL
+            ELSE array_agg((c.occurrence / NULLIF(t.table_reltuples, 0))::float4
+                           ORDER BY c.occurrence DESC, c.common_val) END AS most_common_freqs
+
+FROM common_val_occurrence c
+INNER JOIN table_reltuples t ON c.citus_table = t.citus_table
+INNER JOIN null_occurrences n ON c.citus_table = n.citus_table AND c.attname = n.attname
+INNER JOIN pg_class p ON c.citus_table::regclass::oid = p.oid
+INNER JOIN pg_namespace nsp ON p.relnamespace = nsp.oid
+GROUP BY nsp.nspname, c.citus_table, p.relname, c.attname;
+
+ALTER VIEW citus.citus_stats SET SCHEMA pg_catalog;
+GRANT SELECT ON pg_catalog.citus_stats TO PUBLIC;
+
+RESET search_path;
diff --git a/src/backend/distributed/sql/udfs/citus_stats/latest.sql b/src/backend/distributed/sql/udfs/citus_stats/latest.sql
new file mode 100644
index 000000000..3b0f62bd4
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_stats/latest.sql
@@ -0,0 +1,116 @@
+-- citus_stats: per-column statistics for Citus tables, aggregated over all shards.
+-- Modelled on pg_stats. Exposes null_frac, most_common_vals and most_common_freqs.
+--
+-- Every shard reports its pg_stats rows and its pg_class.reltuples estimate through
+-- run_command_on_shards(). Per-shard fractions are turned into row counts using the
+-- shard's reltuples, summed per table, and divided by the table-wide reltuples.
+--
+-- Caveats:
+--   * A value is counted only on shards where it appears in that shard's
+--     most_common_vals, so values that are common on only some shards are
+--     underestimated.
+--   * Columns without a most-common value on any shard are not listed.
+SET search_path = 'pg_catalog';
+DROP VIEW IF EXISTS pg_catalog.citus_stats;
+
+CREATE OR REPLACE VIEW citus.citus_stats AS
+
+-- One row per pg_dist_partition entry: a JSON array holding the
+-- run_command_on_shards() output for that table.
+--   * The table is passed to the shards as its OID rather than its name, so a name
+--     containing quotes or '%' cannot break the command text.
+--   * reltuples is clamped at 0 (-1 means "unknown") and cast to bigint on the
+--     shard: large float4 values can be printed in exponent notation (1e+06),
+--     which the bigint casts below would reject.
+--   * Joining pg_stats on the relation name alone is enough: shard names embed a
+--     unique shard id, so two shard names can never be the same.
+WITH most_common_vals_double_json AS (
+    SELECT ( SELECT json_agg(row_to_json(f)) FROM ( SELECT * FROM run_command_on_shards(logicalrelid,
+            $$ SELECT json_agg(row_to_json(shard_stats)) FROM (
+                    SELECT '$$ || logicalrelid::oid || $$' AS citus_table, attname, s.null_frac,
+                    most_common_vals, most_common_freqs,
+                    GREATEST(c.reltuples, 0)::bigint AS reltuples
+                    FROM pg_class c LEFT JOIN pg_stats s ON (s.tablename = c.relname)
+                    WHERE c.oid = '%s'::regclass) shard_stats $$ ))f) AS shard_results
+    FROM pg_dist_partition),
+
+-- One row per shard: the shard id and the JSON text that shard returned.
+most_common_vals_json AS (
+    SELECT (json_array_elements(shard_results)->>'result') AS result,
+           (json_array_elements(shard_results)->>'shardid') AS shardid
+    FROM most_common_vals_double_json),
+
+-- One row per shard with its row estimate. A shard returns one element per column,
+-- each carrying the same reltuples, hence the DISTINCT.
+table_reltuples_json AS (
+    SELECT DISTINCT shardid,
+           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
+           (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
+    FROM most_common_vals_json),
+
+-- Table-wide row estimate: the sum of the shard estimates.
+table_reltuples AS (
+    SELECT citus_table, sum(shard_reltuples) AS table_reltuples
+    FROM table_reltuples_json
+    GROUP BY citus_table),
+
+-- One row per shard and column with the shard-local null fraction.
+null_frac_json AS (
+    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
+           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
+           (json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
+           (json_array_elements(result::json)->>'attname')::text AS attname
+    FROM most_common_vals_json
+),
+
+-- Estimated number of NULLs per column, summed over the shards.
+null_occurrences AS (
+    SELECT citus_table, attname, sum(null_frac * shard_reltuples)::bigint AS null_occurrences
+    FROM null_frac_json
+    GROUP BY citus_table, attname
+),
+
+-- One row per shard, column and most-common value with its shard-local frequency.
+-- The two json_array_elements_text() calls advance in lockstep, pairing each
+-- value with its frequency.
+most_common_vals AS (
+    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
+           (json_array_elements(result::json)->>'attname')::text AS attname,
+           json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
+           json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
+           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
+    FROM most_common_vals_json),
+
+-- Estimated number of rows holding each most-common value, summed over the shards.
+common_val_occurrence AS (
+    SELECT citus_table, attname, common_val,
+           sum(common_freq * shard_reltuples)::bigint AS occurrence
+    FROM most_common_vals
+    GROUP BY citus_table, attname, common_val)
+
+-- A CASE cannot prevent evaluation of an aggregate contained in it, so the
+-- divisions are guarded with NULLIF instead of relying on the CASE condition.
+-- Both arrays share one explicit ordering (most frequent first) so that
+-- most_common_vals[i] always corresponds to most_common_freqs[i].
+SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
+
+       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN 0
+            ELSE max(n.null_occurrences / NULLIF(t.table_reltuples, 0))::float4 END AS null_frac,
+
+       array_agg(c.common_val ORDER BY c.occurrence DESC, c.common_val) AS most_common_vals,
+
+       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN NULL
+            ELSE array_agg((c.occurrence / NULLIF(t.table_reltuples, 0))::float4
+                           ORDER BY c.occurrence DESC, c.common_val) END AS most_common_freqs
+
+FROM common_val_occurrence c
+INNER JOIN table_reltuples t ON c.citus_table = t.citus_table
+INNER JOIN null_occurrences n ON c.citus_table = n.citus_table AND c.attname = n.attname
+INNER JOIN pg_class p ON c.citus_table::regclass::oid = p.oid
+INNER JOIN pg_namespace nsp ON p.relnamespace = nsp.oid
+GROUP BY nsp.nspname, c.citus_table, p.relname, c.attname;
+
+ALTER VIEW citus.citus_stats SET SCHEMA pg_catalog;
+GRANT SELECT ON pg_catalog.citus_stats TO PUBLIC;
+
+RESET search_path;
diff --git a/src/test/regress/expected/citus_aggregated_stats.out b/src/test/regress/expected/citus_aggregated_stats.out
new file mode 100644
index 000000000..65dc294ea
--- /dev/null
+++ b/src/test/regress/expected/citus_aggregated_stats.out
@@ -0,0 +1,122 @@
+CREATE SCHEMA citus_aggregated_stats;
+SET search_path TO citus_aggregated_stats, public;
+SET citus.shard_count = 2;
+SET
citus.next_shard_id TO 1870000; +CREATE USER user1; +GRANT ALL ON SCHEMA citus_aggregated_stats TO user1; +SET SESSION AUTHORIZATION user1; +CREATE TABLE current_check (currentid int, payload text, rlsuser text); +GRANT ALL ON current_check TO PUBLIC; +INSERT INTO current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); +ALTER TABLE current_check ENABLE ROW LEVEL SECURITY; +SET row_security TO ON; +ANALYZE current_check; +SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats + WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs +--------------------------------------------------------------------- + citus_aggregated_stats | current_check | currentid | 0 | {4,3} | {0.428571,0.285714} + citus_aggregated_stats | current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714} + citus_aggregated_stats | current_check | rlsuser | 0.142857 | {user1} | {0.714286} +(3 rows) + +SELECT * FROM citus_stats + WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs +--------------------------------------------------------------------- +(0 rows) + +-- test various Citus table types in citus_stats +CREATE TABLE dist_current_check (currentid int, payload text, rlsuser text); +CREATE TABLE ref_current_check (currentid int, payload text, rlsuser text); +CREATE TABLE citus_local_current_check (currentid int, payload text, rlsuser text); +SELECT create_distributed_table('dist_current_check', 'currentid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + 
+SELECT create_reference_table('ref_current_check'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_add_local_table_to_metadata('citus_local_current_check'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); +INSERT INTO ref_current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); +INSERT INTO citus_local_current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); +ANALYZE dist_current_check; +ANALYZE ref_current_check; +ANALYZE citus_local_current_check; +SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats + WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs +--------------------------------------------------------------------- + citus_aggregated_stats | current_check | currentid | 0 | {4,3} | {0.428571,0.285714} + citus_aggregated_stats | current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714} + citus_aggregated_stats | current_check | rlsuser | 0.142857 | {user1} | {0.714286} +(3 rows) + +SELECT * FROM citus_stats + WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs 
+--------------------------------------------------------------------- + citus_aggregated_stats | dist_current_check | currentid | 0 | {4,3} | {0.428571,0.285714} + citus_aggregated_stats | dist_current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714} + citus_aggregated_stats | dist_current_check | rlsuser | 0.142857 | {user1} | {0.714286} + citus_aggregated_stats | ref_current_check | currentid | 0 | {4,3} | {0.428571,0.285714} + citus_aggregated_stats | ref_current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714} + citus_aggregated_stats | ref_current_check | rlsuser | 0.142857 | {user1} | {0.714286} + citus_aggregated_stats | citus_local_current_check | currentid | 0 | {4,3} | {0.428571,0.285714} + citus_aggregated_stats | citus_local_current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714} + citus_aggregated_stats | citus_local_current_check | rlsuser | 0.142857 | {user1} | {0.714286} +(9 rows) + +RESET SESSION AUTHORIZATION; +DROP SCHEMA citus_aggregated_stats CASCADE; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to table current_check +drop cascades to table dist_current_check +drop cascades to table ref_current_check +drop cascades to table citus_local_current_check_1870003 +drop cascades to table ref_current_check_1870002 +drop cascades to table citus_local_current_check +DROP USER user1; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 61db1fcea..f11f76bc6 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1649,7 +1649,8 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_remove_clone_node_with_nodeid(integer) void | function get_snapshot_based_node_split_plan(text,integer,text,integer,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, placement_node text) | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output 
text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) -(11 rows) + | view citus_stats +(12 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index a9628da96..76121c3ef 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -398,9 +398,10 @@ ORDER BY 1; view citus_stat_statements view citus_stat_tenants view citus_stat_tenants_local + view citus_stats view citus_tables view pg_dist_shard_placement view time_partitions -(369 rows) +(370 rows) DROP TABLE extension_basic_types; diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 43527ff8a..59fdae959 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -32,6 +32,7 @@ test: propagate_extension_commands test: escape_extension_name test: ref_citus_local_fkeys test: alter_database_owner +test: citus_aggregated_stats test: seclabel test: distributed_triggers test: create_single_shard_table diff --git a/src/test/regress/sql/citus_aggregated_stats.sql b/src/test/regress/sql/citus_aggregated_stats.sql new file mode 100644 index 000000000..e8ce9e314 --- /dev/null +++ b/src/test/regress/sql/citus_aggregated_stats.sql @@ -0,0 +1,87 @@ +CREATE SCHEMA citus_aggregated_stats; +SET search_path TO citus_aggregated_stats, public; +SET citus.shard_count = 2; +SET citus.next_shard_id TO 1870000; + +CREATE USER user1; +GRANT ALL ON SCHEMA citus_aggregated_stats TO user1; + +SET SESSION AUTHORIZATION user1; + +CREATE TABLE current_check (currentid int, payload text, rlsuser text); +GRANT ALL ON current_check TO PUBLIC; + +INSERT INTO current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 
'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); + +ALTER TABLE current_check ENABLE ROW LEVEL SECURITY; + +SET row_security TO ON; + +ANALYZE current_check; + +SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats + WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + +SELECT * FROM citus_stats + WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + +-- test various Citus table types in citus_stats +CREATE TABLE dist_current_check (currentid int, payload text, rlsuser text); +CREATE TABLE ref_current_check (currentid int, payload text, rlsuser text); +CREATE TABLE citus_local_current_check (currentid int, payload text, rlsuser text); + +SELECT create_distributed_table('dist_current_check', 'currentid'); +SELECT create_reference_table('ref_current_check'); +SELECT citus_add_local_table_to_metadata('citus_local_current_check'); + +INSERT INTO dist_current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); + +INSERT INTO ref_current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); + +INSERT INTO citus_local_current_check VALUES + (1, 'abc', 'user1'), + (3, 'cde', 'user1'), + (4, 'def', 'user1'), + (4, 'def', 'user1'), + (3, 'cde', 'user2'), + (5, NULL, NULL), + (4, 'def', 'user1'); + +ANALYZE dist_current_check; +ANALYZE ref_current_check; +ANALYZE citus_local_current_check; + +SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats + WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + +SELECT * FROM citus_stats + WHERE 
tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check') + ORDER BY 1; + +RESET SESSION AUTHORIZATION; +DROP SCHEMA citus_aggregated_stats CASCADE; +DROP USER user1;