diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 86f1490d4..ff46fa563 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -96,6 +96,7 @@ static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList, char *targetNodeName, uint32 targetNodePort); + /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_copy_shard_placement); PG_FUNCTION_INFO_V1(master_copy_shard_placement); @@ -299,6 +300,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) Oid relationId = RelationIdForShard(shardId); ErrorIfMoveCitusLocalTable(relationId); + ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort); ShardInterval *shardInterval = LoadShardInterval(shardId); Oid distributedTableId = shardInterval->relationId; @@ -421,6 +423,51 @@ EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList, } +/* + * ErrorIfTargetNodeIsNotSafeToMove throws error if the target node is not + * eligible for moving shards. + */ +void +ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort) +{ + WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort); + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Moving shards to a non-existing node is not supported"), + errhint( + "Add the target node via SELECT citus_add_node('%s', %d);", + targetNodeName, targetNodePort))); + } + + if (!workerNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Moving shards to a non-active node is not supported"), + errhint( + "Activate the target node via SELECT citus_activate_node('%s', %d);", + targetNodeName, targetNodePort))); + } + + if (!workerNode->shouldHaveShards) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Moving shards to a node that shouldn't have a shard is " + "not supported"), + errhint("Allow shards on the target node via " + "SELECT * FROM citus_set_node_property('%s', %d, 'shouldhaveshards', true);", + targetNodeName, targetNodePort))); + } + + if (!NodeIsPrimary(workerNode)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Moving shards to a secondary (e.g., replica) node is " + "not supported"))); + } +} + + /* * master_move_shard_placement is a wrapper around citus_move_shard_placement. */ diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 779c05adb..ab5490fca 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -278,6 +278,8 @@ extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, extern ShardPlacement * SearchShardPlacementInListOrError(List *shardPlacementList, const char *nodeName, uint32 nodePort); +extern void ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int + targetNodePort); extern void ErrorIfMoveCitusLocalTable(Oid relationId); extern char LookupShardTransferMode(Oid shardReplicationModeOid); extern void BlockWritesToShardList(List *shardList); diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index e43983a7d..c6d68a023 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -894,6 +894,47 @@ SELECT create_distributed_table('colocated_rebalance_test', 'id'); (1 row) +-- make sure that we do not allow shards on target nodes +-- that are not eligable to move shards +-- Try to move shards to a non-existing node +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; +ERROR: Moving shards to a non-existing node is not supported +HINT: Add the target node via SELECT citus_add_node('localhost', 10000); +-- Try to move shards to a node where shards are not allowed +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); + master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; +ERROR: Moving shards to a node that shouldn't have a shard is not supported +HINT: Allow shards on the target node via SELECT * FROM citus_set_node_property('localhost', 57637, 'shouldhaveshards', true); +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); + master_set_node_property +--------------------------------------------------------------------- + +(1 row) + +-- Try to move shards to a non-active node +UPDATE pg_dist_node SET isactive = false WHERE nodeport = :worker_1_port; +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; +ERROR: Moving shards to a non-active node is not supported +HINT: Activate the target node via SELECT citus_activate_node('localhost', 57637); +UPDATE pg_dist_node SET isactive = true WHERE nodeport = :worker_1_port; +-- Try to move shards to a secondary node +UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport = :worker_1_port; +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; +ERROR: Moving shards to a secondary (e.g., replica) node is not supported +UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port; -- Move all shards to worker1 SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement @@ -1560,6 +1601,7 @@ CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int) (CASE WHEN nodeport = 57638 THEN 1000 ELSE 1 END)::real FROM pg_dist_node where nodeid = nodeidarg $$ LANGUAGE sql; +\set VERBOSITY terse SELECT citus_add_rebalance_strategy( 'capacity_high_worker_2', 'citus_shard_cost_1', @@ -1689,7 +1731,6 @@ SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy ERROR: could not find rebalance strategy with name non_existing SELECT citus_set_default_rebalance_strategy('non_existing'); ERROR: strategy with specified name does not exist -CONTEXT: PL/pgSQL function citus_set_default_rebalance_strategy(text) line 5 at RAISE UPDATE pg_dist_rebalance_strategy SET default_strategy=false; SELECT * FROM get_rebalance_table_shards_plan('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set @@ -1726,13 +1767,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for shard_cost_function is incorrect -DETAIL: number of arguments of shard_cost_no_arguments should be 1, not 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'shard_cost_bad_arg_type', @@ -1741,13 +1775,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for shard_cost_function is incorrect -DETAIL: argument type of shard_cost_bad_arg_type should be bigint -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'shard_cost_bad_return_type', @@ -1756,13 +1783,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for shard_cost_function is incorrect -DETAIL: return type of shard_cost_bad_return_type should be real -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 0, @@ -1771,12 +1791,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: cache lookup failed for shard_cost_function with oid 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1785,13 +1799,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for node_capacity_function is incorrect -DETAIL: number of arguments of node_capacity_no_arguments should be 1, not 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1800,13 +1807,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for node_capacity_function is incorrect -DETAIL: argument type of node_capacity_bad_arg_type should be int -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1815,13 +1815,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for node_capacity_function is incorrect -DETAIL: return type of node_capacity_bad_return_type should be real -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1830,12 +1823,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: cache lookup failed for node_capacity_function with oid 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1844,13 +1831,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect -DETAIL: number of arguments of shard_allowed_on_node_no_arguments should be 2, not 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1859,13 +1839,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect -DETAIL: type of first argument of shard_allowed_on_node_bad_arg1 should be bigint -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1874,13 +1847,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect -DETAIL: type of second argument of shard_allowed_on_node_bad_arg2 should be int -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1889,13 +1855,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: signature for shard_allowed_on_node_function is incorrect -DETAIL: return type of shard_allowed_on_node_bad_return_type should be boolean -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 SELECT citus_add_rebalance_strategy( 'insert_should_fail', 'citus_shard_cost_1', @@ -1904,12 +1863,6 @@ SELECT citus_add_rebalance_strategy( 0 ); ERROR: cache lookup failed for shard_allowed_on_node_function with oid 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -SQL function "citus_add_rebalance_strategy" statement 1 -- Confirm that manual insert/update has the same checks INSERT INTO pg_catalog.pg_dist_rebalance_strategy( @@ -1926,20 +1879,8 @@ INSERT INTO 0 ); ERROR: signature for shard_cost_function is incorrect -DETAIL: number of arguments of shard_cost_no_arguments should be 1, not 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM UPDATE pg_dist_rebalance_strategy SET shard_cost_function='shard_cost_no_arguments' WHERE name='by_disk_size'; ERROR: signature for shard_cost_function is incorrect -DETAIL: number of arguments of shard_cost_no_arguments should be 1, not 0 -CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( - NEW.shard_cost_function, - NEW.node_capacity_function, - NEW.shard_allowed_on_node_function)" -PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM -- Confirm that only a single default strategy can exist INSERT INTO pg_catalog.pg_dist_rebalance_strategy( @@ -1958,10 +1899,8 @@ INSERT INTO 0 ); ERROR: there cannot be two default strategies -CONTEXT: PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 19 at RAISE UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_disk_size'; ERROR: there cannot be two default strategies -CONTEXT: PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 19 at RAISE -- ensure the trigger allows updating the default strategy UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; -- Confirm that default strategy should be higher than minimum strategy @@ -1974,8 +1913,6 @@ SELECT citus_add_rebalance_strategy( 0.1 ); ERROR: default_threshold cannot be smaller than minimum_threshold -CONTEXT: PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 10 at RAISE -SQL function "citus_add_rebalance_strategy" statement 1 -- Make it a data node again SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); master_set_node_property diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index b15c1ea5b..dd6110f68 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -579,12 +579,41 @@ CREATE TABLE colocated_rebalance_test(id integer); CREATE TABLE colocated_rebalance_test2(id integer); SELECT create_distributed_table('colocated_rebalance_test', 'id'); + +-- make sure that we do not allow shards on target nodes +-- that are not eligable to move shards + +-- Try to move shards to a non-existing node +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; + +-- Try to move shards to a node where shards are not allowed +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; +SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); + +-- Try to move shards to a non-active node +UPDATE pg_dist_node SET isactive = false WHERE nodeport = :worker_1_port; +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; +UPDATE pg_dist_node SET isactive = true WHERE nodeport = :worker_1_port; + +-- Try to move shards to a secondary node +UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport = :worker_1_port; +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard_placement +WHERE nodeport = :worker_2_port; +UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port; + -- Move all shards to worker1 SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; - SELECT create_distributed_table('colocated_rebalance_test2', 'id'); -- Confirm all shards for both tables are on worker1 @@ -840,6 +869,8 @@ CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int) FROM pg_dist_node where nodeid = nodeidarg $$ LANGUAGE sql; +\set VERBOSITY terse + SELECT citus_add_rebalance_strategy( 'capacity_high_worker_2', 'citus_shard_cost_1',