Single shard misc udfs (#6956)

This PR tests:
- shards_colocated
- citus_shard_cost_by_disk_size
- citus_update_shard_statistics
- citus_update_table_statistics
pull/6974/head
Halil Ozan Akgül 2023-06-07 13:30:50 +03:00 committed by GitHub
parent 6369645db4
commit b569d53a0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 190 additions and 10 deletions

View File

@ -36,6 +36,10 @@ CREATE OR REPLACE FUNCTION pg_catalog.is_citus_depended_object(oid,oid)
RETURNS bool
LANGUAGE C
AS 'citus', $$is_citus_depended_object$$;
CREATE FUNCTION shards_colocated(bigint, bigint)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
-- test some other udf's with single shard tables
CREATE TABLE null_dist_key_table(a int);
SELECT create_distributed_table('null_dist_key_table', null, colocate_with=>'none', distribution_type=>null);
@ -528,25 +532,25 @@ SELECT create_distributed_table ('update_col_3', null, colocate_with:='none');
(1 row)
-- make sure nodes are correct
SELECT c1.nodeport = c2.nodeport AS same_node
-- make sure nodes are correct and test shards_colocated UDF
SELECT c1.nodeport = c2.nodeport AS same_node, shards_colocated(c1.shardid, c2.shardid)
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
p1.noderole = 'primary' AND p2.noderole = 'primary';
same_node
same_node | shards_colocated
---------------------------------------------------------------------
t
t | t
(1 row)
SELECT c1.nodeport = c2.nodeport AS same_node
SELECT c1.nodeport = c2.nodeport AS same_node, shards_colocated(c1.shardid, c2.shardid)
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3' AND
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
p1.noderole = 'primary' AND p2.noderole = 'primary';
same_node
same_node | shards_colocated
---------------------------------------------------------------------
f
f | f
(1 row)
-- and the update_col_1 and update_col_2 are colocated
@ -573,6 +577,17 @@ WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col
f
(1 row)
-- test shards_colocated UDF with shards in same node but different colocation groups
SELECT shards_colocated(c1.shardid, c2.shardid)
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
p1.noderole = 'primary' AND p2.noderole = 'primary';
shards_colocated
---------------------------------------------------------------------
f
(1 row)
-- re-colocate, the shards were already in the same node
SELECT update_distributed_table_colocation('update_col_2', colocate_with:='update_col_1');
update_distributed_table_colocation
@ -1096,5 +1111,105 @@ SELECT create_distributed_table ('partcol_tbl', NULL, colocate_with:='none');
SELECT partition_column_id('partcol_tbl'::regclass);
ERROR: table needs to be hash distributed
-- test citus_shard_cost_by_disk_size
CREATE TABLE size_tbl_dist (a INT, b TEXT);
SELECT create_distributed_table('size_tbl_dist', 'a', shard_count:=4, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE size_tbl_single (a INT, b TEXT);
SELECT create_distributed_table('size_tbl_single', NULL, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO size_tbl_dist SELECT 1, '1234567890' FROM generate_series(1, 10000);
INSERT INTO size_tbl_single SELECT 1, '1234567890' FROM generate_series(1, 10000);
SELECT citus_shard_cost_by_disk_size(c1.shardid) = citus_shard_cost_by_disk_size(c2.shardid) AS equal_cost
FROM citus_shards c1, citus_shards c2
WHERE c1.table_name::TEXT = 'size_tbl_dist' AND c2.table_name::TEXT = 'size_tbl_single'
ORDER BY c1.shard_size DESC
LIMIT 1;
equal_cost
---------------------------------------------------------------------
t
(1 row)
-- test update statistics UDFs
CREATE TABLE update_tbl_stat (a INT, b TEXT);
SELECT create_distributed_table('update_tbl_stat', NULL, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT shardid AS update_tbl_stat_shard
FROM citus_shards
WHERE table_name::TEXT = 'update_tbl_stat'
LIMIT 1 \gset
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1;
?column?
---------------------------------------------------------------------
f
(1 row)
INSERT INTO update_tbl_stat SELECT 1, '1234567890' FROM generate_series(1, 10000);
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1;
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT citus_update_table_statistics('update_tbl_stat');
citus_update_table_statistics
---------------------------------------------------------------------
(1 row)
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1;
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE update_shard_stat (a INT, b TEXT);
SELECT create_distributed_table('update_shard_stat', NULL, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT shardid AS update_shard_stat_shard
FROM citus_shards
WHERE table_name::TEXT = 'update_shard_stat'
LIMIT 1 \gset
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
?column?
---------------------------------------------------------------------
f
(1 row)
INSERT INTO update_shard_stat SELECT 1, '1234567890' FROM generate_series(1, 10000);
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT 1 FROM citus_update_shard_statistics(:update_shard_stat_shard);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
?column?
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA null_dist_key_udfs CASCADE;

View File

@ -39,6 +39,11 @@ RETURNS bool
LANGUAGE C
AS 'citus', $$is_citus_depended_object$$;
CREATE FUNCTION shards_colocated(bigint, bigint)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
-- test some other udf's with single shard tables
CREATE TABLE null_dist_key_table(a int);
SELECT create_distributed_table('null_dist_key_table', null, colocate_with=>'none', distribution_type=>null);
@ -226,14 +231,14 @@ SELECT create_distributed_table ('update_col_2', null, colocate_with:='update_co
-- with the new colocation id the new table will be in the other worker node
SELECT create_distributed_table ('update_col_3', null, colocate_with:='none');
-- make sure nodes are correct
SELECT c1.nodeport = c2.nodeport AS same_node
-- make sure nodes are correct and test shards_colocated UDF
SELECT c1.nodeport = c2.nodeport AS same_node, shards_colocated(c1.shardid, c2.shardid)
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
p1.noderole = 'primary' AND p2.noderole = 'primary';
SELECT c1.nodeport = c2.nodeport AS same_node
SELECT c1.nodeport = c2.nodeport AS same_node, shards_colocated(c1.shardid, c2.shardid)
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3' AND
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
@ -251,6 +256,13 @@ SELECT c1.colocation_id = c2.colocation_id AS colocated
FROM public.citus_tables c1, public.citus_tables c2
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2';
-- test shards_colocated UDF with shards in same node but different colocation groups
SELECT shards_colocated(c1.shardid, c2.shardid)
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
p1.noderole = 'primary' AND p2.noderole = 'primary';
-- re-colocate, the shards were already in the same node
SELECT update_distributed_table_colocation('update_col_2', colocate_with:='update_col_1');
@ -523,5 +535,58 @@ CREATE TABLE partcol_tbl (a INT);
SELECT create_distributed_table ('partcol_tbl', NULL, colocate_with:='none');
SELECT partition_column_id('partcol_tbl'::regclass);
-- test citus_shard_cost_by_disk_size
CREATE TABLE size_tbl_dist (a INT, b TEXT);
SELECT create_distributed_table('size_tbl_dist', 'a', shard_count:=4, colocate_with:='none');
CREATE TABLE size_tbl_single (a INT, b TEXT);
SELECT create_distributed_table('size_tbl_single', NULL, colocate_with:='none');
INSERT INTO size_tbl_dist SELECT 1, '1234567890' FROM generate_series(1, 10000);
INSERT INTO size_tbl_single SELECT 1, '1234567890' FROM generate_series(1, 10000);
SELECT citus_shard_cost_by_disk_size(c1.shardid) = citus_shard_cost_by_disk_size(c2.shardid) AS equal_cost
FROM citus_shards c1, citus_shards c2
WHERE c1.table_name::TEXT = 'size_tbl_dist' AND c2.table_name::TEXT = 'size_tbl_single'
ORDER BY c1.shard_size DESC
LIMIT 1;
-- test update statistics UDFs
CREATE TABLE update_tbl_stat (a INT, b TEXT);
SELECT create_distributed_table('update_tbl_stat', NULL, colocate_with:='none');
SELECT shardid AS update_tbl_stat_shard
FROM citus_shards
WHERE table_name::TEXT = 'update_tbl_stat'
LIMIT 1 \gset
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1;
INSERT INTO update_tbl_stat SELECT 1, '1234567890' FROM generate_series(1, 10000);
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1;
SELECT citus_update_table_statistics('update_tbl_stat');
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1;
CREATE TABLE update_shard_stat (a INT, b TEXT);
SELECT create_distributed_table('update_shard_stat', NULL, colocate_with:='none');
SELECT shardid AS update_shard_stat_shard
FROM citus_shards
WHERE table_name::TEXT = 'update_shard_stat'
LIMIT 1 \gset
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
INSERT INTO update_shard_stat SELECT 1, '1234567890' FROM generate_series(1, 10000);
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
SELECT 1 FROM citus_update_shard_statistics(:update_shard_stat_shard);
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
SET client_min_messages TO WARNING;
DROP SCHEMA null_dist_key_udfs CASCADE;