diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 8fd3860ac..bdf5fa0e5 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -70,6 +70,7 @@ typedef struct RebalanceOptions int32 maxShardMoves; ArrayType *excludedShardArray; bool drainOnly; + float4 improvementThreshold; Form_pg_dist_rebalance_strategy rebalanceStrategy; } RebalanceOptions; @@ -80,14 +81,54 @@ typedef struct RebalanceOptions */ typedef struct RebalanceState { + /* + * placementsHash contains the current state of all shard placements. It + * is initialized from pg_dist_placement and is then modified based on the + * found shard moves. + */ HTAB *placementsHash; + + /* + * placementUpdateList contains all of the updates that have been done to + * reach the current state of placementsHash. + */ List *placementUpdateList; RebalancePlanFunctions *functions; + + /* + * fillStateListDesc contains all NodeFillStates ordered from full nodes to + * empty nodes. + */ List *fillStateListDesc; + + /* + * fillStateListAsc contains all NodeFillStates ordered from empty nodes to + * full nodes. + */ List *fillStateListAsc; + + /* + * disallowedPlacementList contains all placements that currently exist, + * but are not allowed according to the shardAllowedOnNode function. + */ List *disallowedPlacementList; + + /* + * totalCost is the cost of all the shards in the cluster added together. + */ float4 totalCost; + + /* + * totalCapacity is the capacity of all the nodes in the cluster added + * together. + */ float4 totalCapacity; + + /* + * ignoredMoves is the number of moves that were ignored. This is used to + * limit the number of log lines we send.
+ */ + int64 ignoredMoves; } RebalanceState; @@ -128,6 +169,7 @@ static RebalanceState * InitRebalanceState(List *workerNodeList, List *shardPlac static void MoveShardsAwayFromDisallowedNodes(RebalanceState *state); static bool FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound, + float4 improvementThreshold, RebalanceState *state); static NodeFillState * FindAllowedTargetFillState(RebalanceState *state, uint64 shardId); static void MoveShardCost(NodeFillState *sourceFillState, NodeFillState *targetFillState, @@ -163,6 +205,8 @@ PG_FUNCTION_INFO_V1(citus_shard_cost_by_disk_size); PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions); PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); +int MaxRebalancerLoggedIgnoredMoves = 5; + #ifdef USE_ASSERT_CHECKING @@ -369,6 +413,7 @@ GetRebalanceSteps(RebalanceOptions *options) options->threshold, options->maxShardMoves, options->drainOnly, + options->improvementThreshold, &rebalancePlanFunctions); } @@ -711,6 +756,7 @@ rebalance_table_shards(PG_FUNCTION_ARGS) .excludedShardArray = PG_GETARG_ARRAYTYPE_P(3), .drainOnly = PG_GETARG_BOOL(5), .rebalanceStrategy = strategy, + .improvementThreshold = strategy->improvementThreshold, }; Oid shardTransferModeOid = PG_GETARG_OID(4); RebalanceTableShards(&options, shardTransferModeOid); @@ -911,6 +957,8 @@ get_rebalance_table_shards_plan(PG_FUNCTION_ARGS) .excludedShardArray = PG_GETARG_ARRAYTYPE_P(3), .drainOnly = PG_GETARG_BOOL(4), .rebalanceStrategy = strategy, + .improvementThreshold = PG_GETARG_FLOAT4_OR_DEFAULT( + 6, strategy->improvementThreshold), }; @@ -1228,6 +1276,7 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList, double threshold, int32 maxShardMoves, bool drainOnly, + float4 improvementThreshold, RebalancePlanFunctions *functions) { List *rebalanceStates = NIL; @@ -1264,9 +1313,11 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList, while (list_length(state->placementUpdateList) < maxShardMoves && moreMovesAvailable) { - moreMovesAvailable = FindAndMoveShardCost(utilizationLowerBound, - utilizationUpperBound, - state); + moreMovesAvailable = FindAndMoveShardCost( + utilizationLowerBound, + utilizationUpperBound, + improvementThreshold, + state); } placementUpdateList = state->placementUpdateList; @@ -1286,6 +1337,36 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList, hash_destroy(state->placementsHash); } + + if (state->ignoredMoves > 0) + { + if (MaxRebalancerLoggedIgnoredMoves == -1 || + state->ignoredMoves <= MaxRebalancerLoggedIgnoredMoves) + { + ereport(NOTICE, ( + errmsg( + "Ignored %ld moves, all of which are shown in notices above", + state->ignoredMoves + ), + errhint( + "If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (%g).", + improvementThreshold) + )); + } + else + { + ereport(NOTICE, ( + errmsg( + "Ignored %ld moves, %d of which are shown in notices above", + state->ignoredMoves, + MaxRebalancerLoggedIgnoredMoves + ), + errhint( + "If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (%g).", + improvementThreshold) + )); + } + } return placementUpdateList; } @@ -1314,8 +1395,8 @@ InitRebalanceState(List *workerNodeList, List *shardPlacementList, fillState->capacity = functions->nodeCapacity(workerNode, functions->context); /* - * Set the utilization here although the totalCost is not set yet. 
This is - * important to set the utilization to INFINITY when the capacity is 0. + * Set the utilization here although the totalCost is not set yet. This + * is needed to set the utilization to INFINITY when the capacity is 0. */ fillState->utilization = CalculateUtilization(fillState->totalCost, fillState->capacity); @@ -1648,13 +1729,39 @@ MoveShardCost(NodeFillState *sourceFillState, * current state and returns a list with a new move appended that improves the * balance of shards. The algorithm is greedy and will use the first new move * that improves the balance. It finds nodes by trying to move a shard from the - * fullest node to the emptiest node. If no moves are possible it will try the - * second emptiest node until it tried all of them. Then it wil try the second - * fullest node. If it was able to find a move it will return true and false if - * it couldn't. + * most utilized node (highest utilization) to the emptiest node (lowest + * utilization). If no moves are possible it will try the second emptiest node + * until it tried all of them. Then it wil try the second fullest node. If it + * was able to find a move it will return true and false if it couldn't. + * + * This algorithm won't necessarily result in the best possible balance. Getting + * the best balance is an NP problem, so it's not feasible to go for the best + * balance. This algorithm was chosen because of the following reasons: + * 1. Literature research showed that similar problems would get within 2X of + * the optimal balance with a greedy algoritm. + * 2. Every move will always improve the balance. So if the user stops a + * rebalance midway through, they will never be in a worse situation than + * before. + * 3. It's pretty easy to reason about. + * 4. It's simple to implement. + * + * utilizationLowerBound and utilizationUpperBound are used to indicate what + * the target utilization range of all nodes is. If they are within this range, + * then balance is good enough. If all nodes are in this range then the cluster + * is considered balanced and no more moves are done. This is mostly useful for + * the by_disk_size rebalance strategy. If we wouldn't have this then the + * rebalancer could become flappy in certain cases. + * + * improvementThreshold is a threshold that can be used to ignore moves when + * they only improve the balance a little relative to the cost of the shard. + * Again this is mostly useful for the by_disk_size rebalance strategy. + * Without this threshold the rebalancer would move a shard of 1TB when this + * move only improves the cluster by 10GB. */ static bool -FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound, +FindAndMoveShardCost(float4 utilizationLowerBound, + float4 utilizationUpperBound, + float4 improvementThreshold, RebalanceState *state) { NodeFillState *sourceFillState = NULL; @@ -1727,11 +1834,24 @@ FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound, } /* - * Ensure that the cost distrubition is actually better - * after the move, i.e. the new highest utilization of - * source and target is lower than the previous highest, or - * the highest utilization is the same, but the lowest - * increased. + * If the target is still less utilized than the source, then + * this is clearly a good move. And if they are equally + * utilized too. 
+ */ + if (newTargetUtilization <= newSourceUtilization) + { + MoveShardCost(sourceFillState, targetFillState, + shardCost, state); + return true; + } + + /* + * The target is now more utilized than the source. So we need + * to determine if the move is a net positive for the overall + * cost distribution. This means that the new highest + * utilization of source and target is lower than the previous + * highest, or the highest utilization is the same, but the + * lowest increased. */ if (newTargetUtilization > sourceFillState->utilization) { @@ -1752,6 +1872,58 @@ FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound, */ continue; } + + /* + * fmaxf and fminf here are only needed for cases when nodes + * have different capacities. If they are the same, then both + * arguments are equal. + */ + float4 utilizationImprovement = fmaxf( + sourceFillState->utilization - newTargetUtilization, + newSourceUtilization - targetFillState->utilization + ); + float4 utilizationAddedByShard = fminf( + newTargetUtilization - targetFillState->utilization, + sourceFillState->utilization - newSourceUtilization + ); + + /* + * If the shard adds a lot of utilization, but the + * improvement gained by moving it is small, then we + * ignore the move. Probably there are other shards that are + * better candidates, and in any case it's probably not worth + * the effort to move this shard. + * + * One of the main cases this tries to avoid is the rebalancer + * moving a very large shard with the "by_disk_size" strategy + * when that only gives a small benefit in data distribution. + */ + float4 normalizedUtilizationImprovement = utilizationImprovement / + utilizationAddedByShard; + if (normalizedUtilizationImprovement < improvementThreshold) + { + state->ignoredMoves++; + if (MaxRebalancerLoggedIgnoredMoves == -1 || + state->ignoredMoves <= MaxRebalancerLoggedIgnoredMoves) + { + ereport(NOTICE, ( + errmsg( + "Ignoring move of shard %ld from %s:%d to %s:%d, because the move only brings a small improvement relative to the shard its size", + shardCost->shardId, + sourceFillState->node->workerName, + sourceFillState->node->workerPort, + targetFillState->node->workerName, + targetFillState->node->workerPort + ), + errdetail( + "The balance improvement of %g is lower than the improvement_threshold of %g", + normalizedUtilizationImprovement, + improvementThreshold + ) + )); + } + continue; + } MoveShardCost(sourceFillState, targetFillState, shardCost, state); return true; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 512aa1add..f5d499036 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -68,6 +68,7 @@ #include "distributed/time_constants.h" #include "distributed/query_stats.h" #include "distributed/remote_commands.h" +#include "distributed/shard_rebalancer.h" #include "distributed/shared_library_init.h" #include "distributed/statistics_collection.h" #include "distributed/subplan_execution.h" @@ -1190,6 +1191,16 @@ RegisterCitusConfigVariables(void) GUC_UNIT_KB | GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_rebalancer_logged_ignored_moves", + gettext_noop("Sets the maximum number of ignored moves the rebalancer logs"), + NULL, + &MaxRebalancerLoggedIgnoredMoves, + 5, -1, INT_MAX, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_shared_pool_size", gettext_noop("Sets the maximum number
of connections allowed per worker node " diff --git a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql index c85c81e3f..f98e005cc 100644 --- a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql @@ -5,5 +5,11 @@ #include "udfs/worker_partitioned_relation_total_size/10.1-1.sql" #include "udfs/worker_partitioned_relation_size/10.1-1.sql" #include "udfs/worker_partitioned_table_size/10.1-1.sql" +#include "udfs/citus_prepare_pg_upgrade/10.1-1.sql" #include "udfs/citus_finish_pg_upgrade/10.1-1.sql" #include "udfs/citus_local_disk_space_stats/10.1-1.sql" +#include "udfs/get_rebalance_table_shards_plan/10.1-1.sql" +#include "udfs/citus_add_rebalance_strategy/10.1-1.sql" + +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ADD COLUMN improvement_threshold float4 NOT NULL default 0; +UPDATE pg_catalog.pg_dist_rebalance_strategy SET improvement_threshold = 0.5 WHERE name = 'by_disk_size'; diff --git a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql index 765cf9cbe..c328eba43 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql @@ -21,4 +21,13 @@ DROP FUNCTION pg_catalog.worker_partitioned_relation_size(regclass); DROP FUNCTION pg_catalog.worker_partitioned_table_size(regclass); DROP FUNCTION pg_catalog.citus_local_disk_space_stats(); +#include "../udfs/citus_prepare_pg_upgrade/9.5-1.sql" #include "../udfs/citus_finish_pg_upgrade/10.0-1.sql" +#include "../udfs/get_rebalance_table_shards_plan/9.2-1.sql" + +-- the migration for citus_add_rebalance_strategy from 9.2-1 was the first one, +-- so it doesn't have a DROP. This is why we DROP manually here. 
+DROP FUNCTION pg_catalog.citus_add_rebalance_strategy; +#include "../udfs/citus_add_rebalance_strategy/9.2-1.sql" + +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DROP COLUMN improvement_threshold; diff --git a/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/10.1-1.sql b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/10.1-1.sql new file mode 100644 index 000000000..4c5f8ba79 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/10.1-1.sql @@ -0,0 +1,30 @@ +DROP FUNCTION pg_catalog.citus_add_rebalance_strategy; +CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy( + name name, + shard_cost_function regproc, + node_capacity_function regproc, + shard_allowed_on_node_function regproc, + default_threshold float4, + minimum_threshold float4 DEFAULT 0, + improvement_threshold float4 DEFAULT 0 +) + RETURNS VOID AS $$ + INSERT INTO + pg_catalog.pg_dist_rebalance_strategy( + name, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold + ) VALUES ( + name, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold + ); + $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4, float4) + IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes'; diff --git a/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql index aeffc9c00..4c5f8ba79 100644 --- a/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql @@ -1,10 +1,12 @@ +DROP FUNCTION pg_catalog.citus_add_rebalance_strategy; CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy( name name, shard_cost_function regproc, node_capacity_function regproc, shard_allowed_on_node_function regproc, default_threshold float4, - minimum_threshold float4 DEFAULT 0 + minimum_threshold float4 DEFAULT 0, + improvement_threshold float4 DEFAULT 0 ) RETURNS VOID AS $$ INSERT INTO @@ -24,5 +26,5 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy( minimum_threshold ); $$ LANGUAGE sql; -COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4) +COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4, float4) IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes'; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.1-1.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.1-1.sql index d936c958b..557ab832c 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.1-1.sql @@ -31,7 +31,8 @@ BEGIN node_capacity_function::regprocedure::regproc, shard_allowed_on_node_function::regprocedure::regproc, default_threshold, - minimum_threshold + minimum_threshold, + improvement_threshold FROM public.pg_dist_rebalance_strategy; ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql index 
d936c958b..557ab832c 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql @@ -31,7 +31,8 @@ BEGIN node_capacity_function::regprocedure::regproc, shard_allowed_on_node_function::regprocedure::regproc, default_threshold, - minimum_threshold + minimum_threshold, + improvement_threshold FROM public.pg_dist_rebalance_strategy; ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/10.1-1.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/10.1-1.sql new file mode 100644 index 000000000..8b4ce1479 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/10.1-1.sql @@ -0,0 +1,54 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cppu$ +BEGIN + -- + -- Drop existing backup tables + -- + DROP TABLE IF EXISTS public.pg_dist_partition; + DROP TABLE IF EXISTS public.pg_dist_shard; + DROP TABLE IF EXISTS public.pg_dist_placement; + DROP TABLE IF EXISTS public.pg_dist_node_metadata; + DROP TABLE IF EXISTS public.pg_dist_node; + DROP TABLE IF EXISTS public.pg_dist_local_group; + DROP TABLE IF EXISTS public.pg_dist_transaction; + DROP TABLE IF EXISTS public.pg_dist_colocation; + DROP TABLE IF EXISTS public.pg_dist_authinfo; + DROP TABLE IF EXISTS public.pg_dist_poolinfo; + DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy; + + -- + -- backup citus catalog tables + -- + CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition; + CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard; + CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement; + CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata; + CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node; + CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group; + CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction; + CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation; + -- enterprise catalog tables + CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo; + CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo; + CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT + name, + default_strategy, + shard_cost_function::regprocedure::text, + node_capacity_function::regprocedure::text, + shard_allowed_on_node_function::regprocedure::text, + default_threshold, + minimum_threshold, + improvement_threshold + FROM pg_catalog.pg_dist_rebalance_strategy; + + -- store upgrade stable identifiers on pg_dist_object catalog + UPDATE citus.pg_dist_object + SET (type, object_names, object_args) = (SELECT * FROM pg_identify_object_as_address(classid, objid, objsubid)); +END; +$cppu$; + +COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade() + IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done'; diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql index fc5dab6b1..8b4ce1479 100644 --- a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql +++ 
b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql @@ -40,7 +40,8 @@ BEGIN node_capacity_function::regprocedure::text, shard_allowed_on_node_function::regprocedure::text, default_threshold, - minimum_threshold + minimum_threshold, + improvement_threshold FROM pg_catalog.pg_dist_rebalance_strategy; -- store upgrade stable identifiers on pg_dist_object catalog diff --git a/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/10.1-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/10.1-1.sql new file mode 100644 index 000000000..8a875ce90 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/10.1-1.sql @@ -0,0 +1,27 @@ +-- get_rebalance_table_shards_plan shows the actual events that will be performed +-- if a rebalance operation will be performed with the same arguments, which allows users +-- to understand the impact of the change overall availability of the application and +-- network trafic. +-- +DROP FUNCTION pg_catalog.get_rebalance_table_shards_plan; +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan( + relation regclass default NULL, + threshold float4 default NULL, + max_shard_moves int default 1000000, + excluded_shard_list bigint[] default '{}', + drain_only boolean default false, + rebalance_strategy name default NULL, + improvement_threshold float4 DEFAULT NULL + ) + RETURNS TABLE (table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int) + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name, float4) + IS 'returns the list of shard placement moves to be done on a rebalance operation'; + diff --git a/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql index 7970a61dd..8a875ce90 100644 --- a/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql +++ b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql @@ -10,7 +10,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan( max_shard_moves int default 1000000, excluded_shard_list bigint[] default '{}', drain_only boolean default false, - rebalance_strategy name default NULL + rebalance_strategy name default NULL, + improvement_threshold float4 DEFAULT NULL ) RETURNS TABLE (table_name regclass, shardid bigint, @@ -21,6 +22,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan( targetport int) AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE; -COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name) +COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name, float4) IS 'returns the list of shard placement moves to be done on a rebalance operation'; diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index 67785c30e..4bd137c8c 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -89,6 +89,7 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS) float threshold = PG_GETARG_FLOAT4(2); int32 maxShardMoves = PG_GETARG_INT32(3); bool drainOnly = PG_GETARG_BOOL(4); + float utilizationImproventThreshold = PG_GETARG_FLOAT4(5); List *workerNodeList = NIL; List 
*shardPlacementListList = NIL; @@ -143,6 +144,7 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS) threshold, maxShardMoves, drainOnly, + utilizationImproventThreshold, &rebalancePlanFunctions); ArrayType *placementUpdateJsonArray = PlacementUpdateListToJsonArray( placementUpdateList); diff --git a/src/include/distributed/pg_dist_rebalance_strategy.h b/src/include/distributed/pg_dist_rebalance_strategy.h index 148c772cc..0c346501d 100644 --- a/src/include/distributed/pg_dist_rebalance_strategy.h +++ b/src/include/distributed/pg_dist_rebalance_strategy.h @@ -25,10 +25,11 @@ typedef struct FormData_pg_dist_rebalance_strategy NameData name; /* user readable name of the strategy */ bool default_strategy; /* if this strategy is the default strategy */ Oid shardCostFunction; /* function to calculate the shard cost */ - Oid nodeCapacityFunction; /* function to get the capacity of a node */ - Oid shardAllowedOnNodeFunction; /* function to check if shard is allowed on node */ - float4 defaultThreshold; /* default threshold that is used */ - float4 minimumThreshold; /* minimum threshold that is allowed */ + Oid nodeCapacityFunction; /* function to get the capacity of a node */ + Oid shardAllowedOnNodeFunction; /* function to check if shard is allowed on node */ + float4 defaultThreshold; /* default threshold that is used */ + float4 minimumThreshold; /* minimum threshold that is allowed */ + float4 improvementThreshold; /* the minimum improvement, relative to the moved shard's cost, required for a move */ } FormData_pg_dist_rebalance_strategy; /* ---------------- diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 7e0716cb5..53c34cb21 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -112,15 +112,48 @@ typedef struct PlacementUpdateEventProgress typedef struct NodeFillState { WorkerNode *node; + + /* + * capacity is how big this node is, relative to the other nodes in the + * cluster. This has no unit; it can represent whatever the user wants. + * Some examples: + * 1. GBs of RAM + * 2. number of CPUs + * 3. GBs of disk + * 4. relative improvement of new CPU generation in newly added nodes + */ float4 capacity; + + /* + * totalCost is the cost of all ShardCosts on the node added together. This + * doesn't have a unit. See the ShardCost->cost comment for some examples. + */ float4 totalCost; + + /* + * utilization is how "full" the node is. This is always totalCost divided + * by capacity. Since neither of those has a unit, this also doesn't have + * one. + */ float4 utilization; + + /* + * shardCostListDesc contains all ShardCosts that are on the current node, + * ordered from high cost to low cost. + */ List *shardCostListDesc; } NodeFillState; typedef struct ShardCost { uint64 shardId; + + /* + * cost is the cost of the shard. This doesn't have a unit. + * Some examples of what this could represent: + * 1. GBs of data + * 2.
number of queries per day + */ float4 cost; } ShardCost; @@ -138,6 +171,8 @@ typedef struct RebalancePlanFunctions void *context; } RebalancePlanFunctions; +extern int MaxRebalancerLoggedIgnoredMoves; + /* External function declarations */ extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS); extern Datum shard_placement_replication_array(PG_FUNCTION_ARGS); @@ -151,6 +186,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlaceme double threshold, int32 maxShardMoves, bool drainOnly, + float4 utilizationImproventThreshold, RebalancePlanFunctions *rebalancePlanFunctions); extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList, int shardReplicationFactor); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index b22e6d5ae..f1fb15ade 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -560,16 +560,20 @@ SELECT * FROM print_extension_changes(); -- Snapshot of state at 10.1-1 ALTER EXTENSION citus UPDATE TO '10.1-1'; SELECT * FROM print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- - function citus_internal.columnar_ensure_objects_exist() | - function create_distributed_table(regclass,text,citus.distribution_type,text) | - | function citus_local_disk_space_stats() - | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) - | function worker_partitioned_relation_size(regclass) - | function worker_partitioned_relation_total_size(regclass) - | function worker_partitioned_table_size(regclass) -(7 rows) + function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real) | + function citus_internal.columnar_ensure_objects_exist() | + function create_distributed_table(regclass,text,citus.distribution_type,text) | + function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) | + | function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) + | function citus_local_disk_space_stats() + | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) + | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real) + | function worker_partitioned_relation_size(regclass) + | function worker_partitioned_relation_total_size(regclass) + | function worker_partitioned_table_size(regclass) +(11 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_metadata_attributes.out b/src/test/regress/expected/multi_metadata_attributes.out index 91d927c18..85ececdac 100644 --- a/src/test/regress/expected/multi_metadata_attributes.out +++ b/src/test/regress/expected/multi_metadata_attributes.out @@ -6,7 +6,7 @@ -- part of the query so new changes to it won't affect this test. 
SELECT attrelid::regclass, attname, atthasmissing, attmissingval FROM pg_attribute -WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass) +WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass) ORDER BY attrelid, attname; attrelid | attname | atthasmissing | attmissingval --------------------------------------------------------------------- diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 843d7c170..8bc7ffc3f 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -132,7 +132,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array( shard_placement_list json[], threshold float4 DEFAULT 0, max_shard_moves int DEFAULT 1000000, - drain_only bool DEFAULT false + drain_only bool DEFAULT false, + improvement_threshold float4 DEFAULT 0.5 ) RETURNS json[] AS 'citus' @@ -1180,6 +1181,7 @@ CONTEXT: while executing command on localhost:xxxxx DROP USER testrole; -- Test costs set citus.shard_count = 4; +SET citus.next_shard_id TO 123040; CREATE TABLE tab (x int); SELECT create_distributed_table('tab','x'); create_distributed_table @@ -1193,6 +1195,7 @@ INSERT INTO tab SELECT 1 from generate_series(1, 30000); INSERT INTO tab SELECT 2 from generate_series(1, 10000); INSERT INTO tab SELECT 3 from generate_series(1, 10000); INSERT INTO tab SELECT 6 from generate_series(1, 10000); +VACUUM FULL tab; ANALYZE tab; \c - - - :worker_1_port SELECT table_schema, table_name, row_estimate, total_bytes @@ -1211,8 +1214,8 @@ WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- - public | tab_123033 | 30000 | 1114112 - public | tab_123035 | 10000 | 393216 + public | tab_123040 | 30000 | 1089536 + public | tab_123042 | 10000 | 368640 (2 rows) \c - - - :worker_2_port @@ -1232,8 +1235,8 @@ WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- - public | tab_123034 | 10000 | 393216 - public | tab_123036 | 10000 | 393216 + public | tab_123041 | 10000 | 368640 + public | tab_123043 | 10000 | 368640 (2 rows) \c - - - :master_port @@ -1245,7 +1248,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- - tab | 123035 | 0 | localhost | 57637 | localhost | 57638 + tab | 123042 | 0 | localhost | 57637 | localhost | 57638 (1 row) SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0); @@ -1253,7 +1256,7 @@ WARNING: the given threshold is lower than the minimum threshold allowed by the DETAIL: Using threshold of 0.01 table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- - tab | 123035 | 0 | localhost | 57637 | localhost | 57638 + tab | 123042 | 0 | localhost | 57637 | localhost | 57638 (1 row) SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); @@ -1312,6 +1315,7 @@ INSERT INTO tab2 SELECT 1 from generate_series(1, 0); INSERT INTO tab2 SELECT 2 from 
generate_series(1, 60000); INSERT INTO tab2 SELECT 3 from generate_series(1, 10000); INSERT INTO tab2 SELECT 6 from generate_series(1, 10000); +VACUUM FULL tab, tab2; ANALYZE tab, tab2; \c - - - :worker_1_port SELECT table_schema, table_name, row_estimate, total_bytes @@ -1331,7 +1335,7 @@ WHERE table_schema = 'public' table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab2_123050 | 0 | 0 - public | tab_123033 | 30000 | 1114112 + public | tab_123040 | 30000 | 1089536 (2 rows) \c - - - :worker_2_port @@ -1351,25 +1355,43 @@ WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- - public | tab2_123051 | 10000 | 393216 - public | tab2_123052 | 10000 | 393216 - public | tab2_123053 | 60000 | 2203648 - public | tab_123034 | 10000 | 393216 - public | tab_123035 | 10000 | 368640 - public | tab_123036 | 10000 | 393216 + public | tab2_123051 | 10000 | 368640 + public | tab2_123052 | 10000 | 368640 + public | tab2_123053 | 60000 | 2179072 + public | tab_123041 | 10000 | 368640 + public | tab_123042 | 10000 | 368640 + public | tab_123043 | 10000 | 368640 (6 rows) \c - - - :master_port SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size'); +NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.151125 is lower than the improvement_threshold of 0.5 +NOTICE: Ignored 1 moves, all of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5). table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- - tab | 123036 | 0 | localhost | 57638 | localhost | 57637 + tab | 123041 | 0 | localhost | 57638 | localhost | 57637 + tab2 | 123051 | 0 | localhost | 57638 | localhost | 57637 + tab | 123042 | 0 | localhost | 57638 | localhost | 57637 + tab2 | 123052 | 0 | localhost | 57638 | localhost | 57637 +(4 rows) + +-- supports improvement_threshold +SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0); + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + tab | 123043 | 0 | localhost | 57638 | localhost | 57637 tab2 | 123053 | 0 | localhost | 57638 | localhost | 57637 - tab | 123033 | 0 | localhost | 57637 | localhost | 57638 + tab | 123040 | 0 | localhost | 57637 | localhost | 57638 tab2 | 123050 | 0 | localhost | 57637 | localhost | 57638 (4 rows) SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); +NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.151125 is lower than the improvement_threshold of 0.5 +NOTICE: Ignored 1 moves, all of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5). NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... rebalance_table_shards @@ -1380,10 +1402,10 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- - 57637 | tab | 1 - 57638 | tab | 3 - 57637 | tab2 | 1 - 57638 | tab2 | 3 + 57637 | tab | 3 + 57638 | tab | 1 + 57637 | tab2 | 3 + 57638 | tab2 | 1 (4 rows) ANALYZE tab, tab2; @@ -1401,50 +1423,50 @@ SELECT table_schema, table_name, row_estimate, total_bytes WHERE relkind = 'r' ) a WHERE table_schema = 'public' +) a ORDER BY table_name; + table_schema | table_name | row_estimate | total_bytes +--------------------------------------------------------------------- + public | tab2_123050 | 0 | 0 + public | tab2_123051 | 10000 | 368640 + public | tab2_123052 | 10000 | 368640 + public | tab_123040 | 30000 | 1089536 + public | tab_123041 | 10000 | 368640 + public | tab_123042 | 10000 | 368640 +(6 rows) + +\c - - - :worker_2_port +SELECT table_schema, table_name, row_estimate, total_bytes + FROM ( + SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( + SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME + , c.reltuples AS row_estimate + , pg_total_relation_size(c.oid) AS total_bytes + , pg_indexes_size(c.oid) AS index_bytes + , pg_total_relation_size(reltoastrelid) AS toast_bytes + FROM pg_class c + LEFT JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE relkind = 'r' + ) a +WHERE table_schema = 'public' ) a ORDER BY table_name; table_schema | table_name | row_estimate | total_bytes --------------------------------------------------------------------- public | tab2_123053 | 60000 | 2179072 - public | tab_123036 | 10000 | 368640 + public | tab_123043 | 10000 | 368640 (2 rows) -\c - - - :worker_2_port -SELECT table_schema, table_name, row_estimate, total_bytes - FROM ( - SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( - SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME - , c.reltuples AS row_estimate - , pg_total_relation_size(c.oid) AS total_bytes - , pg_indexes_size(c.oid) AS index_bytes - , pg_total_relation_size(reltoastrelid) AS toast_bytes - FROM pg_class c - LEFT JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE relkind = 'r' - ) a -WHERE table_schema = 'public' -) a ORDER BY table_name; - table_schema | table_name | row_estimate | total_bytes ---------------------------------------------------------------------- - public | tab2_123050 | 0 | 0 - public | tab2_123051 | 10000 | 393216 - public | tab2_123052 | 10000 | 393216 - public | tab_123033 | 30000 | 1089536 - public | tab_123034 | 10000 | 393216 - public | tab_123035 | 10000 | 368640 -(6 rows) - \c - - - :master_port DROP TABLE tab2; -CREATE OR REPLACE FUNCTION capacity_high_worker_1(nodeidarg int) +CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int) RETURNS real AS $$ SELECT - (CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real + (CASE WHEN nodeport = 57638 THEN 1000 ELSE 1 END)::real FROM pg_dist_node where nodeid = nodeidarg $$ LANGUAGE sql; SELECT citus_add_rebalance_strategy( - 'capacity_high_worker_1', + 'capacity_high_worker_2', 'citus_shard_cost_1', - 'capacity_high_worker_1', + 'capacity_high_worker_2', 'citus_shard_allowed_on_node_true', 0 ); @@ -1453,15 +1475,15 @@ SELECT citus_add_rebalance_strategy( (1 row) -SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 
'capacity_high_worker_1'); +SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- - tab | 123033 | 0 | localhost | 57638 | localhost | 57637 - tab | 123034 | 0 | localhost | 57638 | localhost | 57637 - tab | 123035 | 0 | localhost | 57638 | localhost | 57637 + tab | 123040 | 0 | localhost | 57637 | localhost | 57638 + tab | 123041 | 0 | localhost | 57637 | localhost | 57638 + tab | 123042 | 0 | localhost | 57637 | localhost | 57638 (3 rows) -SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_1', shard_transfer_mode:='block_writes'); +SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes'); NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... @@ -1473,10 +1495,10 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- - 57637 | tab | 4 + 57638 | tab | 4 (1 row) -SELECT citus_set_default_rebalance_strategy('capacity_high_worker_1'); +SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2'); citus_set_default_rebalance_strategy --------------------------------------------------------------------- @@ -1496,20 +1518,20 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- - 57637 | tab | 4 + 57638 | tab | 4 (1 row) -CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int) +CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int) RETURNS boolean AS $$ SELECT - (CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END) + (CASE WHEN nodeport = 57637 THEN TRUE ELSE FALSE END) FROM pg_dist_node where nodeid = nodeidarg $$ LANGUAGE sql; SELECT citus_add_rebalance_strategy( - 'only_worker_2', + 'only_worker_1', 'citus_shard_cost_1', 'citus_node_capacity_1', - 'only_worker_2', + 'only_worker_1', 0 ); citus_add_rebalance_strategy @@ -1517,7 +1539,7 @@ SELECT citus_add_rebalance_strategy( (1 row) -SELECT citus_set_default_rebalance_strategy('only_worker_2'); +SELECT citus_set_default_rebalance_strategy('only_worker_1'); citus_set_default_rebalance_strategy --------------------------------------------------------------------- @@ -1526,10 +1548,10 @@ SELECT citus_set_default_rebalance_strategy('only_worker_2'); SELECT * FROM get_rebalance_table_shards_plan('tab'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- - tab | 123033 | 0 | localhost | 57637 | localhost | 57638 - tab | 123034 | 0 | localhost | 57637 | localhost | 57638 - tab | 123035 | 0 | localhost | 57637 | localhost | 57638 - tab | 123036 | 0 | localhost | 57637 | localhost | 57638 + tab | 123040 | 0 | localhost | 57638 | localhost | 57637 + tab | 123041 | 0 | localhost | 57638 | localhost | 57637 + tab | 123042 | 0 | localhost | 57638 | localhost | 57637 + tab | 123043 | 0 | localhost | 57638 | localhost | 57637 (4 
rows) SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); @@ -1545,7 +1567,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- - 57638 | tab | 4 + 57637 | tab | 4 (1 row) SELECT citus_set_default_rebalance_strategy('by_shard_count'); @@ -1557,8 +1579,8 @@ SELECT citus_set_default_rebalance_strategy('by_shard_count'); SELECT * FROM get_rebalance_table_shards_plan('tab'); table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport --------------------------------------------------------------------- - tab | 123033 | 0 | localhost | 57638 | localhost | 57637 - tab | 123034 | 0 | localhost | 57638 | localhost | 57637 + tab | 123040 | 0 | localhost | 57637 | localhost | 57638 + tab | 123041 | 0 | localhost | 57637 | localhost | 57638 (2 rows) -- Check all the error handling cases @@ -1849,7 +1871,7 @@ UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard SELECT citus_add_rebalance_strategy( 'default_threshold_too_low', 'citus_shard_cost_1', - 'capacity_high_worker_1', + 'capacity_high_worker_2', 'citus_shard_allowed_on_node_true', 0, 0.1 diff --git a/src/test/regress/expected/shard_rebalancer_unit.out b/src/test/regress/expected/shard_rebalancer_unit.out index dcbf29df3..fe88968fe 100644 --- a/src/test/regress/expected/shard_rebalancer_unit.out +++ b/src/test/regress/expected/shard_rebalancer_unit.out @@ -3,7 +3,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array( shard_placement_list json[], threshold float4 DEFAULT 0, max_shard_moves int DEFAULT 1000000, - drain_only bool DEFAULT false + drain_only bool DEFAULT false, + improvement_threshold float4 DEFAULT 0.5 ) RETURNS json[] AS 'citus' @@ -500,3 +501,244 @@ NOTICE: Stopped searching before we were out of moves. Please rerun the rebalan {"updatetype":1,"shardid":8,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname3","targetport":5432} (5 rows) +-- Don't move a big shards if it doesn't improve the utilization balance much. +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":20, "nodename":"a"}', + '{"shardid":3, "cost":100, "nodename":"b"}', + '{"shardid":4, "cost":50, "nodename":"b"}' + ]::json[] +)); +NOTICE: Ignoring move of shard xxxxx from b:5432 to a:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.1 is lower than the improvement_threshold of 0.5 +NOTICE: Ignored 1 moves, all of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5). 
+ unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":4,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432} +(1 row) + +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":40, "nodename":"a"}', + '{"shardid":2, "cost":40, "nodename":"a"}', + '{"shardid":3, "cost":100, "nodename":"b"}', + '{"shardid":4, "cost":100, "nodename":"b"}' + ]::json[] +)); +NOTICE: Ignoring move of shard xxxxx from b:5432 to a:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.2 is lower than the improvement_threshold of 0.5 +NOTICE: Ignoring move of shard xxxxx from b:5432 to a:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.2 is lower than the improvement_threshold of 0.5 +NOTICE: Ignored 2 moves, all of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5). + unnest +--------------------------------------------------------------------- +(0 rows) + +-- improvement_threshold can be used to force a move of big shards +-- if needed. +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":20, "nodename":"a"}', + '{"shardid":3, "cost":100, "nodename":"b"}', + '{"shardid":4, "cost":50, "nodename":"b"}' + ]::json[], + improvement_threshold := 0.1 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":3,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} + {"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} +(3 rows) + +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":40, "nodename":"a"}', + '{"shardid":2, "cost":40, "nodename":"a"}', + '{"shardid":3, "cost":100, "nodename":"b"}', + '{"shardid":4, "cost":100, "nodename":"b"}' + ]::json[], + improvement_threshold := 0.2 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":3,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} +(2 rows) + +-- limits notices about ignored moves +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}', + '{"node_name": "d"}', + '{"node_name": "e"}', + '{"node_name": "f"}', + '{"node_name": "g"}' + ]::json[], + ARRAY['{"shardid":1, "cost":39, "nodename":"a"}', + '{"shardid":2, "cost":39, "nodename":"b"}', + '{"shardid":3, "cost":39, "nodename":"c"}', + '{"shardid":4, "cost":39, "nodename":"d"}', + '{"shardid":5, "cost":39, "nodename":"e"}', + '{"shardid":6, "cost":39, "nodename":"f"}', + '{"shardid":7, "cost":40, "nodename":"g"}', + '{"shardid":8, "cost":39, "nodename":"g"}' + ]::json[], + improvement_threshold := 0.1 +)); +NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its 
size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to b:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to c:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to d:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to e:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignored 6 moves, 5 of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1). + unnest +--------------------------------------------------------------------- +(0 rows) + +-- limits notices based on GUC +set citus.max_rebalancer_logged_ignored_moves = 1; +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}', + '{"node_name": "d"}', + '{"node_name": "e"}', + '{"node_name": "f"}', + '{"node_name": "g"}' + ]::json[], + ARRAY['{"shardid":1, "cost":39, "nodename":"a"}', + '{"shardid":2, "cost":39, "nodename":"b"}', + '{"shardid":3, "cost":39, "nodename":"c"}', + '{"shardid":4, "cost":39, "nodename":"d"}', + '{"shardid":5, "cost":39, "nodename":"e"}', + '{"shardid":6, "cost":39, "nodename":"f"}', + '{"shardid":7, "cost":40, "nodename":"g"}', + '{"shardid":8, "cost":39, "nodename":"g"}' + ]::json[], + improvement_threshold := 0.1 +)); +NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignored 6 moves, 1 of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1). 
+ unnest +--------------------------------------------------------------------- +(0 rows) + +set citus.max_rebalancer_logged_ignored_moves = 10; +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}', + '{"node_name": "d"}', + '{"node_name": "e"}', + '{"node_name": "f"}', + '{"node_name": "g"}' + ]::json[], + ARRAY['{"shardid":1, "cost":39, "nodename":"a"}', + '{"shardid":2, "cost":39, "nodename":"b"}', + '{"shardid":3, "cost":39, "nodename":"c"}', + '{"shardid":4, "cost":39, "nodename":"d"}', + '{"shardid":5, "cost":39, "nodename":"e"}', + '{"shardid":6, "cost":39, "nodename":"f"}', + '{"shardid":7, "cost":40, "nodename":"g"}', + '{"shardid":8, "cost":39, "nodename":"g"}' + ]::json[], + improvement_threshold := 0.1 +)); +NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to b:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to c:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to d:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to e:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to f:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignored 6 moves, all of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1). 
+ unnest +--------------------------------------------------------------------- +(0 rows) + +set citus.max_rebalancer_logged_ignored_moves = -1; +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}', + '{"node_name": "d"}', + '{"node_name": "e"}', + '{"node_name": "f"}', + '{"node_name": "g"}' + ]::json[], + ARRAY['{"shardid":1, "cost":39, "nodename":"a"}', + '{"shardid":2, "cost":39, "nodename":"b"}', + '{"shardid":3, "cost":39, "nodename":"c"}', + '{"shardid":4, "cost":39, "nodename":"d"}', + '{"shardid":5, "cost":39, "nodename":"e"}', + '{"shardid":6, "cost":39, "nodename":"f"}', + '{"shardid":7, "cost":40, "nodename":"g"}', + '{"shardid":8, "cost":39, "nodename":"g"}' + ]::json[], + improvement_threshold := 0.1 +)); +NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to b:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to c:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to d:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to e:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignoring move of shard xxxxx from g:5432 to f:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1 +NOTICE: Ignored 6 moves, all of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1). + unnest +--------------------------------------------------------------------- +(0 rows) + +-- Combining improvement_threshold and capacity works as expected. +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b", "capacity": 2}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a"}', + '{"shardid":4, "cost":100, "nodename":"b"}', + '{"shardid":5, "cost":50, "nodename":"b"}', + '{"shardid":6, "cost":50, "nodename":"b"}' + ]::json[], + improvement_threshold := 0.6 +)); +NOTICE: Ignoring move of shard xxxxx from a:5432 to b:5432, because the move only brings a small improvement relative to the shard its size +DETAIL: The balance improvement of 0.5 is lower than the improvement_threshold of 0.6 +NOTICE: Ignored 1 moves, all of which are shown in notices above +HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.6). 
+ unnest
+---------------------------------------------------------------------
+ {"updatetype":1,"shardid":5,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
+ {"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
+(2 rows)
+
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 72c5a79a2..cd96f8caa 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -35,7 +35,7 @@ ORDER BY 1;
  function citus_add_inactive_node(text,integer,integer,noderole,name)
  function citus_add_local_table_to_metadata(regclass,boolean)
  function citus_add_node(text,integer,integer,noderole,name)
- function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real)
+ function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
  function citus_add_secondary_node(text,integer,text,integer,name)
  function citus_blocking_pids(integer)
  function citus_conninfo_cache_invalidate()
@@ -121,7 +121,7 @@ ORDER BY 1;
  function get_current_transaction_id()
  function get_global_active_transactions()
  function get_rebalance_progress()
- function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name)
+ function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real)
  function get_shard_id_for_distribution_column(regclass,"any")
  function isolate_tenant_to_new_shard(regclass,"any",text)
  function json_cat_agg(json)
diff --git a/src/test/regress/expected/upgrade_list_citus_objects_0.out b/src/test/regress/expected/upgrade_list_citus_objects_0.out
index 5b5c04fb3..e3c5322bd 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects_0.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects_0.out
@@ -32,7 +32,7 @@ ORDER BY 1;
  function citus_add_inactive_node(text,integer,integer,noderole,name)
  function citus_add_local_table_to_metadata(regclass,boolean)
  function citus_add_node(text,integer,integer,noderole,name)
- function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real)
+ function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
  function citus_add_secondary_node(text,integer,text,integer,name)
  function citus_blocking_pids(integer)
  function citus_conninfo_cache_invalidate()
@@ -116,7 +116,7 @@ ORDER BY 1;
  function get_current_transaction_id()
  function get_global_active_transactions()
  function get_rebalance_progress()
- function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name)
+ function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real)
  function get_shard_id_for_distribution_column(regclass,"any")
  function isolate_tenant_to_new_shard(regclass,"any",text)
  function json_cat_agg(json)
diff --git a/src/test/regress/expected/upgrade_rebalance_strategy_after.out b/src/test/regress/expected/upgrade_rebalance_strategy_after.out
index 36dd71b6c..da822fffd 100644
--- a/src/test/regress/expected/upgrade_rebalance_strategy_after.out
+++ b/src/test/regress/expected/upgrade_rebalance_strategy_after.out
@@ -1,8 +1,8 @@
 SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name;
- name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold
+ name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold
 ---------------------------------------------------------------------
- by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01
- by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0
- custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2
+ by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5
+ by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0
+ custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0
 (3 rows)
diff --git a/src/test/regress/expected/upgrade_rebalance_strategy_before.out b/src/test/regress/expected/upgrade_rebalance_strategy_before.out
index 0a12b1d60..cf1d122b3 100644
--- a/src/test/regress/expected/upgrade_rebalance_strategy_before.out
+++ b/src/test/regress/expected/upgrade_rebalance_strategy_before.out
@@ -15,14 +15,14 @@ CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
     (CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
     FROM pg_dist_node where nodeid = nodeidarg
     $$ LANGUAGE sql;
-ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
 SELECT citus_add_rebalance_strategy(
     'custom_strategy',
     'shard_cost_2',
     'capacity_high_worker_1',
     'only_worker_2',
     0.5,
-    0.2
+    0.2,
+    0.3
 );
  citus_add_rebalance_strategy
 ---------------------------------------------------------------------
@@ -35,4 +35,3 @@ SELECT citus_set_default_rebalance_strategy('custom_strategy');
 (1 row)
-ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
diff --git a/src/test/regress/sql/multi_metadata_attributes.sql b/src/test/regress/sql/multi_metadata_attributes.sql
index 1845a4f53..174c92331 100644
--- a/src/test/regress/sql/multi_metadata_attributes.sql
+++ b/src/test/regress/sql/multi_metadata_attributes.sql
@@ -7,5 +7,5 @@
 -- part of the query so new changes to it won't affect this test.
 SELECT attrelid::regclass, attname, atthasmissing, attmissingval
 FROM pg_attribute
-WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass)
+WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass)
 ORDER BY attrelid, attname;
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index ec8751d3b..934bb11e8 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -65,7 +65,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
     shard_placement_list json[],
     threshold float4 DEFAULT 0,
     max_shard_moves int DEFAULT 1000000,
-    drain_only bool DEFAULT false
+    drain_only bool DEFAULT false,
+    improvement_threshold float4 DEFAULT 0.5
 )
 RETURNS json[]
 AS 'citus'
@@ -647,6 +648,7 @@ DROP USER testrole;
 -- Test costs
 set citus.shard_count = 4;
+SET citus.next_shard_id TO 123040;
 CREATE TABLE tab (x int);
 SELECT create_distributed_table('tab','x');
 -- The following numbers are chosen such that they are placed on different
@@ -655,6 +657,7 @@ INSERT INTO tab SELECT 1 from generate_series(1, 30000);
 INSERT INTO tab SELECT 2 from generate_series(1, 10000);
 INSERT INTO tab SELECT 3 from generate_series(1, 10000);
 INSERT INTO tab SELECT 6 from generate_series(1, 10000);
+VACUUM FULL tab;
 ANALYZE tab;
 \c - - - :worker_1_port
@@ -712,6 +715,7 @@ INSERT INTO tab2 SELECT 1 from generate_series(1, 0);
 INSERT INTO tab2 SELECT 2 from generate_series(1, 60000);
 INSERT INTO tab2 SELECT 3 from generate_series(1, 10000);
 INSERT INTO tab2 SELECT 6 from generate_series(1, 10000);
+VACUUM FULL tab, tab2;
 ANALYZE tab, tab2;
 \c - - - :worker_1_port
@@ -747,6 +751,8 @@ WHERE table_schema = 'public'
 \c - - - :master_port
 SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size');
+-- supports improvement_threshold
+SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0);
 SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
 SELECT * FROM public.table_placements_per_node;
 ANALYZE tab, tab2;
@@ -785,46 +791,46 @@ WHERE table_schema = 'public'
 DROP TABLE tab2;
-CREATE OR REPLACE FUNCTION capacity_high_worker_1(nodeidarg int)
+CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int)
     RETURNS real AS $$
     SELECT
-        (CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real
+        (CASE WHEN nodeport = 57638 THEN 1000 ELSE 1 END)::real
     FROM pg_dist_node where nodeid = nodeidarg
     $$ LANGUAGE sql;
 SELECT citus_add_rebalance_strategy(
-        'capacity_high_worker_1',
+        'capacity_high_worker_2',
         'citus_shard_cost_1',
-        'capacity_high_worker_1',
+        'capacity_high_worker_2',
         'citus_shard_allowed_on_node_true',
         0
     );
-SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_1');
-SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_1', shard_transfer_mode:='block_writes');
+SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2');
+SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes');
 SELECT * FROM public.table_placements_per_node;
-SELECT citus_set_default_rebalance_strategy('capacity_high_worker_1');
+SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2');
 SELECT * FROM get_rebalance_table_shards_plan('tab');
 SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
 SELECT * FROM public.table_placements_per_node;
-CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
+CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
     RETURNS boolean AS $$
     SELECT
-    (CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
+    (CASE WHEN nodeport = 57637 THEN TRUE ELSE FALSE END)
     FROM pg_dist_node where nodeid = nodeidarg
     $$ LANGUAGE sql;
 SELECT citus_add_rebalance_strategy(
-        'only_worker_2',
+        'only_worker_1',
         'citus_shard_cost_1',
         'citus_node_capacity_1',
-        'only_worker_2',
+        'only_worker_1',
         0
     );
-SELECT citus_set_default_rebalance_strategy('only_worker_2');
+SELECT citus_set_default_rebalance_strategy('only_worker_1');
 SELECT * FROM get_rebalance_table_shards_plan('tab');
 SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
 SELECT * FROM public.table_placements_per_node;
@@ -1012,7 +1018,7 @@ UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard
 SELECT citus_add_rebalance_strategy(
     'default_threshold_too_low',
     'citus_shard_cost_1',
-    'capacity_high_worker_1',
+    'capacity_high_worker_2',
     'citus_shard_allowed_on_node_true',
     0,
     0.1
diff --git a/src/test/regress/sql/shard_rebalancer_unit.sql b/src/test/regress/sql/shard_rebalancer_unit.sql
index 8907ad1ec..51293a227 100644
--- a/src/test/regress/sql/shard_rebalancer_unit.sql
+++ b/src/test/regress/sql/shard_rebalancer_unit.sql
@@ -3,7 +3,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
     shard_placement_list json[],
     threshold float4 DEFAULT 0,
     max_shard_moves int DEFAULT 1000000,
-    drain_only bool DEFAULT false
+    drain_only bool DEFAULT false,
+    improvement_threshold float4 DEFAULT 0.5
 )
 RETURNS json[]
 AS 'citus'
@@ -381,3 +382,151 @@ SELECT unnest(shard_placement_rebalance_array(
     ]::json[],
     max_shard_moves := 5
 ));
+
+
+-- Don't move a big shards if it doesn't improve the utilization balance much.
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}']::json[],
+    ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
+          '{"shardid":2, "cost":20, "nodename":"a"}',
+          '{"shardid":3, "cost":100, "nodename":"b"}',
+          '{"shardid":4, "cost":50, "nodename":"b"}'
+    ]::json[]
+));
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}']::json[],
+    ARRAY['{"shardid":1, "cost":40, "nodename":"a"}',
+          '{"shardid":2, "cost":40, "nodename":"a"}',
+          '{"shardid":3, "cost":100, "nodename":"b"}',
+          '{"shardid":4, "cost":100, "nodename":"b"}'
+    ]::json[]
+));
+
+-- improvement_threshold can be used to force a move of big shards
+-- if needed.
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}']::json[],
+    ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
+          '{"shardid":2, "cost":20, "nodename":"a"}',
+          '{"shardid":3, "cost":100, "nodename":"b"}',
+          '{"shardid":4, "cost":50, "nodename":"b"}'
+    ]::json[],
+    improvement_threshold := 0.1
+));
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}']::json[],
+    ARRAY['{"shardid":1, "cost":40, "nodename":"a"}',
+          '{"shardid":2, "cost":40, "nodename":"a"}',
+          '{"shardid":3, "cost":100, "nodename":"b"}',
+          '{"shardid":4, "cost":100, "nodename":"b"}'
+    ]::json[],
+    improvement_threshold := 0.2
+));
+
+-- limits notices about ignored moves
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}',
+          '{"node_name": "c"}',
+          '{"node_name": "d"}',
+          '{"node_name": "e"}',
+          '{"node_name": "f"}',
+          '{"node_name": "g"}'
+    ]::json[],
+    ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
+          '{"shardid":2, "cost":39, "nodename":"b"}',
+          '{"shardid":3, "cost":39, "nodename":"c"}',
+          '{"shardid":4, "cost":39, "nodename":"d"}',
+          '{"shardid":5, "cost":39, "nodename":"e"}',
+          '{"shardid":6, "cost":39, "nodename":"f"}',
+          '{"shardid":7, "cost":40, "nodename":"g"}',
+          '{"shardid":8, "cost":39, "nodename":"g"}'
+    ]::json[],
+    improvement_threshold := 0.1
+));
+
+
+
+-- limits notices based on GUC
+set citus.max_rebalancer_logged_ignored_moves = 1;
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}',
+          '{"node_name": "c"}',
+          '{"node_name": "d"}',
+          '{"node_name": "e"}',
+          '{"node_name": "f"}',
+          '{"node_name": "g"}'
+    ]::json[],
+    ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
+          '{"shardid":2, "cost":39, "nodename":"b"}',
+          '{"shardid":3, "cost":39, "nodename":"c"}',
+          '{"shardid":4, "cost":39, "nodename":"d"}',
+          '{"shardid":5, "cost":39, "nodename":"e"}',
+          '{"shardid":6, "cost":39, "nodename":"f"}',
+          '{"shardid":7, "cost":40, "nodename":"g"}',
+          '{"shardid":8, "cost":39, "nodename":"g"}'
+    ]::json[],
+    improvement_threshold := 0.1
+));
+set citus.max_rebalancer_logged_ignored_moves = 10;
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}',
+          '{"node_name": "c"}',
+          '{"node_name": "d"}',
+          '{"node_name": "e"}',
+          '{"node_name": "f"}',
+          '{"node_name": "g"}'
+    ]::json[],
+    ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
+          '{"shardid":2, "cost":39, "nodename":"b"}',
+          '{"shardid":3, "cost":39, "nodename":"c"}',
+          '{"shardid":4, "cost":39, "nodename":"d"}',
+          '{"shardid":5, "cost":39, "nodename":"e"}',
+          '{"shardid":6, "cost":39, "nodename":"f"}',
+          '{"shardid":7, "cost":40, "nodename":"g"}',
+          '{"shardid":8, "cost":39, "nodename":"g"}'
+    ]::json[],
+    improvement_threshold := 0.1
+));
+set citus.max_rebalancer_logged_ignored_moves = -1;
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b"}',
+          '{"node_name": "c"}',
+          '{"node_name": "d"}',
+          '{"node_name": "e"}',
+          '{"node_name": "f"}',
+          '{"node_name": "g"}'
+    ]::json[],
+    ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
+          '{"shardid":2, "cost":39, "nodename":"b"}',
+          '{"shardid":3, "cost":39, "nodename":"c"}',
+          '{"shardid":4, "cost":39, "nodename":"d"}',
+          '{"shardid":5, "cost":39, "nodename":"e"}',
+          '{"shardid":6, "cost":39, "nodename":"f"}',
+          '{"shardid":7, "cost":40, "nodename":"g"}',
+          '{"shardid":8, "cost":39, "nodename":"g"}'
+    ]::json[],
+    improvement_threshold := 0.1
+));
+
+
+-- Combining improvement_threshold and capacity works as expected.
+SELECT unnest(shard_placement_rebalance_array(
+    ARRAY['{"node_name": "a"}',
+          '{"node_name": "b", "capacity": 2}']::json[],
+    ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
+          '{"shardid":2, "cost":10, "nodename":"a"}',
+          '{"shardid":3, "cost":10, "nodename":"a"}',
+          '{"shardid":4, "cost":100, "nodename":"b"}',
+          '{"shardid":5, "cost":50, "nodename":"b"}',
+          '{"shardid":6, "cost":50, "nodename":"b"}'
+    ]::json[],
+    improvement_threshold := 0.6
+));
diff --git a/src/test/regress/sql/upgrade_rebalance_strategy_before.sql b/src/test/regress/sql/upgrade_rebalance_strategy_before.sql
index b76aa4ccd..458fb9cf6 100644
--- a/src/test/regress/sql/upgrade_rebalance_strategy_before.sql
+++ b/src/test/regress/sql/upgrade_rebalance_strategy_before.sql
@@ -18,7 +18,6 @@ CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
     (CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
     FROM pg_dist_node where nodeid = nodeidarg
     $$ LANGUAGE sql;
-ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
 SELECT citus_add_rebalance_strategy(
@@ -26,7 +25,7 @@ SELECT citus_add_rebalance_strategy(
     'capacity_high_worker_1',
     'only_worker_2',
     0.5,
-    0.2
+    0.2,
+    0.3
 );
 SELECT citus_set_default_rebalance_strategy('custom_strategy');
-ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
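
The user-facing surface exercised by the tests above comes down to an optional improvement_threshold argument on the planning function and a new trailing parameter on strategy creation, stored per strategy in pg_dist_rebalance_strategy. The following is a minimal usage sketch, not part of the diff: it assumes a distributed table named tab already exists, the strategy name my_custom_strategy and the threshold values are illustrative, and the cost, capacity, and node-filter functions are the built-ins listed in the catalog diff above.

-- Plan-only: list the moves the rebalancer would make when no move is
-- filtered out for bringing only a small improvement.
SELECT * FROM get_rebalance_table_shards_plan('tab', improvement_threshold := 0);

-- Register a strategy whose stored improvement_threshold (the last argument)
-- makes the rebalancer ignore moves whose balance improvement, relative to
-- the size of the shard being moved, falls below 0.3.
SELECT citus_add_rebalance_strategy(
    'my_custom_strategy',                 -- illustrative strategy name
    'citus_shard_cost_by_disk_size',      -- shard cost function
    'citus_node_capacity_1',              -- node capacity function
    'citus_shard_allowed_on_node_true',   -- shard allowed on node function
    0.1,                                  -- default_threshold
    0.01,                                 -- minimum_threshold
    0.3                                   -- improvement_threshold
);

-- Cap how many "Ignoring move ..." notices the planner emits (-1 removes the cap).
SET citus.max_rebalancer_logged_ignored_moves = 10;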