CREATE SCHEMA null_dist_key_udfs; SET search_path TO null_dist_key_udfs; SET citus.next_shard_id TO 1820000; SET citus.shard_count TO 32; SET citus.shard_replication_factor TO 1; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 198000; SET client_min_messages TO ERROR; SELECT 1 FROM citus_add_node('localhost', :master_port, groupid=>0); ?column? --------------------------------------------------------------------- 1 (1 row) RESET client_min_messages; CREATE FUNCTION get_referencing_relation_id_list(Oid) RETURNS SETOF Oid LANGUAGE C STABLE STRICT AS 'citus', $$get_referencing_relation_id_list$$; CREATE FUNCTION get_referenced_relation_id_list(Oid) RETURNS SETOF Oid LANGUAGE C STABLE STRICT AS 'citus', $$get_referenced_relation_id_list$$; CREATE OR REPLACE FUNCTION get_foreign_key_connected_relations(IN table_name regclass) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'citus', $$get_foreign_key_connected_relations$$; CREATE OR REPLACE FUNCTION citus_get_all_dependencies_for_object(classid oid, objid oid, objsubid int) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'citus', $$citus_get_all_dependencies_for_object$$; CREATE OR REPLACE FUNCTION citus_get_dependencies_for_object(classid oid, objid oid, objsubid int) RETURNS SETOF RECORD LANGUAGE C STRICT AS 'citus', $$citus_get_dependencies_for_object$$; CREATE OR REPLACE FUNCTION pg_catalog.is_citus_depended_object(oid,oid) RETURNS bool LANGUAGE C AS 'citus', $$is_citus_depended_object$$; CREATE FUNCTION shards_colocated(bigint, bigint) RETURNS bool AS 'citus' LANGUAGE C STRICT; -- test some other udf's with single shard tables CREATE TABLE null_dist_key_table(a int); SELECT create_distributed_table('null_dist_key_table', null, colocate_with=>'none', distribution_type=>null); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT truncate_local_data_after_distributing_table('null_dist_key_table'); truncate_local_data_after_distributing_table --------------------------------------------------------------------- (1 row) -- should work -- -- insert some data & create an index for table size udf's INSERT INTO null_dist_key_table VALUES (1), (2), (3); CREATE INDEX null_dist_key_idx ON null_dist_key_table(a); SELECT citus_table_size('null_dist_key_table'); citus_table_size --------------------------------------------------------------------- 8192 (1 row) SELECT citus_total_relation_size('null_dist_key_table'); citus_total_relation_size --------------------------------------------------------------------- 24576 (1 row) SELECT citus_relation_size('null_dist_key_table'); citus_relation_size --------------------------------------------------------------------- 8192 (1 row) SELECT shard_name, shard_size FROM pg_catalog.citus_shard_sizes(), citus_shards WHERE shardid = shard_id AND shard_name LIKE '%null_dist_key_table%' AND nodeport IN (:worker_1_port, :worker_2_port); shard_name | shard_size --------------------------------------------------------------------- null_dist_key_udfs.null_dist_key_table_1820000 | 24576 (1 row) BEGIN; SELECT lock_relation_if_exists('null_dist_key_table', 'ACCESS SHARE'); lock_relation_if_exists --------------------------------------------------------------------- t (1 row) SELECT count(*) FROM pg_locks where relation='null_dist_key_table'::regclass; count --------------------------------------------------------------------- 1 (1 row) COMMIT; SELECT partmethod, repmodel FROM pg_dist_partition WHERE logicalrelid = 'null_dist_key_table'::regclass; partmethod | repmodel 
--------------------------------------------------------------------- n | s (1 row) SELECT master_get_table_ddl_events('null_dist_key_table'); master_get_table_ddl_events --------------------------------------------------------------------- CREATE TABLE null_dist_key_udfs.null_dist_key_table (a integer) USING heap ALTER TABLE null_dist_key_udfs.null_dist_key_table OWNER TO postgres CREATE INDEX null_dist_key_idx ON null_dist_key_udfs.null_dist_key_table USING btree (a) (3 rows) SELECT column_to_column_name(logicalrelid, partkey) FROM pg_dist_partition WHERE logicalrelid = 'null_dist_key_table'::regclass; column_to_column_name --------------------------------------------------------------------- (1 row) SELECT column_name_to_column('null_dist_key_table', 'a'); column_name_to_column --------------------------------------------------------------------- {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1} (1 row) SELECT master_update_shard_statistics(shardid) FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='null_dist_key_table'::regclass) as shardid; master_update_shard_statistics --------------------------------------------------------------------- 8192 (1 row) SELECT truncate_local_data_after_distributing_table('null_dist_key_table'); truncate_local_data_after_distributing_table --------------------------------------------------------------------- (1 row) -- should return a single element array that only includes its own shard id SELECT shardid=unnest(get_colocated_shard_array(shardid)) FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='null_dist_key_table'::regclass) as shardid; ?column? --------------------------------------------------------------------- t (1 row) BEGIN; SELECT master_remove_partition_metadata('null_dist_key_table'::regclass::oid, 'null_dist_key_udfs', 'null_dist_key_table'); master_remove_partition_metadata --------------------------------------------------------------------- (1 row) -- should print 0 select count(*) from pg_dist_partition where logicalrelid='null_dist_key_table'::regclass; count --------------------------------------------------------------------- 0 (1 row) ROLLBACK; SELECT master_create_empty_shard('null_dist_key_table'); ERROR: relation "null_dist_key_table" is a single shard table DETAIL: We currently don't support creating shards on single shard tables -- return true SELECT citus_table_is_visible('null_dist_key_table'::regclass::oid); citus_table_is_visible --------------------------------------------------------------------- t (1 row) -- return false SELECT relation_is_a_known_shard('null_dist_key_table'); relation_is_a_known_shard --------------------------------------------------------------------- f (1 row) -- return | false | true | SELECT citus_table_is_visible(tableName::regclass::oid), relation_is_a_known_shard(tableName::regclass) FROM (SELECT tableName FROM pg_catalog.pg_tables WHERE tablename LIKE 'null_dist_key_table%') as tableName; citus_table_is_visible | relation_is_a_known_shard --------------------------------------------------------------------- t | f (1 row) -- should fail, maybe support in the future SELECT create_reference_table('null_dist_key_table'); ERROR: table "null_dist_key_table" is already distributed SELECT create_distributed_table('null_dist_key_table', 'a'); ERROR: table "null_dist_key_table" is already distributed SELECT create_distributed_table_concurrently('null_dist_key_table', 'a'); ERROR: table 
"null_dist_key_table" is already distributed SELECT citus_add_local_table_to_metadata('null_dist_key_table'); ERROR: table "null_dist_key_table" is already distributed -- test altering distribution column, fails for single shard tables SELECT alter_distributed_table('null_dist_key_table', distribution_column := 'a'); ERROR: relation null_dist_key_table should be a hash distributed table -- test altering shard count, fails for single shard tables SELECT alter_distributed_table('null_dist_key_table', shard_count := 6); ERROR: relation null_dist_key_table should be a hash distributed table -- test shard splitting udf, fails for single shard tables SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset SELECT citus_split_shard_by_split_points( 1820000, ARRAY['-1073741826'], ARRAY[:worker_1_node, :worker_2_node], 'block_writes'); ERROR: Cannot split shard as operation is only supported for hash distributed tables. SELECT colocationid FROM pg_dist_partition WHERE logicalrelid::text LIKE '%null_dist_key_table%'; colocationid --------------------------------------------------------------------- 198000 (1 row) -- test alter_table_set_access_method and verify it doesn't change the colocation id SELECT alter_table_set_access_method('null_dist_key_table', 'columnar'); NOTICE: creating a new table for null_dist_key_udfs.null_dist_key_table NOTICE: moving the data of null_dist_key_udfs.null_dist_key_table NOTICE: dropping the old null_dist_key_udfs.null_dist_key_table NOTICE: renaming the new table to null_dist_key_udfs.null_dist_key_table alter_table_set_access_method --------------------------------------------------------------------- (1 row) SELECT colocationid FROM pg_dist_partition WHERE logicalrelid::text LIKE '%null_dist_key_table%'; colocationid --------------------------------------------------------------------- 198000 (1 row) -- undistribute SELECT undistribute_table('null_dist_key_table'); NOTICE: creating a new table for null_dist_key_udfs.null_dist_key_table NOTICE: moving the data of null_dist_key_udfs.null_dist_key_table NOTICE: dropping the old null_dist_key_udfs.null_dist_key_table NOTICE: renaming the new table to null_dist_key_udfs.null_dist_key_table undistribute_table --------------------------------------------------------------------- (1 row) -- verify that the metadata is gone SELECT COUNT(*) = 0 FROM pg_dist_partition WHERE logicalrelid::text LIKE '%null_dist_key_table%'; ?column? --------------------------------------------------------------------- t (1 row) SELECT COUNT(*) = 0 FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%'); ?column? --------------------------------------------------------------------- t (1 row) SELECT COUNT(*) = 0 FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%'; ?column? 
--------------------------------------------------------------------- t (1 row) -- create 7 single shard tables, 3 of them are colocated, for testing shard moves / rebalance on them CREATE TABLE single_shard_table_col1_1 (a INT PRIMARY KEY); CREATE TABLE single_shard_table_col1_2 (a TEXT PRIMARY KEY); CREATE TABLE single_shard_table_col1_3 (a TIMESTAMP PRIMARY KEY); CREATE TABLE single_shard_table_col2_1 (a INT PRIMARY KEY); CREATE TABLE single_shard_table_col3_1 (a INT PRIMARY KEY); CREATE TABLE single_shard_table_col4_1 (a INT PRIMARY KEY); CREATE TABLE single_shard_table_col5_1 (a INT PRIMARY KEY); SELECT create_distributed_table('single_shard_table_col1_1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('single_shard_table_col1_2', null, colocate_with=>'single_shard_table_col1_1'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('single_shard_table_col1_3', null, colocate_with=>'single_shard_table_col1_2'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('single_shard_table_col2_1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('single_shard_table_col3_1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('single_shard_table_col4_1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('single_shard_table_col5_1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) -- initial status SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid; shardid | nodeport --------------------------------------------------------------------- 1820002 | 57638 1820003 | 57638 1820004 | 57638 1820005 | 57637 1820006 | 57638 1820007 | 57637 1820008 | 57638 (7 rows) -- errors out because streaming replicated SELECT citus_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port); ERROR: Table 'single_shard_table_col2_1' is streaming replicated. Shards of streaming replicated tables cannot be copied SELECT master_copy_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port); WARNING: do_repair argument is deprecated ERROR: Table 'single_shard_table_col2_1' is streaming replicated. Shards of streaming replicated tables cannot be copied SELECT citus_copy_shard_placement(1820005, :worker_1_node, :worker_2_node); ERROR: Table 'single_shard_table_col2_1' is streaming replicated. 
Shards of streaming replicated tables cannot be copied -- no changes because it's already balanced SELECT rebalance_table_shards(rebalance_strategy := 'by_shard_count'); rebalance_table_shards --------------------------------------------------------------------- (1 row) -- same placements SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid; shardid | nodeport --------------------------------------------------------------------- 1820002 | 57638 1820003 | 57638 1820004 | 57638 1820005 | 57637 1820006 | 57638 1820007 | 57637 1820008 | 57638 (7 rows) -- manually move 2 shards from 2 colocation groups to make the cluster unbalanced SELECT citus_move_shard_placement(1820005, 'localhost', :worker_1_port, 'localhost', :worker_2_port); citus_move_shard_placement --------------------------------------------------------------------- (1 row) SELECT citus_move_shard_placement(1820007, :worker_1_node, :worker_2_node); citus_move_shard_placement --------------------------------------------------------------------- (1 row) -- all placements are located on worker 2 SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid; shardid | nodeport --------------------------------------------------------------------- 1820002 | 57638 1820003 | 57638 1820004 | 57638 1820005 | 57638 1820006 | 57638 1820007 | 57638 1820008 | 57638 (7 rows) -- move some of them to worker 1 to balance the cluster SELECT rebalance_table_shards(); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... rebalance_table_shards --------------------------------------------------------------------- (1 row) -- the final status, balanced SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid; shardid | nodeport --------------------------------------------------------------------- 1820002 | 57637 1820003 | 57637 1820004 | 57637 1820005 | 57637 1820006 | 57638 1820007 | 57638 1820008 | 57638 (7 rows) -- verify we didn't break any colocations SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::text LIKE '%single_shard_table_col%' ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------------------------------------------------------------- single_shard_table_col1_1 | 198001 single_shard_table_col1_2 | 198001 single_shard_table_col1_3 | 198001 single_shard_table_col2_1 | 198002 single_shard_table_col3_1 | 198003 single_shard_table_col4_1 | 198004 single_shard_table_col5_1 | 198005 (7 rows) -- drop preexisting tables -- we can remove the drop commands once the issue is fixed: https://github.com/citusdata/citus/issues/6948 SET client_min_messages TO ERROR; DROP TABLE IF EXISTS public.lineitem, public.orders, public.customer_append, public.part_append, public.supplier_single_shard, public.events, public.users, public.lineitem_hash_part, public.lineitem_subquery, public.orders_hash_part, public.orders_subquery, public.unlogged_table CASCADE; DROP SCHEMA IF EXISTS with_basics, subquery_and_ctes CASCADE; DROP TABLE IF EXISTS public.users_table, public.events_table, public.agg_results, public.agg_results_second, public.agg_results_third, public.agg_results_fourth, public.agg_results_window CASCADE; -- drain node SELECT citus_drain_node('localhost', :worker_2_port, 'block_writes'); citus_drain_node --------------------------------------------------------------------- (1 row) SELECT
citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); citus_set_node_property --------------------------------------------------------------------- (1 row) RESET client_min_messages; -- see the plan for moving 4 shards, 3 of them are in the same colocation group SELECT * FROM get_rebalance_table_shards_plan(); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- single_shard_table_col1_1 | 1820002 | 0 | localhost | 57637 | localhost | 57638 single_shard_table_col1_2 | 1820003 | 0 | localhost | 57637 | localhost | 57638 single_shard_table_col1_3 | 1820004 | 0 | localhost | 57637 | localhost | 57638 single_shard_table_col2_1 | 1820005 | 0 | localhost | 57637 | localhost | 57638 (4 rows) -- move some of them to worker 2 to balance the cluster SELECT 1 FROM citus_rebalance_start(); NOTICE: Scheduled 2 moves as job xxx DETAIL: Rebalance scheduled as background job HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); ?column? --------------------------------------------------------------------- 1 (1 row) -- stop it SELECT * FROM citus_rebalance_stop(); citus_rebalance_stop --------------------------------------------------------------------- (1 row) -- show rebalance status, see the cancelled job for two moves SELECT state, details FROM citus_rebalance_status(); state | details --------------------------------------------------------------------- cancelled | {"tasks": [], "task_state_counts": {"cancelled": 2}} (1 row) -- start again SELECT 1 FROM citus_rebalance_start(); NOTICE: Scheduled 2 moves as job xxx DETAIL: Rebalance scheduled as background job HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); ?column? 
--------------------------------------------------------------------- 1 (1 row) -- show rebalance status, scheduled a job for two moves SELECT state, details FROM citus_rebalance_status(); state | details --------------------------------------------------------------------- scheduled | {"tasks": [], "task_state_counts": {"runnable": 2}} (1 row) -- wait for rebalance to be completed SELECT * FROM citus_rebalance_wait(); citus_rebalance_wait --------------------------------------------------------------------- (1 row) -- the final status, balanced SELECT shardid, nodeport FROM pg_dist_shard_placement WHERE shardid > 1820000 ORDER BY shardid; shardid | nodeport --------------------------------------------------------------------- 1820002 | 57638 1820003 | 57638 1820004 | 57638 1820005 | 57638 1820006 | 57637 1820007 | 57637 1820008 | 57637 (7 rows) -- test update_distributed_table_colocation CREATE TABLE update_col_1 (a INT); CREATE TABLE update_col_2 (a INT); CREATE TABLE update_col_3 (a INT); -- create colocated single shard distributed tables, so the shards will be -- in the same worker node SELECT create_distributed_table ('update_col_1', null, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table ('update_col_2', null, colocate_with:='update_col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) -- now create a third single shard distributed table that is not colocated, -- with the new colocation id the new table will be in the other worker node SELECT create_distributed_table ('update_col_3', null, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) -- make sure nodes are correct and test shards_colocated UDF SELECT c1.nodeport = c2.nodeport AS same_node, shards_colocated(c1.shardid, c2.shardid) FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2 WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND p1.noderole = 'primary' AND p2.noderole = 'primary'; same_node | shards_colocated --------------------------------------------------------------------- t | t (1 row) SELECT c1.nodeport = c2.nodeport AS same_node, shards_colocated(c1.shardid, c2.shardid) FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2 WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3' AND p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND p1.noderole = 'primary' AND p2.noderole = 'primary'; same_node | shards_colocated --------------------------------------------------------------------- f | f (1 row) -- and the update_col_1 and update_col_2 are colocated SELECT c1.colocation_id = c2.colocation_id AS colocated FROM public.citus_tables c1, public.citus_tables c2 WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2'; colocated --------------------------------------------------------------------- t (1 row) -- break the colocation SELECT update_distributed_table_colocation('update_col_2', colocate_with:='none'); update_distributed_table_colocation --------------------------------------------------------------------- (1 row) SELECT c1.colocation_id = c2.colocation_id AS colocated FROM public.citus_tables c1, public.citus_tables c2 WHERE c1.table_name::text = 'update_col_1' AND 
c2.table_name::text = 'update_col_2'; colocated --------------------------------------------------------------------- f (1 row) -- test shards_colocated UDF with shards in same node but different colocation groups SELECT shards_colocated(c1.shardid, c2.shardid) FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2 WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND p1.noderole = 'primary' AND p2.noderole = 'primary'; shards_colocated --------------------------------------------------------------------- f (1 row) -- re-colocate, the shards were already in the same node SELECT update_distributed_table_colocation('update_col_2', colocate_with:='update_col_1'); update_distributed_table_colocation --------------------------------------------------------------------- (1 row) SELECT c1.colocation_id = c2.colocation_id AS colocated FROM public.citus_tables c1, public.citus_tables c2 WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2'; colocated --------------------------------------------------------------------- t (1 row) -- update_col_1 and update_col_3 are not colocated, because they are not in the same node SELECT c1.colocation_id = c2.colocation_id AS colocated FROM public.citus_tables c1, public.citus_tables c2 WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3'; colocated --------------------------------------------------------------------- f (1 row) -- they should not be able to be colocated since the shards are in different nodes SELECT update_distributed_table_colocation('update_col_3', colocate_with:='update_col_1'); ERROR: cannot colocate tables update_col_1 and update_col_3 DETAIL: Shard xxxxx of update_col_1 and shard xxxxx of update_col_3 are not colocated. SELECT c1.colocation_id = c2.colocation_id AS colocated FROM public.citus_tables c1, public.citus_tables c2 WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3'; colocated --------------------------------------------------------------------- f (1 row) -- hash distributed and single shard distributed tables cannot be colocated CREATE TABLE update_col_4 (a INT); SELECT create_distributed_table ('update_col_4', 'a', colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT update_distributed_table_colocation('update_col_1', colocate_with:='update_col_4'); ERROR: cannot colocate tables update_col_4 and update_col_1 DETAIL: Distribution column types don't match for update_col_4 and update_col_1. SELECT update_distributed_table_colocation('update_col_4', colocate_with:='update_col_1'); ERROR: cannot colocate tables update_col_1 and update_col_4 DETAIL: Distribution column types don't match for update_col_1 and update_col_4.
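-- For reference, the colocation checks above can also be run directly against the
-- Citus metadata instead of the public.citus_tables view; a minimal commented-out
-- sketch (not exercised by this test) comparing the colocation ids of update_col_1
-- and update_col_2 via pg_dist_partition:
-- SELECT p1.colocationid = p2.colocationid AS colocated
-- FROM pg_dist_partition p1, pg_dist_partition p2
-- WHERE p1.logicalrelid = 'update_col_1'::regclass
--   AND p2.logicalrelid = 'update_col_2'::regclass;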
-- test columnar UDFs CREATE TABLE columnar_tbl (a INT) USING COLUMNAR; SELECT create_distributed_table('columnar_tbl', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT * FROM columnar.options WHERE relation = 'columnar_tbl'::regclass; relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- columnar_tbl | 10000 | 150000 | zstd | 3 (1 row) SELECT alter_columnar_table_set('columnar_tbl', compression_level => 2); alter_columnar_table_set --------------------------------------------------------------------- (1 row) SELECT * FROM columnar.options WHERE relation = 'columnar_tbl'::regclass; relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- columnar_tbl | 10000 | 150000 | zstd | 2 (1 row) SELECT alter_columnar_table_reset('columnar_tbl', compression_level => true); alter_columnar_table_reset --------------------------------------------------------------------- (1 row) SELECT * FROM columnar.options WHERE relation = 'columnar_tbl'::regclass; relation | chunk_group_row_limit | stripe_row_limit | compression | compression_level --------------------------------------------------------------------- columnar_tbl | 10000 | 150000 | zstd | 3 (1 row) SELECT columnar_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a WHERE c.relam = a.oid AND amname = 'columnar' AND relname = 'columnar_tbl'; upgrade_columnar_storage --------------------------------------------------------------------- (1 row) SELECT columnar_internal.downgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a WHERE c.relam = a.oid AND amname = 'columnar' AND relname = 'columnar_tbl'; downgrade_columnar_storage --------------------------------------------------------------------- (1 row) CREATE OR REPLACE FUNCTION columnar_storage_info( rel regclass, version_major OUT int4, version_minor OUT int4, storage_id OUT int8, reserved_stripe_id OUT int8, reserved_row_number OUT int8, reserved_offset OUT int8) STRICT LANGUAGE c AS 'citus', $$columnar_storage_info$$; SELECT version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset FROM columnar_storage_info('columnar_tbl'); version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset --------------------------------------------------------------------- 2 | 0 | 1 | 1 | 16336 (1 row) SELECT columnar.get_storage_id(oid) = storage_id FROM pg_class, columnar_storage_info('columnar_tbl') WHERE relname = 'columnar_tbl'; ?column? 
--------------------------------------------------------------------- t (1 row) -- test time series functions CREATE TABLE part_tbl (a DATE) PARTITION BY RANGE (a); CREATE TABLE part_tbl_1 PARTITION OF part_tbl FOR VALUES FROM ('2000-01-01') TO ('2010-01-01'); CREATE TABLE part_tbl_2 PARTITION OF part_tbl FOR VALUES FROM ('2020-01-01') TO ('2030-01-01'); SELECT create_distributed_table('part_tbl', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT * FROM time_partitions WHERE parent_table::text = 'part_tbl'; parent_table | partition_column | partition | from_value | to_value | access_method --------------------------------------------------------------------- part_tbl | a | part_tbl_1 | 01-01-2000 | 01-01-2010 | heap part_tbl | a | part_tbl_2 | 01-01-2020 | 01-01-2030 | heap (2 rows) SELECT time_partition_range('part_tbl_2'); time_partition_range --------------------------------------------------------------------- (01-01-2020,01-01-2030) (1 row) SELECT get_missing_time_partition_ranges('part_tbl', INTERVAL '10 years', '2050-01-01', '2000-01-01'); get_missing_time_partition_ranges --------------------------------------------------------------------- (part_tbl_p2010,01-01-2010,01-01-2020) (part_tbl_p2030,01-01-2030,01-01-2040) (part_tbl_p2040,01-01-2040,01-01-2050) (3 rows) SELECT create_time_partitions('part_tbl', INTERVAL '10 years', '2050-01-01', '2000-01-01'); create_time_partitions --------------------------------------------------------------------- t (1 row) CALL drop_old_time_partitions('part_tbl', '2030-01-01'); NOTICE: dropping part_tbl_1 with start time 01-01-2000 and end time 01-01-2010 CONTEXT: PL/pgSQL function drop_old_time_partitions(regclass,timestamp with time zone) line XX at RAISE NOTICE: dropping part_tbl_p2010 with start time 01-01-2010 and end time 01-01-2020 CONTEXT: PL/pgSQL function drop_old_time_partitions(regclass,timestamp with time zone) line XX at RAISE NOTICE: dropping part_tbl_2 with start time 01-01-2020 and end time 01-01-2030 CONTEXT: PL/pgSQL function drop_old_time_partitions(regclass,timestamp with time zone) line XX at RAISE SELECT * FROM time_partitions WHERE parent_table::text = 'part_tbl'; parent_table | partition_column | partition | from_value | to_value | access_method --------------------------------------------------------------------- part_tbl | a | part_tbl_p2030 | 01-01-2030 | 01-01-2040 | heap part_tbl | a | part_tbl_p2040 | 01-01-2040 | 01-01-2050 | heap (2 rows) -- test locking shards CREATE TABLE lock_tbl_1 (a INT); SELECT create_distributed_table('lock_tbl_1', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE lock_tbl_2 (a INT); SELECT create_distributed_table('lock_tbl_2', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) BEGIN; SELECT lock_shard_metadata(3, array_agg(distinct(shardid))) FROM citus_shards WHERE table_name::text = 'lock_tbl_1'; lock_shard_metadata --------------------------------------------------------------------- (1 row) SELECT lock_shard_metadata(5, array_agg(distinct(shardid))) FROM citus_shards WHERE table_name::text LIKE 'lock\_tbl\__'; lock_shard_metadata --------------------------------------------------------------------- (1 row) SELECT table_name, classid, mode, granted FROM pg_locks, public.citus_tables WHERE locktype = 'advisory' AND 
table_name::text LIKE 'lock\_tbl\__' AND objid = colocation_id ORDER BY 1, 3; table_name | classid | mode | granted --------------------------------------------------------------------- lock_tbl_1 | 0 | RowExclusiveLock | t lock_tbl_1 | 0 | ShareLock | t lock_tbl_2 | 0 | ShareLock | t (3 rows) END; BEGIN; SELECT lock_shard_resources(3, array_agg(distinct(shardid))) FROM citus_shards WHERE table_name::text = 'lock_tbl_1'; lock_shard_resources --------------------------------------------------------------------- (1 row) SELECT lock_shard_resources(5, array_agg(distinct(shardid))) FROM citus_shards WHERE table_name::text LIKE 'lock\_tbl\__'; lock_shard_resources --------------------------------------------------------------------- (1 row) SELECT locktype, table_name, mode, granted FROM pg_locks, citus_shards, pg_dist_node WHERE objid = shardid AND table_name::text LIKE 'lock\_tbl\__' AND citus_shards.nodeport = pg_dist_node.nodeport AND noderole = 'primary' ORDER BY 2, 3; locktype | table_name | mode | granted --------------------------------------------------------------------- advisory | lock_tbl_1 | RowExclusiveLock | t advisory | lock_tbl_1 | ShareLock | t advisory | lock_tbl_2 | ShareLock | t (3 rows) END; -- test foreign key UDFs CREATE TABLE fkey_s1 (a INT UNIQUE); CREATE TABLE fkey_r (a INT UNIQUE); CREATE TABLE fkey_s2 (x INT, y INT); CREATE TABLE fkey_s3 (x INT, y INT); SELECT create_distributed_table('fkey_s1', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_reference_table('fkey_r'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('fkey_s2', NULL, colocate_with:='fkey_s1'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('fkey_s3', NULL, colocate_with:='fkey_s1'); create_distributed_table --------------------------------------------------------------------- (1 row) ALTER TABLE fkey_s2 ADD CONSTRAINT f1 FOREIGN KEY (x) REFERENCES fkey_s1 (a); ALTER TABLE fkey_s2 ADD CONSTRAINT f2 FOREIGN KEY (y) REFERENCES fkey_r (a); ALTER TABLE fkey_s3 ADD CONSTRAINT f3 FOREIGN KEY (x) REFERENCES fkey_s1 (a); ALTER TABLE fkey_s3 ADD CONSTRAINT f4 FOREIGN KEY (y) REFERENCES fkey_r (a); SELECT get_referencing_relation_id_list::regclass::text FROM get_referencing_relation_id_list('fkey_s1'::regclass) ORDER BY 1; get_referencing_relation_id_list --------------------------------------------------------------------- fkey_s2 fkey_s3 (2 rows) SELECT get_referenced_relation_id_list::regclass::text FROM get_referenced_relation_id_list('fkey_s2'::regclass) ORDER BY 1; get_referenced_relation_id_list --------------------------------------------------------------------- fkey_r fkey_s1 (2 rows) SELECT oid::regclass::text FROM get_foreign_key_connected_relations('fkey_s1'::regclass) AS f(oid oid) ORDER BY 1; oid --------------------------------------------------------------------- fkey_r fkey_s1 fkey_s2 fkey_s3 (4 rows) --test dependency functions CREATE TYPE dep_type AS (a INT); CREATE TABLE dep_tbl(a INT, b dep_type); SELECT create_distributed_table('dep_tbl', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE VIEW dep_view AS SELECT * FROM dep_tbl; -- find all the dependencies of table dep_tbl SELECT pg_identify_object(t.classid, t.objid, t.objsubid) FROM 
(SELECT * FROM pg_get_object_address('table', '{dep_tbl}', '{}')) as addr JOIN LATERAL citus_get_all_dependencies_for_object(addr.classid, addr.objid, addr.objsubid) as t(classid oid, objid oid, objsubid int) ON TRUE ORDER BY 1; pg_identify_object --------------------------------------------------------------------- ("composite type",null_dist_key_udfs,dep_type,null_dist_key_udfs.dep_type) (schema,,null_dist_key_udfs,null_dist_key_udfs) (type,null_dist_key_udfs,dep_type,null_dist_key_udfs.dep_type) (3 rows) -- find all the dependencies of view dep_view SELECT pg_identify_object(t.classid, t.objid, t.objsubid) FROM (SELECT * FROM pg_get_object_address('view', '{dep_view}', '{}')) as addr JOIN LATERAL citus_get_all_dependencies_for_object(addr.classid, addr.objid, addr.objsubid) as t(classid oid, objid oid, objsubid int) ON TRUE ORDER BY 1; pg_identify_object --------------------------------------------------------------------- ("composite type",null_dist_key_udfs,dep_type,null_dist_key_udfs.dep_type) (schema,,null_dist_key_udfs,null_dist_key_udfs) (table,null_dist_key_udfs,dep_tbl,null_dist_key_udfs.dep_tbl) (type,null_dist_key_udfs,dep_type,null_dist_key_udfs.dep_type) (4 rows) -- find non-distributed dependencies of table dep_tbl SELECT pg_identify_object(t.classid, t.objid, t.objsubid) FROM (SELECT * FROM pg_get_object_address('table', '{dep_tbl}', '{}')) as addr JOIN LATERAL citus_get_dependencies_for_object(addr.classid, addr.objid, addr.objsubid) as t(classid oid, objid oid, objsubid int) ON TRUE ORDER BY 1; pg_identify_object --------------------------------------------------------------------- (0 rows) SET citus.hide_citus_dependent_objects TO true; CREATE TABLE citus_dep_tbl (a noderole); SELECT create_distributed_table('citus_dep_tbl', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT is_citus_depended_object('pg_class'::regclass, 'citus_dep_tbl'::regclass); is_citus_depended_object --------------------------------------------------------------------- t (1 row) RESET citus.hide_citus_dependent_objects; -- test replicate_reference_tables SET client_min_messages TO WARNING; DROP SCHEMA null_dist_key_udfs CASCADE; RESET client_min_messages; CREATE SCHEMA null_dist_key_udfs; SET search_path TO null_dist_key_udfs; SELECT citus_remove_node('localhost', :worker_2_port); citus_remove_node --------------------------------------------------------------------- (1 row) CREATE TABLE rep_ref (a INT UNIQUE); SELECT create_reference_table('rep_ref'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE rep_sing (a INT); SELECT create_distributed_table('rep_sing', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) ALTER TABLE rep_sing ADD CONSTRAINT rep_fkey FOREIGN KEY (a) REFERENCES rep_ref(a); SELECT 1 FROM citus_add_node('localhost', :worker_2_port); ?column? 
--------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM citus_shards WHERE table_name = 'rep_ref'::regclass AND nodeport = :worker_2_port; count --------------------------------------------------------------------- 0 (1 row) SELECT replicate_reference_tables('block_writes'); replicate_reference_tables --------------------------------------------------------------------- (1 row) SELECT count(*) FROM citus_shards WHERE table_name = 'rep_ref'::regclass AND nodeport = :worker_2_port; count --------------------------------------------------------------------- 1 (1 row) -- test fix_partition_shard_index_names SET citus.next_shard_id TO 3820000; CREATE TABLE part_tbl_sing (dist_col int, another_col int, partition_col timestamp) PARTITION BY RANGE (partition_col); SELECT create_distributed_table('part_tbl_sing', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) -- create a partition with a long name and another with a short name CREATE TABLE partition_table_with_very_long_name PARTITION OF part_tbl_sing FOR VALUES FROM ('2018-01-01') TO ('2019-01-01'); CREATE TABLE p PARTITION OF part_tbl_sing FOR VALUES FROM ('2019-01-01') TO ('2020-01-01'); -- create an index on parent table -- we will see that it doesn't matter whether we name the index on parent or not -- indexes auto-generated on partitions will not use this name -- SELECT fix_partition_shard_index_names('part_tbl_sing') will be executed -- automatically at the end of the CREATE INDEX command CREATE INDEX short ON part_tbl_sing USING btree (another_col, partition_col); SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'null_dist_key_udfs' AND tablename SIMILAR TO 'p%' ORDER BY 1, 2; tablename | indexname --------------------------------------------------------------------- p | p_another_col_partition_col_idx part_tbl_sing | short partition_table_with_very_long_name | partition_table_with_very_long_na_another_col_partition_col_idx (3 rows) SELECT nodeport AS part_tbl_sing_port FROM citus_shards WHERE table_name = 'part_tbl_sing'::regclass AND nodeport IN (:worker_1_port, :worker_2_port) \gset \c - - - :part_tbl_sing_port -- the names are generated correctly -- shard id has been appended to all index names which didn't end in shard id -- this goes in line with Citus's way of naming indexes of shards: always append shardid to the end SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'null_dist_key_udfs' AND tablename SIMILAR TO 'p%\_\d*' ORDER BY 1, 2; tablename | indexname --------------------------------------------------------------------- p_3820002 | p_another_col_partition_col_idx_3820002 part_tbl_sing_3820000 | short_3820000 partition_table_with_very_long_name_3820001 | partition_table_with_very_long_na_another_col__dd884a3b_3820001 (3 rows) \c - - - :master_port SET search_path TO null_dist_key_udfs; -- test isolate_tenant_to_new_shard CREATE TABLE iso_tbl (a INT); SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('iso_tbl', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('iso_tbl', 5); ERROR: cannot isolate tenant because tenant isolation is only support for hash distributed tables -- test replicate_table_shards CREATE TABLE rep_tbl (a INT); SELECT create_distributed_table('rep_tbl', NULL, colocate_with:='none'); create_distributed_table
--------------------------------------------------------------------- (1 row) SELECT replicate_table_shards('rep_tbl'); ERROR: cannot replicate single shard tables' shards -- test debug_equality_expression CREATE FUNCTION debug_equality_expression(regclass) RETURNS cstring AS 'citus' LANGUAGE C STRICT; CREATE TABLE debug_tbl (a INT); SELECT create_distributed_table ('debug_tbl', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT debug_equality_expression('debug_tbl'::regclass); ERROR: table needs to be hash distributed -- test partition_column_id CREATE FUNCTION partition_column_id(regclass) RETURNS smallint AS 'citus' LANGUAGE C STRICT; CREATE TABLE partcol_tbl (a INT); SELECT create_distributed_table ('partcol_tbl', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT partition_column_id('partcol_tbl'::regclass); ERROR: table needs to be hash distributed -- test citus_shard_cost_by_disk_size CREATE TABLE size_tbl_dist (a INT, b TEXT); SELECT create_distributed_table('size_tbl_dist', 'a', shard_count:=4, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE size_tbl_single (a INT, b TEXT); SELECT create_distributed_table('size_tbl_single', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO size_tbl_dist SELECT 1, '1234567890' FROM generate_series(1, 10000); INSERT INTO size_tbl_single SELECT 1, '1234567890' FROM generate_series(1, 10000); SELECT citus_shard_cost_by_disk_size(c1.shardid) = citus_shard_cost_by_disk_size(c2.shardid) AS equal_cost FROM citus_shards c1, citus_shards c2 WHERE c1.table_name::TEXT = 'size_tbl_dist' AND c2.table_name::TEXT = 'size_tbl_single' ORDER BY c1.shard_size DESC LIMIT 1; equal_cost --------------------------------------------------------------------- t (1 row) -- test update statistics UDFs CREATE TABLE update_tbl_stat (a INT, b TEXT); SELECT create_distributed_table('update_tbl_stat', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT shardid AS update_tbl_stat_shard FROM citus_shards WHERE table_name::TEXT = 'update_tbl_stat' LIMIT 1 \gset SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1; ?column? --------------------------------------------------------------------- f (1 row) INSERT INTO update_tbl_stat SELECT 1, '1234567890' FROM generate_series(1, 10000); SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1; ?column? --------------------------------------------------------------------- f (1 row) SELECT citus_update_table_statistics('update_tbl_stat'); citus_update_table_statistics --------------------------------------------------------------------- (1 row) SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_tbl_stat_shard LIMIT 1; ?column? 
--------------------------------------------------------------------- t (1 row) CREATE TABLE update_shard_stat (a INT, b TEXT); SELECT create_distributed_table('update_shard_stat', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT shardid AS update_shard_stat_shard FROM citus_shards WHERE table_name::TEXT = 'update_shard_stat' LIMIT 1 \gset SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1; ?column? --------------------------------------------------------------------- f (1 row) INSERT INTO update_shard_stat SELECT 1, '1234567890' FROM generate_series(1, 10000); SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1; ?column? --------------------------------------------------------------------- f (1 row) SELECT 1 FROM citus_update_shard_statistics(:update_shard_stat_shard); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1; ?column? --------------------------------------------------------------------- t (1 row) -- test citus clock SET citus.enable_cluster_clock TO ON; CREATE TABLE clock_single(a INT); SELECT create_distributed_table('clock_single', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT citus_get_node_clock() AS nc1 \gset SELECT citus_get_node_clock() AS nc2 \gset SELECT citus_get_node_clock() AS nc3 \gset SELECT citus_is_clock_after(:'nc2', :'nc1'); citus_is_clock_after --------------------------------------------------------------------- t (1 row) SELECT citus_is_clock_after(:'nc3', :'nc2'); citus_is_clock_after --------------------------------------------------------------------- t (1 row) BEGIN; SELECT citus_get_node_clock() AS nc4 \gset COPY clock_single FROM STDIN; SELECT citus_get_node_clock() AS nc5 \gset END; SELECT citus_is_clock_after(:'nc4', :'nc3'); citus_is_clock_after --------------------------------------------------------------------- t (1 row) SELECT citus_is_clock_after(:'nc5', :'nc4'); citus_is_clock_after --------------------------------------------------------------------- t (1 row) BEGIN; SELECT citus_get_transaction_clock(); citus_get_transaction_clock --------------------------------------------------------------------- (xxxxxxxxxxxxx,x) (1 row) END; -- Transaction with single shard table access SELECT nodeport AS clock_shard_nodeport FROM citus_shards WHERE table_name::text = 'clock_single' AND nodeport IN (:worker_1_port, :worker_2_port) \gset BEGIN; COPY clock_single FROM STDIN; SELECT get_current_transaction_id() \gset tid SET client_min_messages TO DEBUG1; -- Capture the transaction timestamp SELECT citus_get_transaction_clock() as txnclock \gset DEBUG: node xxxx transaction clock xxxxxx DEBUG: node xxxx transaction clock xxxxxx DEBUG: final global transaction clock xxxxxx COMMIT; -- Check to see if the clock is persisted in the sequence. SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) WHERE nodeport = :clock_shard_nodeport \gset SELECT cluster_clock_logical(:'txnclock') as txnlog \gset SELECT :logseq = :txnlog; ?column? 
--------------------------------------------------------------------- t (1 row) BEGIN; COPY clock_single FROM STDIN; SELECT get_current_transaction_id() \gset tid SET client_min_messages TO DEBUG1; -- Capture the transaction timestamp SELECT citus_get_transaction_clock() as txnclock \gset DEBUG: node xxxx transaction clock xxxxxx DEBUG: node xxxx transaction clock xxxxxx DEBUG: final global transaction clock xxxxxx ROLLBACK; SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) WHERE nodeport = :clock_shard_nodeport \gset SELECT cluster_clock_logical(:'txnclock') as txnlog \gset SELECT :logseq = :txnlog; ?column? --------------------------------------------------------------------- t (1 row) -- test table with space in its name in citus_shards CREATE TABLE "t b l" (a INT); SELECT create_distributed_table('"t b l"', NULL, colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT table_name, shard_size FROM citus_shards WHERE table_name = '"t b l"'::regclass AND nodeport IN (:worker_1_port, :worker_2_port); table_name | shard_size --------------------------------------------------------------------- "t b l" | 0 (1 row) SET client_min_messages TO WARNING; DROP SCHEMA null_dist_key_udfs CASCADE;
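-- Illustrative cleanup sketch (not run as part of this test): if the surrounding
-- schedule expects the coordinator to be absent from the metadata afterwards, the
-- node added with citus_add_node at the top of this file could be removed again:
-- SELECT citus_remove_node('localhost', :master_port);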