mirror of https://github.com/citusdata/citus.git
176 lines
6.8 KiB
PL/PgSQL
176 lines
6.8 KiB
PL/PgSQL
-- Keep everything this test creates inside a dedicated schema.
CREATE SCHEMA citus_schema_move;
SET search_path = citus_schema_move;

-- Citus shard settings for this test.
SET citus.next_shard_id = 2220000;
SET citus.shard_count = 32;
SET citus.shard_replication_factor = 1;

-- Silence NOTICE-level output while the coordinator (group 0) is registered
-- and flagged as allowed to hold shards.
SET client_min_messages = WARNING;

SELECT 1
FROM citus_add_node('localhost', :master_port, groupid => 0);

SELECT master_set_node_property(
    'localhost',
    :master_port,
    'shouldhaveshards',
    true
);

-- TransferShards() has a race condition when the same shard id is used to
-- create the same shard on a different worker node, so clean up any orphaned
-- resources before running the tests.
--
-- See https://github.com/citusdata/citus/pull/7180#issuecomment-1706786615.
CALL citus_cleanup_orphaned_resources();

SET client_min_messages = NOTICE;
|
|
|
|
-- Null input should be a no-op. Each call form is exercised once: the one that
-- addresses the target by node name / port and the one that addresses it by
-- node id. (The by-id call used to be issued twice, verbatim; the second copy
-- added no coverage.)
SELECT citus_schema_move(schema_id=>null, target_node_name=>null, target_node_port=>null, shard_transfer_mode=>null);
SELECT citus_schema_move(schema_id=>null, target_node_id=>null, shard_transfer_mode=>null);
|
|
|
|
-- From here on, newly created schemas are expected to become distributed schemas.
SET citus.enable_schema_based_sharding = ON;

CREATE SCHEMA s1;

-- Invalid schema: the name does not resolve to any schema.
SELECT citus_schema_move(
    schema_id => 'no_such_schema',
    target_node_name => 'dummy_node_name',
    target_node_port => 1234
);
SELECT citus_schema_move(
    schema_id => 'no_such_schema',
    target_node_id => 1234
);

-- Regular schema: citus_schema_move was created before schema-based sharding
-- was switched on.
SELECT citus_schema_move(
    schema_id => 'citus_schema_move',
    target_node_name => 'dummy_node_name',
    target_node_port => 1234
);
SELECT citus_schema_move(
    schema_id => 'citus_schema_move',
    target_node_id => 1234
);

-- Empty distributed schema: s1 has no tables yet.
SELECT citus_schema_move(
    schema_id => 's1',
    target_node_name => 'dummy_node_name',
    target_node_port => 1234
);
SELECT citus_schema_move(
    schema_id => 's1',
    target_node_id => 1234
);

CREATE TABLE s1.t1 (a int);

-- Invalid node name / port / id, now that s1 holds a table.
SELECT citus_schema_move(
    schema_id => 's1',
    target_node_name => 'dummy_node_name',
    target_node_port => 1234
);
SELECT citus_schema_move(
    schema_id => 's1',
    target_node_id => 1234
);
|
|
|
|
-- Errors due to a missing primary key / replica identity. The targets are the
-- active, shard-eligible primary nodes that do not already host s1.t1.
SELECT citus_schema_move('s1', pdn.nodename, pdn.nodeport)
FROM pg_dist_node AS pdn
WHERE pdn.isactive
  AND pdn.shouldhaveshards
  AND pdn.noderole = 'primary'
  AND (pdn.nodename, pdn.nodeport) NOT IN (
        SELECT cs.nodename, cs.nodeport
        FROM citus_shards AS cs
        WHERE cs.table_name = 's1.t1'::regclass
      );

-- Errors because the target is the node that already hosts the schema
-- (target given by node name and port).
SELECT citus_schema_move('s1', nodename, nodeport, 'block_writes')
FROM citus_shards AS cs
INNER JOIN pg_dist_node AS pdn USING (nodename, nodeport)
WHERE pdn.noderole = 'primary'
  AND cs.table_name = 's1.t1'::regclass;

-- Same check, with the target given by node id.
SELECT citus_schema_move('s1', pdn.nodeid, 'block_writes')
FROM citus_shards AS cs
INNER JOIN pg_dist_node AS pdn USING (nodename, nodeport)
WHERE pdn.noderole = 'primary'
  AND cs.table_name = 's1.t1'::regclass;
|
|
|
|
-- Returns the id, host name and host port of one non-coordinator node that the
-- given schema can be moved to: an active primary node with shouldhaveshards
-- set and groupid != 0 that currently hosts no shard of any table in the schema.
-- When several nodes qualify, the one with the lowest nodeid is returned.
--
-- Raises an exception if schema_id is not a distributed schema, or if no
-- qualifying node exists.
CREATE OR REPLACE FUNCTION get_non_coord_candidate_node_for_schema_move(
    schema_id regnamespace)
RETURNS TABLE (nodeid integer, nodename text, nodeport integer)
-- The schema list must not be wrapped in a single pair of quotes: one string
-- literal is stored as a single schema literally named "pg_catalog, public".
SET search_path TO pg_catalog, public
AS $func$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_dist_schema WHERE schemaid = schema_id)
    THEN
        RAISE EXCEPTION '% is not a distributed schema', schema_id;
    END IF;

    -- Every column reference is qualified because the output column names
    -- (nodeid, nodename, nodeport) are also PL/pgSQL variables in this body.
    --
    -- Tables are matched to the schema through pg_class.relnamespace rather
    -- than by a text prefix on the table name, so that schema "s1" does not
    -- also match tables in "s10" and the result does not depend on how
    -- regclass values are rendered under the current search_path.
    RETURN QUERY
    SELECT pdn.nodeid, pdn.nodename, pdn.nodeport
    FROM pg_dist_node AS pdn
    WHERE pdn.isactive
      AND pdn.shouldhaveshards
      AND pdn.noderole = 'primary'
      AND pdn.groupid != 0
      AND NOT EXISTS (
            SELECT 1
            FROM citus_shards AS cs
            INNER JOIN pg_class AS c
                ON c.oid = cs.table_name::oid
            WHERE c.relnamespace = schema_id::oid
              AND cs.nodename = pdn.nodename
              AND cs.nodeport = pdn.nodeport
          )
    ORDER BY pdn.nodeid
    LIMIT 1;

    -- RETURN QUERY sets FOUND to false when the query produced no row.
    IF NOT FOUND
    THEN
        RAISE EXCEPTION 'could not determine a node to move the schema to';
    END IF;

    RETURN;
END;
$func$ LANGUAGE plpgsql;
|
|
|
|
-- Add a second table to s1 before moving the schema.
CREATE TABLE s1.t2 (a int);

-- Move the schema to a different node, addressing the target by name and port.
SELECT
    nodeid AS s1_new_nodeid,
    quote_literal(nodename) AS s1_new_nodename,
    nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset

SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');

-- All s1 shards that live on a primary node must be on the chosen target.
SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL (
    SELECT nodename, nodeport
    FROM citus_shards
    JOIN pg_dist_node USING (nodename, nodeport)
    WHERE noderole = 'primary'
      AND starts_with(table_name::text, 's1'::text)
);

-- Move the schema again, this time addressing the target by node id.
SELECT
    nodeid AS s1_new_nodeid,
    quote_literal(nodename) AS s1_new_nodename,
    nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset

SELECT citus_schema_move('s1', :s1_new_nodeid, 'block_writes');

SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL (
    SELECT nodename, nodeport
    FROM citus_shards
    JOIN pg_dist_node USING (nodename, nodeport)
    WHERE noderole = 'primary'
      AND starts_with(table_name::text, 's1'::text)
);

-- Move the schema to the coordinator.
SELECT citus_schema_move('s1', 'localhost', :master_port, 'block_writes');

SELECT ('localhost', :master_port) = ALL (
    SELECT nodename, nodeport
    FROM citus_shards
    JOIN pg_dist_node USING (nodename, nodeport)
    WHERE noderole = 'primary'
      AND starts_with(table_name::text, 's1'::text)
);

-- Move the schema away from the coordinator.
SELECT
    nodeid AS s1_new_nodeid,
    quote_literal(nodename) AS s1_new_nodename,
    nodeport AS s1_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s1') \gset

SELECT citus_schema_move('s1', :s1_new_nodename, :s1_new_nodeport, 'block_writes');

SELECT (:s1_new_nodename, :s1_new_nodeport) = ALL (
    SELECT nodename, nodeport
    FROM citus_shards
    JOIN pg_dist_node USING (nodename, nodeport)
    WHERE noderole = 'primary'
      AND starts_with(table_name::text, 's1'::text)
);
|
|
|
|
-- Create schema s2 and its tables as a superuser role.
CREATE USER tenantuser SUPERUSER;
SET ROLE tenantuser;

CREATE SCHEMA s2;
CREATE TABLE s2.t1 (a int);
CREATE TABLE s2.t2 (a int);

-- Switch to an unprivileged role that does not own s2.
CREATE USER regularuser;
SET ROLE regularuser;

-- Throws an error as the user is not the owner of the schema.
SELECT citus_schema_move('s2', 'dummy_node', 1234);

-- Hand everything tenantuser owns over to regularuser.
RESET ROLE;
REASSIGN OWNED BY tenantuser TO regularuser;

-- regularuser needs USAGE on the test schema to call the helper function
-- that lives in it.
GRANT USAGE ON SCHEMA citus_schema_move TO regularuser;

SET ROLE regularuser;

-- As the new owner, pick a target node and move s2 to it.
SELECT
    nodeid AS s2_new_nodeid,
    quote_literal(nodename) AS s2_new_nodename,
    nodeport AS s2_new_nodeport
FROM get_non_coord_candidate_node_for_schema_move('s2') \gset

SELECT citus_schema_move('s2', :s2_new_nodename, :s2_new_nodeport, 'force_logical');

-- All s2 shards that live on a primary node must be on the chosen target.
SELECT (:s2_new_nodename, :s2_new_nodeport) = ALL (
    SELECT nodename, nodeport
    FROM citus_shards
    JOIN pg_dist_node USING (nodename, nodeport)
    WHERE noderole = 'primary'
      AND starts_with(table_name::text, 's2'::text)
);

-- Drop s2 quietly, still acting as its owner.
SET client_min_messages = WARNING;
DROP SCHEMA s2 CASCADE;
SET client_min_messages = NOTICE;

RESET ROLE;

-- Revoke the grant so that the roles can be dropped.
REVOKE USAGE ON SCHEMA citus_schema_move FROM regularuser;
DROP ROLE regularuser, tenantuser;
|
|
|
|
-- Final cleanup: restore the schema-based sharding setting and drop the
-- schemas created by this test without emitting cascade notices.
RESET citus.enable_schema_based_sharding;

SET client_min_messages = WARNING;
DROP SCHEMA citus_schema_move, s1 CASCADE;
|