-- =================================================================== -- create test functions -- =================================================================== ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000; CREATE FUNCTION load_shard_id_array(regclass) RETURNS bigint[] AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION load_shard_interval_array(bigint, anyelement) RETURNS anyarray AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION load_shard_placement_array(bigint, bool) RETURNS text[] AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION partition_column_id(regclass) RETURNS smallint AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION partition_type(regclass) RETURNS "char" AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION is_distributed_table(regclass) RETURNS boolean AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION column_name_to_column_id(regclass, cstring) RETURNS smallint AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION create_monolithic_shard_row(regclass) RETURNS bigint AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION acquire_shared_shard_lock(bigint) RETURNS void AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION relation_count_in_query(text) RETURNS int AS 'citus' LANGUAGE C STRICT; -- =================================================================== -- test distribution metadata functionality -- =================================================================== -- create hash distributed table CREATE TABLE events_hash ( id bigint, name text ); SELECT create_distributed_table('events_hash', 'name', 'hash'); create_distributed_table --------------------------------------------------------------------- (1 row) -- set shardstate of one replication from each shard to 0 (invalid value) UPDATE pg_dist_placement SET shardstate = 0 WHERE shardid BETWEEN 540000 AND 540003 AND groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port); -- should see above shard identifiers SELECT load_shard_id_array('events_hash'); load_shard_id_array --------------------------------------------------------------------- {540000,540001,540002,540003} (1 row) -- should see array with first shard range SELECT load_shard_interval_array(540000, 0); load_shard_interval_array --------------------------------------------------------------------- {-2147483648,-1073741825} (1 row) -- should even work for range-partitioned shards -- create range distributed table CREATE TABLE events_range ( id bigint, name text ); SELECT master_create_distributed_table('events_range', 'name', 'range'); master_create_distributed_table --------------------------------------------------------------------- (1 row) -- create empty shard SELECT master_create_empty_shard('events_range'); master_create_empty_shard --------------------------------------------------------------------- 540004 (1 row) UPDATE pg_dist_shard SET shardminvalue = 'Aardvark', shardmaxvalue = 'Zebra' WHERE shardid = 540004; SELECT load_shard_interval_array(540004, ''::text); load_shard_interval_array --------------------------------------------------------------------- {Aardvark,Zebra} (1 row) -- should see error for non-existent shard SELECT load_shard_interval_array(540005, 0); ERROR: could not find valid entry for shard xxxxx -- should see two placements SELECT load_shard_placement_array(540001, false); load_shard_placement_array --------------------------------------------------------------------- {localhost:xxxxx,localhost:xxxxx} (1 row) -- only one of which is active SELECT load_shard_placement_array(540001, true); load_shard_placement_array --------------------------------------------------------------------- {localhost:xxxxx} (1 row) -- should see error for non-existent shard SELECT load_shard_placement_array(540001, false); load_shard_placement_array --------------------------------------------------------------------- {localhost:xxxxx,localhost:xxxxx} (1 row) -- should see column id of 'name' SELECT partition_column_id('events_hash'); partition_column_id --------------------------------------------------------------------- 2 (1 row) -- should see hash partition type and fail for non-distributed tables SELECT partition_type('events_hash'); partition_type --------------------------------------------------------------------- h (1 row) SELECT partition_type('pg_type'); ERROR: relation pg_type is not distributed -- should see true for events_hash, false for others SELECT is_distributed_table('events_hash'); is_distributed_table --------------------------------------------------------------------- t (1 row) SELECT is_distributed_table('pg_type'); is_distributed_table --------------------------------------------------------------------- f (1 row) SELECT is_distributed_table('pg_dist_shard'); is_distributed_table --------------------------------------------------------------------- f (1 row) -- test underlying column name-id translation SELECT column_name_to_column_id('events_hash', 'name'); column_name_to_column_id --------------------------------------------------------------------- 2 (1 row) SELECT column_name_to_column_id('events_hash', 'ctid'); ERROR: cannot reference system column "ctid" in relation "events_hash" SELECT column_name_to_column_id('events_hash', 'non_existent'); ERROR: column "non_existent" of relation "events_hash" does not exist -- drop shard rows (must drop placements first) DELETE FROM pg_dist_placement WHERE shardid BETWEEN 540000 AND 540004; DELETE FROM pg_dist_shard WHERE logicalrelid = 'events_hash'::regclass; DELETE FROM pg_dist_shard WHERE logicalrelid = 'events_range'::regclass; -- verify that an eager load shows them missing SELECT load_shard_id_array('events_hash'); load_shard_id_array --------------------------------------------------------------------- {} (1 row) -- create second table to distribute CREATE TABLE customers ( id bigint, name text ); -- now we'll distribute using function calls but verify metadata manually... -- partition on id and manually inspect partition row INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey) VALUES ('customers'::regclass, 'h', column_name_to_column('customers'::regclass, 'id')); SELECT partmethod, column_to_column_name(logicalrelid, partkey) FROM pg_dist_partition WHERE logicalrelid = 'customers'::regclass; partmethod | column_to_column_name --------------------------------------------------------------------- h | id (1 row) -- test column_to_column_name with illegal arguments SELECT column_to_column_name(1204127312,''); ERROR: not a valid column SELECT column_to_column_name('customers',''); ERROR: not a valid column SELECT column_to_column_name('pg_dist_node'::regclass, NULL); column_to_column_name --------------------------------------------------------------------- (1 row) SELECT column_to_column_name('pg_dist_node'::regclass,'{FROMEXPR :fromlist ({RANGETBLREF :rtindex 1 }) :quals <>}'); ERROR: not a valid column -- test column_name_to_column with illegal arguments SELECT column_name_to_column(1204127312,''); ERROR: could not open relation with OID 1204127312 SELECT column_name_to_column('customers','notacolumn'); ERROR: column "notacolumn" of relation "customers" does not exist -- make one huge shard and manually inspect shard row SELECT create_monolithic_shard_row('customers') AS new_shard_id \gset SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id; shardstorage | shardminvalue | shardmaxvalue --------------------------------------------------------------------- t | -2147483648 | 2147483647 (1 row) -- now we'll even test our lock methods... -- use transaction to bound how long we hold the lock BEGIN; -- pick up a shard lock and look for it in pg_locks SELECT acquire_shared_shard_lock(5); acquire_shared_shard_lock --------------------------------------------------------------------- (1 row) SELECT objid, mode FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; objid | mode --------------------------------------------------------------------- 5 | ShareLock (1 row) -- commit should drop the lock COMMIT; -- lock should be gone now SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; count --------------------------------------------------------------------- 0 (1 row) -- test get_shard_id_for_distribution_column SET citus.shard_count TO 4; CREATE TABLE get_shardid_test_table1(column1 int, column2 int); SELECT create_distributed_table('get_shardid_test_table1', 'column1'); create_distributed_table --------------------------------------------------------------------- (1 row) \COPY get_shardid_test_table1 FROM STDIN with delimiter '|'; SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540006 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540009 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540007 (1 row) -- verify result of the get_shard_id_for_distribution_column \c - - - :worker_1_port SELECT * FROM get_shardid_test_table1_540006; column1 | column2 --------------------------------------------------------------------- 1 | 1 (1 row) SELECT * FROM get_shardid_test_table1_540009; column1 | column2 --------------------------------------------------------------------- 2 | 2 (1 row) SELECT * FROM get_shardid_test_table1_540007; column1 | column2 --------------------------------------------------------------------- 3 | 3 (1 row) \c - - - :master_port -- test non-existing value SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540007 (1 row) -- test array type SET citus.shard_count TO 4; CREATE TABLE get_shardid_test_table2(column1 text[], column2 int); SELECT create_distributed_table('get_shardid_test_table2', 'column1'); create_distributed_table --------------------------------------------------------------------- (1 row) \COPY get_shardid_test_table2 FROM STDIN with delimiter '|'; SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540013 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540011 (1 row) -- verify result of the get_shard_id_for_distribution_column \c - - - :worker_1_port SELECT * FROM get_shardid_test_table2_540013; column1 | column2 --------------------------------------------------------------------- {a,b,c} | 1 (1 row) SELECT * FROM get_shardid_test_table2_540011; column1 | column2 --------------------------------------------------------------------- {d,e,f} | 2 (1 row) \c - - - :master_port -- test mismatching data type SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'); ERROR: malformed array literal: "a" DETAIL: Array value must start with "{" or dimension information. -- test NULL distribution column value for hash distributed table SELECT get_shard_id_for_distribution_column('get_shardid_test_table2'); ERROR: distribution value cannot be NULL for tables other than reference tables. SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL); ERROR: distribution value cannot be NULL for tables other than reference tables. -- test non-distributed table CREATE TABLE get_shardid_test_table3(column1 int, column2 int); SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); ERROR: relation is not distributed -- test append distributed table SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); ERROR: finding shard id of given distribution value is only supported for hash partitioned tables, range partitioned tables and reference tables. -- test reference table; CREATE TABLE get_shardid_test_table4(column1 int, column2 int); SELECT create_reference_table('get_shardid_test_table4'); create_reference_table --------------------------------------------------------------------- (1 row) -- test NULL distribution column value for reference table SELECT get_shard_id_for_distribution_column('get_shardid_test_table4'); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540014 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540014 (1 row) -- test different data types for reference table SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540014 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540014 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540014 (1 row) -- test range distributed table CREATE TABLE get_shardid_test_table5(column1 int, column2 int); SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range'); create_distributed_table --------------------------------------------------------------------- (1 row) -- create worker shards SELECT master_create_empty_shard('get_shardid_test_table5'); master_create_empty_shard --------------------------------------------------------------------- 540015 (1 row) SELECT master_create_empty_shard('get_shardid_test_table5'); master_create_empty_shard --------------------------------------------------------------------- 540016 (1 row) SELECT master_create_empty_shard('get_shardid_test_table5'); master_create_empty_shard --------------------------------------------------------------------- 540017 (1 row) SELECT master_create_empty_shard('get_shardid_test_table5'); master_create_empty_shard --------------------------------------------------------------------- 540018 (1 row) -- now the comparison is done via the partition column type, which is text UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015; UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016; UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017; UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018; SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540015 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540016 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540017 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248); get_shard_id_for_distribution_column --------------------------------------------------------------------- 540018 (1 row) -- test non-existing value for range distributed tables SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001); get_shard_id_for_distribution_column --------------------------------------------------------------------- 0 (1 row) SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999); get_shard_id_for_distribution_column --------------------------------------------------------------------- 0 (1 row) SET citus.shard_count TO 2; CREATE TABLE events_table_count (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint); SELECT create_distributed_table('events_table_count', 'user_id'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE users_table_count (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); SELECT create_distributed_table('users_table_count', 'user_id'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT relation_count_in_query($$-- we can support arbitrary subqueries within UNIONs SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM ( SELECT *, random() FROM (SELECT "t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types" FROM ( SELECT "t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events FROM ( (SELECT * FROM (SELECT events_table."time", 0 AS event, events_table."user_id" FROM "events_table_count" as events_table WHERE events_table.event_type IN (1, 2) ) events_subquery_1) UNION (SELECT * FROM ( SELECT * FROM ( SELECT max("events_table_count"."time"), 0 AS event, "events_table_count"."user_id" FROM "events_table_count", users_table_count as "users" WHERE "events_table_count".user_id = users.user_id AND "events_table_count".event_type IN (1, 2) GROUP BY "events_table_count"."user_id" ) as events_subquery_5 ) events_subquery_2) UNION (SELECT * FROM (SELECT "events_table_count"."time", 2 AS event, "events_table_count"."user_id" FROM "events_table_count" WHERE event_type IN (3, 4) ) events_subquery_3) UNION (SELECT * FROM (SELECT "events_table_count"."time", 3 AS event, "events_table_count"."user_id" FROM "events_table_count" WHERE event_type IN (5, 6)) events_subquery_4) ) t1 GROUP BY "t1"."user_id") AS t) "q" INNER JOIN (SELECT "events_table_count"."user_id" FROM users_table_count as "events_table_count" WHERE value_1 > 0 and value_1 < 4) AS t ON (t.user_id = q.user_id)) as final_query GROUP BY types ORDER BY types;$$); relation_count_in_query --------------------------------------------------------------------- 6 (1 row) -- clear unnecessary tables; DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5, events_table_count;