-- =================================================================== -- create test functions -- =================================================================== CREATE FUNCTION load_shard_id_array(regclass) RETURNS bigint[] AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION load_shard_interval_array(bigint, anyelement) RETURNS anyarray AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION load_shard_placement_array(bigint, bool) RETURNS text[] AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION partition_column_id(regclass) RETURNS smallint AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION partition_type(regclass) RETURNS "char" AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION is_distributed_table(regclass) RETURNS boolean AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION column_name_to_column_id(regclass, cstring) RETURNS smallint AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION create_monolithic_shard_row(regclass) RETURNS bigint AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION create_healthy_local_shard_placement_row(bigint) RETURNS void AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION delete_shard_placement_row(bigint, text, bigint) RETURNS bool AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION update_shard_placement_row_state(bigint, text, bigint, int) RETURNS bool AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION acquire_shared_shard_lock(bigint) RETURNS void AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION column_name_to_column(regclass, text) RETURNS text AS 'citus' LANGUAGE C STRICT; -- =================================================================== -- test distribution metadata functionality -- =================================================================== -- create hash distributed table CREATE TABLE events_hash ( id bigint, name text ); SELECT master_create_distributed_table('events_hash', 'name', 'hash'); master_create_distributed_table --------------------------------- (1 row) -- create worker shards SELECT master_create_worker_shards('events_hash', 4, 2); master_create_worker_shards ----------------------------- (1 row) -- set shardstate of one replication from each shard to 0 (invalid value) UPDATE pg_dist_shard_placement SET shardstate = 0 WHERE nodeport = 57638 AND shardid BETWEEN 103025 AND 103028; -- should see above shard identifiers SELECT load_shard_id_array('events_hash'); load_shard_id_array ------------------------------- {103025,103026,103027,103028} (1 row) -- should see array with first shard range SELECT load_shard_interval_array(103025, 0); load_shard_interval_array --------------------------- {-2147483648,-1073741825} (1 row) -- should even work for range-partitioned shards -- create range distributed table CREATE TABLE events_range ( id bigint, name text ); SELECT master_create_distributed_table('events_range', 'name', 'range'); master_create_distributed_table --------------------------------- (1 row) -- create empty shard SELECT master_create_empty_shard('events_range'); master_create_empty_shard --------------------------- 103029 (1 row) UPDATE pg_dist_shard SET shardminvalue = 'Aardvark', shardmaxvalue = 'Zebra' WHERE shardid = 103029; SELECT load_shard_interval_array(103029, ''::text); load_shard_interval_array --------------------------- {Aardvark,Zebra} (1 row) -- should see error for non-existent shard SELECT load_shard_interval_array(103030, 0); ERROR: could not find valid entry for shard 103030 -- should see two placements SELECT load_shard_placement_array(103026, false); load_shard_placement_array ----------------------------------- {localhost:57637,localhost:57638} (1 row) -- only one of which is finalized SELECT load_shard_placement_array(103026, true); load_shard_placement_array ---------------------------- {localhost:57637} (1 row) -- should see error for non-existent shard SELECT load_shard_placement_array(103031, false); WARNING: could not find any shard placements for shardId 103031 load_shard_placement_array ---------------------------- {} (1 row) -- should see column id of 'name' SELECT partition_column_id('events_hash'); partition_column_id --------------------- 2 (1 row) -- should see hash partition type and fail for non-distributed tables SELECT partition_type('events_hash'); partition_type ---------------- h (1 row) SELECT partition_type('pg_type'); ERROR: relation 1247 is not distributed -- should see true for events_hash, false for others SELECT is_distributed_table('events_hash'); is_distributed_table ---------------------- t (1 row) SELECT is_distributed_table('pg_type'); is_distributed_table ---------------------- f (1 row) SELECT is_distributed_table('pg_dist_shard'); is_distributed_table ---------------------- f (1 row) -- test underlying column name-id translation SELECT column_name_to_column_id('events_hash', 'name'); column_name_to_column_id -------------------------- 2 (1 row) SELECT column_name_to_column_id('events_hash', 'ctid'); ERROR: cannot reference system column "ctid" in relation "events_hash" SELECT column_name_to_column_id('events_hash', 'non_existent'); ERROR: column "non_existent" of relation "events_hash" does not exist -- drop shard rows (must drop placements first) DELETE FROM pg_dist_shard_placement WHERE shardid BETWEEN 103025 AND 103029; DELETE FROM pg_dist_shard WHERE logicalrelid = 'events_hash'::regclass; DELETE FROM pg_dist_shard WHERE logicalrelid = 'events_range'::regclass; -- verify that an eager load shows them missing SELECT load_shard_id_array('events_hash'); load_shard_id_array --------------------- {} (1 row) -- create second table to distribute CREATE TABLE customers ( id bigint, name text ); -- now we'll distribute using function calls but verify metadata manually... -- partition on id and manually inspect partition row INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey) VALUES ('customers'::regclass, 'h', column_name_to_column('customers'::regclass, 'id')); SELECT partmethod, partkey FROM pg_dist_partition WHERE logicalrelid = 'customers'::regclass; partmethod | partkey ------------+------------------------------------------------------------------------------------------------------------------------ h | {VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} (1 row) -- make one huge shard and manually inspect shard row SELECT create_monolithic_shard_row('customers') AS new_shard_id \gset SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id; shardstorage | shardminvalue | shardmaxvalue --------------+---------------+--------------- t | -2147483648 | 2147483647 (1 row) -- add a placement and manually inspect row SELECT create_healthy_local_shard_placement_row(:new_shard_id); create_healthy_local_shard_placement_row ------------------------------------------ (1 row) SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432; shardstate | nodename | nodeport ------------+-----------+---------- 1 | localhost | 5432 (1 row) -- mark it as unhealthy and inspect SELECT update_shard_placement_row_state(:new_shard_id, 'localhost', 5432, 3); update_shard_placement_row_state ---------------------------------- t (1 row) SELECT shardstate FROM pg_dist_shard_placement WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432; shardstate ------------ 3 (1 row) -- remove it and verify it is gone SELECT delete_shard_placement_row(:new_shard_id, 'localhost', 5432); delete_shard_placement_row ---------------------------- t (1 row) SELECT COUNT(*) FROM pg_dist_shard_placement WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432; count ------- 0 (1 row) -- deleting or updating a non-existent row should fail SELECT delete_shard_placement_row(:new_shard_id, 'wrong_localhost', 5432); ERROR: could not find valid entry for shard placement 103030 on node "wrong_localhost:5432" SELECT update_shard_placement_row_state(:new_shard_id, 'localhost', 5432, 3); ERROR: could not find valid entry for shard placement 103030 on node "localhost:5432" -- now we'll even test our lock methods... -- use transaction to bound how long we hold the lock BEGIN; -- pick up a shard lock and look for it in pg_locks SELECT acquire_shared_shard_lock(5); acquire_shared_shard_lock --------------------------- (1 row) SELECT objid, mode FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; objid | mode -------+----------- 5 | ShareLock (1 row) -- commit should drop the lock COMMIT; -- lock should be gone now SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; count ------- 0 (1 row)