Add citus_stats UDF (#8026)

DESCRIPTION: Add `citus_stats` UDF

`citus_stats` is created as a view over Citus tables. It reports
`null_frac`, `most_common_vals` and `most_common_freqs` for the columns of
those tables, following the definitions of these columns in the Postgres
view `pg_stats`.

**Aggregated Views: pg\_stats > citus\_stats** 

citus\_stats is a **view** intended for use in **Citus**, a distributed
extension of PostgreSQL. It collects and returns **column-level**
**statistics** for a distributed table—specifically, the **most common
values**, their **frequencies**, and the **fraction of null values**, as
the pg\_stats view does for regular Postgres tables.

**Use Case** 

This view is useful when: 

- You need **column-level insights** on a distributed table. 
- You're performing **query optimization**, **cardinality estimation**,
or **data profiling** across shards.

**What It Returns** 

A **table** with: 

| Column Name | Data Type | Description |
|---------------------|-----------|-----------------------------------------------------------------------------|
| schemaname | text | Name of the schema containing the distributed table |
| tablename | text | Name of the distributed table |
| attname | text | Name of the column (attribute) |
| null_frac | float4 | Estimated fraction of NULLs in the column across all shards |
| most_common_vals | text[] | Array of most common values for the column |
| most_common_freqs | float4[] | Array of corresponding frequencies (as fractions) of the most common values |

**Caveats** 
- The view assumes that every shard reports the same set of most common
values for a column, so it simply sums the per-shard occurrence counts of
each value.
pull/8130/head^2
Naisila Puka 2025-08-19 23:17:13 +03:00 committed by GitHub
parent bd0558fe39
commit eaa609f510
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 382 additions and 2 deletions

View File

@ -14,6 +14,8 @@
#include "udfs/citus_finish_pg_upgrade/13.2-1.sql" #include "udfs/citus_finish_pg_upgrade/13.2-1.sql"
#include "udfs/citus_stats/13.2-1.sql"
DO $drop_leftover_old_columnar_objects$ DO $drop_leftover_old_columnar_objects$
BEGIN BEGIN
-- If old columnar exists, i.e., the columnar access method that we had before Citus 11.1, -- If old columnar exists, i.e., the columnar access method that we had before Citus 11.1,

View File

@ -25,3 +25,5 @@ DROP FUNCTION IF EXISTS pg_catalog.get_snapshot_based_node_split_plan(text, inte
-- --
-- If the user wants to create "citus_columnar" extension later, "citus_columnar" -- If the user wants to create "citus_columnar" extension later, "citus_columnar"
-- will anyway properly create them at the scope of that extension. -- will anyway properly create them at the scope of that extension.
DROP VIEW IF EXISTS pg_catalog.citus_stats;

View File

@ -0,0 +1,82 @@
-- citus_stats: a pg_stats-like view over Citus tables.
--
-- For every table in pg_dist_partition the view runs a query on each shard
-- (through run_command_on_shards), reads the shard-level pg_stats rows and
-- combines them into one row per (schema, table, column) with:
--   null_frac          weighted fraction of NULLs across shards
--   most_common_vals   text[] of values, most frequent first
--   most_common_freqs  float4[] of the matching table-wide frequencies
--
-- Per-shard fractions are turned into occurrence counts by multiplying with
-- the reltuples of the shard, summed per table, and divided by the summed
-- reltuples again.
--
-- NOTE(review): the final query is driven by common_val_occurrence with inner
-- joins, so a column that has no most-common values on any shard appears to
-- be left out of the view entirely - confirm this is intended.
--
-- The view is created in schema citus and then moved to pg_catalog.
SET search_path = 'pg_catalog';
DROP VIEW IF EXISTS pg_catalog.citus_stats;

CREATE OR REPLACE VIEW citus.citus_stats AS
WITH most_common_vals_double_json AS (
    -- One row per Citus table holding a JSON array of the rows returned by
    -- run_command_on_shards. The result field of each element is itself JSON
    -- text produced on the shard, hence "double json".
    --
    -- reltuples is converted to bigint on the shard: pg_class.reltuples is
    -- float4, and its text form switches to exponent notation for large
    -- values (1e+06), which the bigint casts in the CTEs below would reject.
    -- GREATEST maps the -1 "row count unknown" marker to 0 so that shards
    -- which were never analyzed do not count as negative rows.
    SELECT (
        SELECT json_agg(row_to_json(f))
        FROM (
            SELECT * FROM run_command_on_shards(logicalrelid,
                $$ SELECT json_agg(row_to_json(shard_stats)) FROM (
                       SELECT '$$ || logicalrelid || $$' AS citus_table, attname, s.null_frac,
                              most_common_vals, most_common_freqs,
                              GREATEST(c.reltuples, 0)::bigint AS reltuples
                       -- join on tablename is enough here, no need to join with pg_namespace
                       -- since shards have unique ids in their names, hence two shard names
                       -- could never be the same
                       FROM pg_stats s RIGHT JOIN pg_class c ON (s.tablename = c.relname)
                       WHERE c.oid = '%s'::regclass) shard_stats $$)
        ) f
    ) AS json_agg
    FROM pg_dist_partition
),
most_common_vals_json AS (
    -- One row per shard: the JSON text returned by the shard and its shard id.
    SELECT (json_array_elements(json_agg)->>'result') AS result,
           (json_array_elements(json_agg)->>'shardid') AS shardid
    FROM most_common_vals_double_json
),
table_reltuples_json AS (
    -- Each shard result repeats reltuples once per column; DISTINCT over the
    -- whole row collapses that to a single row per shard.
    SELECT DISTINCT
           shardid,
           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
           (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
    FROM most_common_vals_json
),
table_reltuples AS (
    -- Estimated row count of each Citus table.
    SELECT citus_table,
           sum(shard_reltuples) AS table_reltuples
    FROM table_reltuples_json
    GROUP BY citus_table
),
null_frac_json AS (
    -- One row per (shard, column) with the shard-level null fraction.
    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
           (json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
           (json_array_elements(result::json)->>'attname')::text AS attname
    FROM most_common_vals_json
),
null_occurrences AS (
    -- Estimated number of NULLs per (table, column).
    SELECT citus_table,
           attname,
           sum(null_frac * shard_reltuples)::bigint AS null_occurrences
    FROM null_frac_json
    GROUP BY citus_table, attname
),
most_common_vals AS (
    -- One row per (shard, column, most-common value) with its shard-level
    -- frequency; the value and frequency arrays are unnested side by side.
    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
           (json_array_elements(result::json)->>'attname')::text AS attname,
           json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
           json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
    FROM most_common_vals_json
),
common_val_occurrence AS (
    -- Estimated number of occurrences of each value per (table, column).
    SELECT citus_table,
           m.attname,
           common_val,
           sum(common_freq * shard_reltuples)::bigint AS occurrence
    FROM most_common_vals m
    GROUP BY citus_table, m.attname, common_val
)
SELECT nsp.nspname AS schemaname,
       p.relname AS tablename,
       c.attname,
       -- NULLIF guards the divisions: aggregate arguments are evaluated
       -- before the surrounding CASE, so the CASE alone cannot prevent a
       -- division by zero for a table whose reltuples sum to 0.
       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN 0
            ELSE max(n.null_occurrences / NULLIF(t.table_reltuples, 0))::float4
       END AS null_frac,
       -- Both arrays use the same explicit ordering so that they stay aligned
       -- and list the most frequent value first, independent of the plan.
       array_agg(c.common_val ORDER BY c.occurrence DESC, c.common_val) AS most_common_vals,
       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN NULL
            ELSE array_agg((c.occurrence / NULLIF(t.table_reltuples, 0))::float4
                           ORDER BY c.occurrence DESC, c.common_val)
       END AS most_common_freqs
FROM common_val_occurrence c
INNER JOIN table_reltuples t
    ON c.citus_table = t.citus_table
INNER JOIN null_occurrences n
    ON c.citus_table = n.citus_table
   AND c.attname = n.attname
INNER JOIN pg_class p
    ON c.citus_table::regclass::oid = p.oid
INNER JOIN pg_namespace nsp
    ON p.relnamespace = nsp.oid
GROUP BY nsp.nspname, c.citus_table, p.relname, c.attname;

ALTER VIEW citus.citus_stats SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats TO PUBLIC;
RESET search_path;

View File

@ -0,0 +1,82 @@
-- citus_stats: a pg_stats-like view over Citus tables.
--
-- For every table in pg_dist_partition the view runs a query on each shard
-- (through run_command_on_shards), reads the shard-level pg_stats rows and
-- combines them into one row per (schema, table, column) with:
--   null_frac          weighted fraction of NULLs across shards
--   most_common_vals   text[] of values, most frequent first
--   most_common_freqs  float4[] of the matching table-wide frequencies
--
-- Per-shard fractions are turned into occurrence counts by multiplying with
-- the reltuples of the shard, summed per table, and divided by the summed
-- reltuples again.
--
-- NOTE(review): the final query is driven by common_val_occurrence with inner
-- joins, so a column that has no most-common values on any shard appears to
-- be left out of the view entirely - confirm this is intended.
--
-- The view is created in schema citus and then moved to pg_catalog.
SET search_path = 'pg_catalog';
DROP VIEW IF EXISTS pg_catalog.citus_stats;

CREATE OR REPLACE VIEW citus.citus_stats AS
WITH most_common_vals_double_json AS (
    -- One row per Citus table holding a JSON array of the rows returned by
    -- run_command_on_shards. The result field of each element is itself JSON
    -- text produced on the shard, hence "double json".
    --
    -- reltuples is converted to bigint on the shard: pg_class.reltuples is
    -- float4, and its text form switches to exponent notation for large
    -- values (1e+06), which the bigint casts in the CTEs below would reject.
    -- GREATEST maps the -1 "row count unknown" marker to 0 so that shards
    -- which were never analyzed do not count as negative rows.
    SELECT (
        SELECT json_agg(row_to_json(f))
        FROM (
            SELECT * FROM run_command_on_shards(logicalrelid,
                $$ SELECT json_agg(row_to_json(shard_stats)) FROM (
                       SELECT '$$ || logicalrelid || $$' AS citus_table, attname, s.null_frac,
                              most_common_vals, most_common_freqs,
                              GREATEST(c.reltuples, 0)::bigint AS reltuples
                       -- join on tablename is enough here, no need to join with pg_namespace
                       -- since shards have unique ids in their names, hence two shard names
                       -- could never be the same
                       FROM pg_stats s RIGHT JOIN pg_class c ON (s.tablename = c.relname)
                       WHERE c.oid = '%s'::regclass) shard_stats $$)
        ) f
    ) AS json_agg
    FROM pg_dist_partition
),
most_common_vals_json AS (
    -- One row per shard: the JSON text returned by the shard and its shard id.
    SELECT (json_array_elements(json_agg)->>'result') AS result,
           (json_array_elements(json_agg)->>'shardid') AS shardid
    FROM most_common_vals_double_json
),
table_reltuples_json AS (
    -- Each shard result repeats reltuples once per column; DISTINCT over the
    -- whole row collapses that to a single row per shard.
    SELECT DISTINCT
           shardid,
           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
           (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
    FROM most_common_vals_json
),
table_reltuples AS (
    -- Estimated row count of each Citus table.
    SELECT citus_table,
           sum(shard_reltuples) AS table_reltuples
    FROM table_reltuples_json
    GROUP BY citus_table
),
null_frac_json AS (
    -- One row per (shard, column) with the shard-level null fraction.
    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
           (json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
           (json_array_elements(result::json)->>'attname')::text AS attname
    FROM most_common_vals_json
),
null_occurrences AS (
    -- Estimated number of NULLs per (table, column).
    SELECT citus_table,
           attname,
           sum(null_frac * shard_reltuples)::bigint AS null_occurrences
    FROM null_frac_json
    GROUP BY citus_table, attname
),
most_common_vals AS (
    -- One row per (shard, column, most-common value) with its shard-level
    -- frequency; the value and frequency arrays are unnested side by side.
    SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
           (json_array_elements(result::json)->>'attname')::text AS attname,
           json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
           json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
           (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
    FROM most_common_vals_json
),
common_val_occurrence AS (
    -- Estimated number of occurrences of each value per (table, column).
    SELECT citus_table,
           m.attname,
           common_val,
           sum(common_freq * shard_reltuples)::bigint AS occurrence
    FROM most_common_vals m
    GROUP BY citus_table, m.attname, common_val
)
SELECT nsp.nspname AS schemaname,
       p.relname AS tablename,
       c.attname,
       -- NULLIF guards the divisions: aggregate arguments are evaluated
       -- before the surrounding CASE, so the CASE alone cannot prevent a
       -- division by zero for a table whose reltuples sum to 0.
       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN 0
            ELSE max(n.null_occurrences / NULLIF(t.table_reltuples, 0))::float4
       END AS null_frac,
       -- Both arrays use the same explicit ordering so that they stay aligned
       -- and list the most frequent value first, independent of the plan.
       array_agg(c.common_val ORDER BY c.occurrence DESC, c.common_val) AS most_common_vals,
       CASE WHEN max(t.table_reltuples::bigint) = 0 THEN NULL
            ELSE array_agg((c.occurrence / NULLIF(t.table_reltuples, 0))::float4
                           ORDER BY c.occurrence DESC, c.common_val)
       END AS most_common_freqs
FROM common_val_occurrence c
INNER JOIN table_reltuples t
    ON c.citus_table = t.citus_table
INNER JOIN null_occurrences n
    ON c.citus_table = n.citus_table
   AND c.attname = n.attname
INNER JOIN pg_class p
    ON c.citus_table::regclass::oid = p.oid
INNER JOIN pg_namespace nsp
    ON p.relnamespace = nsp.oid
GROUP BY nsp.nspname, c.citus_table, p.relname, c.attname;

ALTER VIEW citus.citus_stats SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats TO PUBLIC;
RESET search_path;

View File

@ -0,0 +1,122 @@
CREATE SCHEMA citus_aggregated_stats;
SET search_path TO citus_aggregated_stats, public;
SET citus.shard_count = 2;
SET citus.next_shard_id TO 1870000;
CREATE USER user1;
GRANT ALL ON SCHEMA citus_aggregated_stats TO user1;
SET SESSION AUTHORIZATION user1;
CREATE TABLE current_check (currentid int, payload text, rlsuser text);
GRANT ALL ON current_check TO PUBLIC;
INSERT INTO current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2'),
(5, NULL, NULL),
(4, 'def', 'user1');
ALTER TABLE current_check ENABLE ROW LEVEL SECURITY;
SET row_security TO ON;
ANALYZE current_check;
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY 1;
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
---------------------------------------------------------------------
citus_aggregated_stats | current_check | currentid | 0 | {4,3} | {0.428571,0.285714}
citus_aggregated_stats | current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714}
citus_aggregated_stats | current_check | rlsuser | 0.142857 | {user1} | {0.714286}
(3 rows)
SELECT * FROM citus_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY 1;
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
---------------------------------------------------------------------
(0 rows)
-- test various Citus table types in citus_stats
CREATE TABLE dist_current_check (currentid int, payload text, rlsuser text);
CREATE TABLE ref_current_check (currentid int, payload text, rlsuser text);
CREATE TABLE citus_local_current_check (currentid int, payload text, rlsuser text);
SELECT create_distributed_table('dist_current_check', 'currentid');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('ref_current_check');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('citus_local_current_check');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2'),
(5, NULL, NULL),
(4, 'def', 'user1');
INSERT INTO ref_current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2'),
(5, NULL, NULL),
(4, 'def', 'user1');
INSERT INTO citus_local_current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2'),
(5, NULL, NULL),
(4, 'def', 'user1');
ANALYZE dist_current_check;
ANALYZE ref_current_check;
ANALYZE citus_local_current_check;
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY 1;
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
---------------------------------------------------------------------
citus_aggregated_stats | current_check | currentid | 0 | {4,3} | {0.428571,0.285714}
citus_aggregated_stats | current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714}
citus_aggregated_stats | current_check | rlsuser | 0.142857 | {user1} | {0.714286}
(3 rows)
SELECT * FROM citus_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY 1;
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
---------------------------------------------------------------------
citus_aggregated_stats | dist_current_check | currentid | 0 | {4,3} | {0.428571,0.285714}
citus_aggregated_stats | dist_current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714}
citus_aggregated_stats | dist_current_check | rlsuser | 0.142857 | {user1} | {0.714286}
citus_aggregated_stats | ref_current_check | currentid | 0 | {4,3} | {0.428571,0.285714}
citus_aggregated_stats | ref_current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714}
citus_aggregated_stats | ref_current_check | rlsuser | 0.142857 | {user1} | {0.714286}
citus_aggregated_stats | citus_local_current_check | currentid | 0 | {4,3} | {0.428571,0.285714}
citus_aggregated_stats | citus_local_current_check | payload | 0.142857 | {def,cde} | {0.428571,0.285714}
citus_aggregated_stats | citus_local_current_check | rlsuser | 0.142857 | {user1} | {0.714286}
(9 rows)
RESET SESSION AUTHORIZATION;
DROP SCHEMA citus_aggregated_stats CASCADE;
NOTICE: drop cascades to 6 other objects
DETAIL: drop cascades to table current_check
drop cascades to table dist_current_check
drop cascades to table ref_current_check
drop cascades to table citus_local_current_check_1870003
drop cascades to table ref_current_check_1870002
drop cascades to table citus_local_current_check
DROP USER user1;

View File

@ -1649,7 +1649,8 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_remove_clone_node_with_nodeid(integer) void | function citus_remove_clone_node_with_nodeid(integer) void
| function get_snapshot_based_node_split_plan(text,integer,text,integer,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, placement_node text) | function get_snapshot_based_node_split_plan(text,integer,text,integer,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, placement_node text)
| function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision)
(11 rows) | view citus_stats
(12 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -398,9 +398,10 @@ ORDER BY 1;
view citus_stat_statements view citus_stat_statements
view citus_stat_tenants view citus_stat_tenants
view citus_stat_tenants_local view citus_stat_tenants_local
view citus_stats
view citus_tables view citus_tables
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(369 rows) (370 rows)
DROP TABLE extension_basic_types; DROP TABLE extension_basic_types;

View File

@ -32,6 +32,7 @@ test: propagate_extension_commands
test: escape_extension_name test: escape_extension_name
test: ref_citus_local_fkeys test: ref_citus_local_fkeys
test: alter_database_owner test: alter_database_owner
test: citus_aggregated_stats
test: seclabel test: seclabel
test: distributed_triggers test: distributed_triggers
test: create_single_shard_table test: create_single_shard_table

View File

@ -0,0 +1,87 @@
-- Regression test for the citus_stats view.
-- Loads the same seven rows into a plain Postgres table and into a
-- distributed, a reference and a Citus local table, then compares what
-- pg_stats and citus_stats report for them.
-- Every result query sorts by (schemaname, tablename, attname): all rows share
-- one schema, so sorting by the first column alone leaves the row order
-- unspecified.
CREATE SCHEMA citus_aggregated_stats;
SET search_path TO citus_aggregated_stats, public;
SET citus.shard_count = 2;
SET citus.next_shard_id TO 1870000;

-- run the test as a non-superuser that owns the tables
CREATE USER user1;
GRANT ALL ON SCHEMA citus_aggregated_stats TO user1;
SET SESSION AUTHORIZATION user1;

-- plain Postgres table used as the pg_stats baseline
CREATE TABLE current_check (currentid int, payload text, rlsuser text);
GRANT ALL ON current_check TO PUBLIC;

INSERT INTO current_check VALUES
    (1, 'abc', 'user1'),
    (3, 'cde', 'user1'),
    (4, 'def', 'user1'),
    (4, 'def', 'user1'),
    (3, 'cde', 'user2'),
    (5, NULL, NULL),
    (4, 'def', 'user1');

ALTER TABLE current_check ENABLE ROW LEVEL SECURITY;

SET row_security TO ON;

ANALYZE current_check;

SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs
FROM pg_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY schemaname, tablename, attname;

-- no Citus tables exist yet
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs
FROM citus_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY schemaname, tablename, attname;

-- test various Citus table types in citus_stats
CREATE TABLE dist_current_check (currentid int, payload text, rlsuser text);
CREATE TABLE ref_current_check (currentid int, payload text, rlsuser text);
CREATE TABLE citus_local_current_check (currentid int, payload text, rlsuser text);

SELECT create_distributed_table('dist_current_check', 'currentid');
SELECT create_reference_table('ref_current_check');
SELECT citus_add_local_table_to_metadata('citus_local_current_check');

INSERT INTO dist_current_check VALUES
    (1, 'abc', 'user1'),
    (3, 'cde', 'user1'),
    (4, 'def', 'user1'),
    (4, 'def', 'user1'),
    (3, 'cde', 'user2'),
    (5, NULL, NULL),
    (4, 'def', 'user1');

INSERT INTO ref_current_check VALUES
    (1, 'abc', 'user1'),
    (3, 'cde', 'user1'),
    (4, 'def', 'user1'),
    (4, 'def', 'user1'),
    (3, 'cde', 'user2'),
    (5, NULL, NULL),
    (4, 'def', 'user1');

INSERT INTO citus_local_current_check VALUES
    (1, 'abc', 'user1'),
    (3, 'cde', 'user1'),
    (4, 'def', 'user1'),
    (4, 'def', 'user1'),
    (3, 'cde', 'user2'),
    (5, NULL, NULL),
    (4, 'def', 'user1');

ANALYZE dist_current_check;
ANALYZE ref_current_check;
ANALYZE citus_local_current_check;

SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs
FROM pg_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY schemaname, tablename, attname;

SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs
FROM citus_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY schemaname, tablename, attname;

-- cleanup
RESET SESSION AUTHORIZATION;
DROP SCHEMA citus_aggregated_stats CASCADE;
DROP USER user1;