-- -- MULTI CITUS TOOLS -- -- tests UDFs created for citus tools -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1240000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1240000; -- the function is not exposed explicitly, create the entry point CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[], parallel boolean default false, OUT node_name text, OUT node_port integer, OUT success boolean, OUT result text) RETURNS SETOF record LANGUAGE C STABLE STRICT AS 'citus.so', $$master_run_on_worker$$; -- test with invalid port, prevent OS dependent warning from being displayed SET client_min_messages to ERROR; SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], ARRAY['select count(*) from pg_dist_shard']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------------------------------ localhost | 666 | f | failed to connect to localhost:666 (1 row) SELECT * FROM master_run_on_worker(ARRAY['localhost']::text[], ARRAY['666']::int[], ARRAY['select count(*) from pg_dist_shard']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------------------------------ localhost | 666 | f | failed to connect to localhost:666 (1 row) RESET client_min_messages; -- store worker node name and port SELECT quote_literal(node_name) as node_name, node_port as node_port FROM master_get_active_worker_nodes() ORDER BY node_port LIMIT 1 \gset -- connect to the first worker and ask for shard count, should return 0 SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from pg_dist_shard']::text[], false); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 0 (1 row) -- connect to the first worker and ask for shards, should fail with -- expecting a single column error SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select * from pg_dist_shard']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------------------------------------ localhost | 57637 | f | expected a single column in query target (1 row) -- query result may only contain a single row SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select a from generate_series(1,2) a']::text[], false); node_name | node_port | success | result -----------+-----------+---------+--------------------------------------- localhost | 57637 | f | expected a single row in query result (1 row) -- send multiple queries SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['select a from generate_series(1,1) a', 'select a from generate_series(2,2) a']::text[], false); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 1 localhost | 57637 | t | 2 (2 rows) -- send multiple queries, one fails SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['select a from generate_series(1,1) a', 'select a from generate_series(1,2) a']::text[], false); node_name | node_port | success | result -----------+-----------+---------+--------------------------------------- localhost | 57637 | t | 1 localhost | 57637 | f | expected a single row in query result (2 rows) -- send multiple queries, both fail SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['select a from generate_series(1,2) a', 'select a from generate_series(1,2) a']::text[], false); node_name | node_port | success | result -----------+-----------+---------+--------------------------------------- localhost | 57637 | f | expected a single row in query result localhost | 57637 | f | expected a single row in query result (2 rows) -- can create tables at worker SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['create table first_table(a int, b int)', 'create table second_table(a int, b int)']::text[], false); node_name | node_port | success | result -----------+-----------+---------+-------------- localhost | 57637 | t | CREATE TABLE localhost | 57637 | t | CREATE TABLE (2 rows) -- can insert into table SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------- localhost | 57637 | t | INSERT 0 20 (1 row) SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from first_table']::text[], false); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 20 (1 row) -- insert into second table twice SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['insert into second_table select * from first_table']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------- localhost | 57637 | t | INSERT 0 20 (1 row) SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['insert into second_table select * from first_table']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------- localhost | 57637 | t | INSERT 0 20 (1 row) -- check inserted values at second table SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from second_table']::text[], false); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 40 (1 row) -- store worker node name and port again -- previously set variables become unusable after some number of uses SELECT quote_literal(node_name) as node_name, node_port as node_port FROM master_get_active_worker_nodes() ORDER BY node_port LIMIT 1 \gset -- create index on tables SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['create index first_table_index on first_table(a)']::text[], false); node_name | node_port | success | result -----------+-----------+---------+-------------- localhost | 57637 | t | CREATE INDEX (1 row) -- drop created tables SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['drop table first_table']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------ localhost | 57637 | t | DROP TABLE (1 row) SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['drop table second_table']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------ localhost | 57637 | t | DROP TABLE (1 row) -- verify table is dropped SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from second_table']::text[], false); node_name | node_port | success | result -----------+-----------+---------+------------------------------------------------ localhost | 57637 | f | ERROR: relation "second_table" does not exist (1 row) -- -- Run the same tests in parallel -- -- connect to the first worker and ask for shard count, should return 0 SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from pg_dist_shard']::text[], true); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 0 (1 row) -- connect to the first worker and ask for shards, should fail with -- expecting a single column error SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select * from pg_dist_shard']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------------------------------------ localhost | 57637 | f | expected a single column in query target (1 row) -- query result may only contain a single row SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select a from generate_series(1,2) a']::text[], true); node_name | node_port | success | result -----------+-----------+---------+--------------------------------------- localhost | 57637 | f | expected a single row in query result (1 row) -- send multiple queries SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['select a from generate_series(1,1) a', 'select a from generate_series(2,2) a']::text[], true); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 1 localhost | 57637 | t | 2 (2 rows) -- send multiple queries, one fails SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['select a from generate_series(1,1) a', 'select a from generate_series(1,2) a']::text[], true); node_name | node_port | success | result -----------+-----------+---------+--------------------------------------- localhost | 57637 | t | 1 localhost | 57637 | f | expected a single row in query result (2 rows) -- send multiple queries, both fail SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['select a from generate_series(1,2) a', 'select a from generate_series(1,2) a']::text[], true); node_name | node_port | success | result -----------+-----------+---------+--------------------------------------- localhost | 57637 | f | expected a single row in query result localhost | 57637 | f | expected a single row in query result (2 rows) -- can create tables at worker SELECT * FROM master_run_on_worker(ARRAY[:node_name, :node_name]::text[], ARRAY[:node_port, :node_port]::int[], ARRAY['create table first_table(a int, b int)', 'create table second_table(a int, b int)']::text[], true); node_name | node_port | success | result -----------+-----------+---------+-------------- localhost | 57637 | t | CREATE TABLE localhost | 57637 | t | CREATE TABLE (2 rows) -- store worker node name and port again -- previously set variables become unusable after some number of uses SELECT quote_literal(node_name) as node_name, node_port as node_port FROM master_get_active_worker_nodes() ORDER BY node_port LIMIT 1 \gset -- can insert into table SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['insert into first_table select a,a from generate_series(1,20) a']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------- localhost | 57637 | t | INSERT 0 20 (1 row) SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from first_table']::text[], true); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 20 (1 row) -- insert into second table twice SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['insert into second_table select * from first_table']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------- localhost | 57637 | t | INSERT 0 20 (1 row) SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['insert into second_table select * from first_table']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------- localhost | 57637 | t | INSERT 0 20 (1 row) -- check inserted values at second table SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from second_table']::text[], true); node_name | node_port | success | result -----------+-----------+---------+-------- localhost | 57637 | t | 40 (1 row) -- create index on tables SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['create index first_table_index on first_table(a)']::text[], true); node_name | node_port | success | result -----------+-----------+---------+-------------- localhost | 57637 | t | CREATE INDEX (1 row) -- drop created tables SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['drop table first_table']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------ localhost | 57637 | t | DROP TABLE (1 row) SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['drop table second_table']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------ localhost | 57637 | t | DROP TABLE (1 row) -- verify table is dropped SELECT * FROM master_run_on_worker(ARRAY[:node_name]::text[], ARRAY[:node_port]::int[], ARRAY['select count(*) from second_table']::text[], true); node_name | node_port | success | result -----------+-----------+---------+------------------------------------------------ localhost | 57637 | f | ERROR: relation "second_table" does not exist (1 row) -- drop the function after use DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[], parallel boolean, OUT node_name text, OUT node_port integer, OUT success boolean, OUT result text);