Add citus_column_stats UDF

naisila/citus_column_stats
naisila 2025-06-20 17:34:48 +02:00
parent 55a0d1f730
commit f55c636dbb
9 changed files with 235 additions and 3 deletions

View File

@ -51,6 +51,7 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_stat_counters/13.1-1.sql"
#include "udfs/citus_stat_counters_reset/13.1-1.sql"
#include "udfs/citus_nodes/13.1-1.sql"
#include "udfs/citus_column_stats/13.1-1.sql"
-- Since shard_name/13.1-1.sql first drops the function and then creates it, we first
-- need to drop citus_shards view since that view depends on this function. And immediately

View File

@ -46,6 +46,7 @@ DROP VIEW pg_catalog.citus_stat_counters;
DROP FUNCTION pg_catalog.citus_stat_counters(oid);
DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid);
DROP VIEW IF EXISTS pg_catalog.citus_nodes;
DROP FUNCTION IF EXISTS pg_catalog.citus_column_stats;
-- Definition of shard_name() prior to this release doesn't have a separate SQL file
-- because it's quite an old UDF that its prior definition(s) was(were) squashed into

View File

@ -0,0 +1,56 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_column_stats(
qualified_table_name text)
RETURNS TABLE (
attname text,
most_common_vals text[],
most_common_freqs float4[]
)
AS $func$
BEGIN
IF NOT EXISTS (SELECT * FROM pg_dist_partition WHERE logicalrelid = qualified_table_name::regclass) THEN
RAISE EXCEPTION 'Not a Citus table';
ELSE
RETURN QUERY
WITH most_common_vals_json AS (
SELECT * FROM run_command_on_shards(qualified_table_name,
$$ SELECT json_agg(row_to_json(shard_stats)) FROM (
SELECT attname, most_common_vals, most_common_freqs, c.reltuples AS reltuples
FROM pg_stats s RIGHT JOIN pg_class c ON (s.tablename = c.relname)
WHERE c.relname = '%s') shard_stats $$ )),
table_reltuples_json AS (
SELECT distinct(shardid),
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
FROM most_common_vals_json),
table_reltuples AS (
SELECT sum(shard_reltuples) AS table_reltuples FROM table_reltuples_json),
most_common_vals AS (
SELECT shardid,
(json_array_elements(result::json)->>'attname')::text AS attname,
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
FROM most_common_vals_json),
common_val_occurrence AS (
SELECT m.attname, common_val, sum(common_freq * shard_reltuples)::bigint AS occurrence
FROM most_common_vals m
GROUP BY m.attname, common_val
ORDER BY m.attname, occurrence DESC, common_val)
SELECT c.attname,
ARRAY_agg(common_val) AS most_common_vals,
ARRAY_agg((occurrence/t.table_reltuples)::float4) AS most_common_freqs
FROM common_val_occurrence c, table_reltuples t
GROUP BY c.attname;
END IF;
END;
$func$ LANGUAGE plpgsql;
COMMENT ON FUNCTION pg_catalog.citus_column_stats(
qualified_table_name text)
IS 'provides some pg_stats for columns of input Citus table';

View File

@ -0,0 +1,56 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_column_stats(
qualified_table_name text)
RETURNS TABLE (
attname text,
most_common_vals text[],
most_common_freqs float4[]
)
AS $func$
BEGIN
IF NOT EXISTS (SELECT * FROM pg_dist_partition WHERE logicalrelid = qualified_table_name::regclass) THEN
RAISE EXCEPTION 'Not a Citus table';
ELSE
RETURN QUERY
WITH most_common_vals_json AS (
SELECT * FROM run_command_on_shards(qualified_table_name,
$$ SELECT json_agg(row_to_json(shard_stats)) FROM (
SELECT attname, most_common_vals, most_common_freqs, c.reltuples AS reltuples
FROM pg_stats s RIGHT JOIN pg_class c ON (s.tablename = c.relname)
WHERE c.relname = '%s') shard_stats $$ )),
table_reltuples_json AS (
SELECT distinct(shardid),
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
FROM most_common_vals_json),
table_reltuples AS (
SELECT sum(shard_reltuples) AS table_reltuples FROM table_reltuples_json),
most_common_vals AS (
SELECT shardid,
(json_array_elements(result::json)->>'attname')::text AS attname,
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
FROM most_common_vals_json),
common_val_occurrence AS (
SELECT m.attname, common_val, sum(common_freq * shard_reltuples)::bigint AS occurrence
FROM most_common_vals m
GROUP BY m.attname, common_val
ORDER BY m.attname, occurrence DESC, common_val)
SELECT c.attname,
ARRAY_agg(common_val) AS most_common_vals,
ARRAY_agg((occurrence/t.table_reltuples)::float4) AS most_common_freqs
FROM common_val_occurrence c, table_reltuples t
GROUP BY c.attname;
END IF;
END;
$func$ LANGUAGE plpgsql;
COMMENT ON FUNCTION pg_catalog.citus_column_stats(
qualified_table_name text)
IS 'provides some pg_stats for columns of input Citus table';

View File

@ -0,0 +1,62 @@
SET citus.shard_count = 2;
CREATE USER user1;
GRANT ALL ON SCHEMA public TO user1;
SET SESSION AUTHORIZATION user1;
CREATE TABLE current_check (currentid int, payload text, rlsuser text);
GRANT ALL ON current_check TO PUBLIC;
INSERT INTO current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2');
ALTER TABLE current_check ENABLE ROW LEVEL SECURITY;
SET row_security TO ON;
ANALYZE current_check;
SELECT attname, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename = 'current_check'
ORDER BY 1;
attname | most_common_vals | most_common_freqs
---------------------------------------------------------------------
currentid | {3,4} | {0.4,0.4}
payload | {cde,def} | {0.4,0.4}
rlsuser | {user1} | {0.8}
(3 rows)
SELECT * FROM citus_column_stats('current_check');
ERROR: Not a Citus table
CONTEXT: PL/pgSQL function citus_column_stats(text) line XX at RAISE
CREATE TABLE dist_current_check (currentid int, payload text, rlsuser text);
SELECT create_distributed_table('dist_current_check', 'currentid');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2');
ANALYZE dist_current_check;
SELECT attname, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename = 'dist_current_check'
ORDER BY 1;
attname | most_common_vals | most_common_freqs
---------------------------------------------------------------------
(0 rows)
SELECT * FROM citus_column_stats('dist_current_check');
attname | most_common_vals | most_common_freqs
---------------------------------------------------------------------
currentid | {3,4} | {0.4,0.4}
payload | {cde,def} | {0.4,0.4}
rlsuser | {user1} | {0.8}
(3 rows)
DROP TABLE current_check;
DROP TABLE dist_current_check;
RESET SESSION AUTHORIZATION;
REVOKE ALL ON SCHEMA public FROM user1;
DROP USER user1;

View File

@ -1452,10 +1452,11 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 13.1-1
ALTER EXTENSION citus UPDATE TO '13.1-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
function citus_unmark_object_distributed(oid,oid,integer) void |
function shard_name(regclass,bigint) text |
| function citus_column_stats(text) TABLE(attname text, most_common_vals text[], most_common_freqs real[])
| function citus_internal.acquire_citus_advisory_object_class_lock(integer,cstring) void
| function citus_internal.add_colocation_metadata(integer,integer,integer,regtype,oid) void
| function citus_internal.add_object_metadata(text,text[],text[],integer,integer,boolean) void
@ -1487,7 +1488,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function shard_name(regclass,bigint,boolean) text
| view citus_nodes
| view citus_stat_counters
(33 rows)
(34 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -54,6 +54,7 @@ ORDER BY 1;
function citus_check_connection_to_node(text,integer)
function citus_cleanup_orphaned_resources()
function citus_cleanup_orphaned_shards()
function citus_column_stats(text)
function citus_conninfo_cache_invalidate()
function citus_coordinator_nodeid()
function citus_copy_shard_placement(bigint,integer,integer,citus.shard_transfer_mode)
@ -393,6 +394,6 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(362 rows)
(363 rows)
DROP TABLE extension_basic_types;

View File

@ -32,6 +32,7 @@ test: propagate_extension_commands
test: escape_extension_name
test: ref_citus_local_fkeys
test: alter_database_owner
test: citus_column_stats
test: seclabel
test: distributed_triggers
test: create_single_shard_table

View File

@ -0,0 +1,53 @@
SET citus.shard_count = 2;
CREATE USER user1;
GRANT ALL ON SCHEMA public TO user1;
SET SESSION AUTHORIZATION user1;
CREATE TABLE current_check (currentid int, payload text, rlsuser text);
GRANT ALL ON current_check TO PUBLIC;
INSERT INTO current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2');
ALTER TABLE current_check ENABLE ROW LEVEL SECURITY;
SET row_security TO ON;
ANALYZE current_check;
SELECT attname, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename = 'current_check'
ORDER BY 1;
SELECT * FROM citus_column_stats('current_check');
CREATE TABLE dist_current_check (currentid int, payload text, rlsuser text);
SELECT create_distributed_table('dist_current_check', 'currentid');
INSERT INTO dist_current_check VALUES
(1, 'abc', 'user1'),
(3, 'cde', 'user1'),
(4, 'def', 'user1'),
(4, 'def', 'user1'),
(3, 'cde', 'user2');
ANALYZE dist_current_check;
SELECT attname, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename = 'dist_current_check'
ORDER BY 1;
SELECT * FROM citus_column_stats('dist_current_check');
DROP TABLE current_check;
DROP TABLE dist_current_check;
RESET SESSION AUTHORIZATION;
REVOKE ALL ON SCHEMA public FROM user1;
DROP USER user1;