mirror of https://github.com/citusdata/citus.git
Merge c8f6d1c7af
into 8c8f04a0c2
commit
358e8e51b8
|
@ -214,3 +214,86 @@ DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command t
|
|||
parallel boolean, OUT node_name text, OUT node_port integer,
|
||||
OUT success boolean, OUT result text);
|
||||
|
||||
-- load the tools to be tested
|
||||
\i ../../backend/distributed/master/citus_tools.sql
|
||||
|
||||
-- replace master_run_on_worker with a stub so we can test funcs which call it
|
||||
CREATE OR REPLACE FUNCTION master_run_on_worker(worker_name text[], port integer[],
|
||||
command text[],
|
||||
parallel boolean default false,
|
||||
OUT node_name text, OUT node_port integer,
|
||||
OUT success boolean, OUT result text)
|
||||
RETURNS SETOF record AS $$
|
||||
BEGIN
|
||||
RETURN QUERY SELECT worker_host, worker_port, true, 'some result'::text
|
||||
FROM unnest(worker_name, port) AS x(worker_host, worker_port);
|
||||
END
|
||||
$$ LANGUAGE 'plpgsql' STABLE STRICT;
|
||||
|
||||
-- emits one entry per worker
|
||||
SELECT master_remove_node('localhost', :worker_1_port);
|
||||
SELECT * FROM citus_run_on_all_workers('select 1');
|
||||
SELECT master_add_node('localhost', :worker_1_port);
|
||||
SELECT * FROM citus_run_on_all_workers('select 1');
|
||||
|
||||
-- make sure run_on_all_placements respects shardstate
|
||||
CREATE TABLE check_placements (key int);
|
||||
SELECT master_create_distributed_table('check_placements', 'key', 'hash');
|
||||
SELECT master_create_worker_shards('check_placements', 5, 2);
|
||||
SELECT * FROM citus_run_on_all_placements('check_placements', 'select 1');
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||
WHERE shardid % 2 = 0 AND nodeport = :worker_1_port;
|
||||
SELECT * FROM citus_run_on_all_placements('check_placements', 'select 1');
|
||||
DROP TABLE check_placements CASCADE;
|
||||
|
||||
-- make sure run_on_all_colocated_placements correctly detects colocation
|
||||
CREATE TABLE check_colocated (key int);
|
||||
SELECT master_create_distributed_table('check_colocated', 'key', 'hash');
|
||||
SELECT master_create_worker_shards('check_colocated', 5, 2);
|
||||
CREATE TABLE second_table (key int);
|
||||
SELECT master_create_distributed_table('second_table', 'key', 'hash');
|
||||
SELECT master_create_worker_shards('second_table', 4, 2);
|
||||
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||
'select 1');
|
||||
-- even when the difference is in replication factor, an error is thrown
|
||||
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
|
||||
SELECT master_create_worker_shards('second_table', 5, 1);
|
||||
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||
'select 1');
|
||||
-- when everything matches, the command is run!
|
||||
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
|
||||
SELECT master_create_worker_shards('second_table', 5, 2);
|
||||
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||
'select 1');
|
||||
-- when a placement is invalid considers the tables to not be colocated
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
|
||||
SELECT shardid FROM pg_dist_shard
|
||||
WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass
|
||||
LIMIT 1
|
||||
);
|
||||
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||
'select 1');
|
||||
-- when multiple placements are invalid, still considers the tables to not be colocated
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
|
||||
SELECT shardid FROM pg_dist_shard
|
||||
WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass
|
||||
LIMIT 1
|
||||
);
|
||||
SELECT * FROM citus_run_on_all_colocated_placements('check_colocated', 'second_table',
|
||||
'select 1');
|
||||
DROP TABLE check_colocated CASCADE;
|
||||
DROP TABLE second_table CASCADE;
|
||||
|
||||
-- runs on all shards
|
||||
CREATE TABLE check_shards (key int);
|
||||
SELECT master_create_distributed_table('check_shards', 'key', 'hash');
|
||||
SELECT master_create_worker_shards('check_shards', 5, 2);
|
||||
SELECT * FROM citus_run_on_all_shards('check_shards', 'select 1');
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid % 2 = 0;
|
||||
SELECT * FROM citus_run_on_all_shards('check_shards', 'select 1');
|
||||
DROP TABLE check_shards CASCADE;
|
||||
|
||||
-- clean up
|
||||
DROP FUNCTION master_run_on_worker(worker_name text[], port integer[], command text[],
|
||||
parallel boolean, OUT node_name text, OUT node_port integer,
|
||||
OUT success boolean, OUT result text);
|
||||
|
|
Loading…
Reference in New Issue