mirror of https://github.com/citusdata/citus.git
Add tests for citus tools
parent
8c8f04a0c2
commit
c8f6d1c7af
|
@ -214,3 +214,86 @@ DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command t
|
||||||
parallel boolean, OUT node_name text, OUT node_port integer,
|
parallel boolean, OUT node_name text, OUT node_port integer,
|
||||||
OUT success boolean, OUT result text);
|
OUT success boolean, OUT result text);
|
||||||
|
|
||||||
|
-- load the tools to be tested
|
||||||
|
\i ../../backend/distributed/master/citus_tools.sql
|
||||||
|
|
||||||
|
-- replace master_run_on_worker with a stub so we can test funcs which call it
|
||||||
|
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
|
||||||
|
command text[],
|
||||||
|
parallel boolean default false,
|
||||||
|
OUT node_name text, OUT node_port integer,
|
||||||
|
OUT success boolean, OUT result text)
|
||||||
|
RETURNS SETOF record AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN QUERY SELECT worker_host, worker_port, true, 'some result'::text
|
||||||
|
FROM unnest(worker_name, port) AS x(worker_host, worker_port);
|
||||||
|
END
|
||||||
|
$$ LANGUAGE 'plpgsql' STABLE STRICT;
|
||||||
|
|
||||||
|
-- emits one entry per worker
|
||||||
|
SELECT master_remove_node('localhost', :worker_1_port);
|
||||||
|
SELECT * FROM citus_run_on_all_workers('select 1');
|
||||||
|
SELECT master_add_node('localhost', :worker_1_port);
|
||||||
|
SELECT * FROM citus_run_on_all_workers('select 1');
|
||||||
|
|
||||||
|
-- make sure run_on_all_placements respects shardstate
|
||||||
|
CREATE TABLE check_placements (key int);
|
||||||
|
SELECT master_create_distributed_table('check_placements', 'key', 'hash');
|
||||||
|
SELECT master_create_worker_shards('check_placements', 5, 2);
|
||||||
|
SELECT * FROM citus_run_on_all_placements('check_placements', 'select 1');
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||||
|
WHERE shardid % 2 = 0 AND nodeport = :worker_1_port;
|
||||||
|
SELECT * FROM citus_run_on_all_placements('check_placements', 'select 1');
|
||||||
|
DROP TABLE check_placements CASCADE;
|
||||||
|
|
||||||
|
-- make sure run_on_all_colocated_placements correctly detects colocation
|
||||||
|
CREATE TABLE check_colocated (key int);
|
||||||
|
SELECT master_create_distributed_table('check_colocated', 'key', 'hash');
|
||||||
|
SELECT master_create_worker_shards('check_colocated', 5, 2);
|
||||||
|
CREATE TABLE second_table (key int);
|
||||||
|
SELECT master_create_distributed_table('second_table', 'key', 'hash');
|
||||||
|
SELECT master_create_worker_shards('second_table', 4, 2);
|
||||||
|
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||||
|
'select 1');
|
||||||
|
-- even when the difference is in replication factor, an error is thrown
|
||||||
|
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
|
||||||
|
SELECT master_create_worker_shards('second_table', 5, 1);
|
||||||
|
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||||
|
'select 1');
|
||||||
|
-- when everything matches, the command is run!
|
||||||
|
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
|
||||||
|
SELECT master_create_worker_shards('second_table', 5, 2);
|
||||||
|
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||||
|
'select 1');
|
||||||
|
-- when a placement is invalid considers the tables to not be colocated
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
|
||||||
|
SELECT shardid FROM pg_dist_shard
|
||||||
|
WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass
|
||||||
|
LIMIT 1
|
||||||
|
);
|
||||||
|
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||||
|
'select 1');
|
||||||
|
-- when multiple placements are invalid, still considers the tables to not be colocated
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
|
||||||
|
SELECT shardid FROM pg_dist_shard
|
||||||
|
WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass
|
||||||
|
LIMIT 1
|
||||||
|
);
|
||||||
|
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||||
|
'select 1');
|
||||||
|
DROP TABLE check_colocated CASCADE;
|
||||||
|
DROP TABLE second_table CASCADE;
|
||||||
|
|
||||||
|
-- runs on all shards
|
||||||
|
CREATE TABLE check_shards (key int);
|
||||||
|
SELECT master_create_distributed_table('check_shards', 'key', 'hash');
|
||||||
|
SELECT master_create_worker_shards('check_shards', 5, 2);
|
||||||
|
SELECT * FROM citus_run_on_all_shards('check_shards', 'select 1');
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0;
|
||||||
|
SELECT * FROM citus_run_on_all_shards('check_shards', 'select 1');
|
||||||
|
DROP TABLE check_shards CASCADE;
|
||||||
|
|
||||||
|
-- clean up
|
||||||
|
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
|
||||||
|
parallel boolean, OUT node_name text, OUT node_port integer,
|
||||||
|
OUT success boolean, OUT result text);
|
||||||
|
|
Loading…
Reference in New Issue