Implement an improvement threshold in the rebalancer (#4927)

Every move in the rebalancer algorithm results in an improvement in the
balance. However, even if the improvement in the balance was very small
the move was still chosen. This is especially problematic if the shard
itself is very big and the move will take a long time.

This changes the rebalancer algorithm to take the relative size of the
balance improvement into account when choosing moves. By default a move
will not be chosen if it improves the balance by less than half of the
size of the shard. An extra argument is added to the rebalancer
functions so that the user can decide to lower the default threshold if
the ignored move is wanted anyway.
pull/4978/head
Jelte Fennema 2021-05-11 14:24:59 +02:00 committed by GitHub
parent fa61eda7b9
commit cbbd10b974
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 913 additions and 138 deletions

View File

@ -70,6 +70,7 @@ typedef struct RebalanceOptions
int32 maxShardMoves;
ArrayType *excludedShardArray;
bool drainOnly;
float4 improvementThreshold;
Form_pg_dist_rebalance_strategy rebalanceStrategy;
} RebalanceOptions;
@ -80,14 +81,54 @@ typedef struct RebalanceOptions
*/
typedef struct RebalanceState
{
/*
* placementsHash contains the current state of all shard placements. It
* is initialized from pg_dist_placement and is then modified based on the
* shard moves that were found.
*/
HTAB *placementsHash;
/*
* placementUpdateList contains all of the updates that have been done to
* reach the current state of placementsHash.
*/
List *placementUpdateList;
/*
* functions holds the RebalancePlanFunctions callbacks and their context
* that are used while planning (presumably the ones handed to
* RebalancePlacementUpdates -- confirm in InitRebalanceState).
*/
RebalancePlanFunctions *functions;
/*
* fillStateListDesc contains all NodeFillStates ordered from full nodes to
* empty nodes.
*/
List *fillStateListDesc;
/*
* fillStateListAsc contains all NodeFillStates ordered from empty nodes to
* full nodes.
*/
List *fillStateListAsc;
/*
* disallowedPlacementList contains all placements that currently exist,
* but are not allowed according to the shardAllowedOnNode function.
*/
List *disallowedPlacementList;
/*
* totalCost is the cost of all the shards in the cluster added together.
*/
float4 totalCost;
/*
* totalCapacity is the capacity of all the nodes in the cluster added
* together.
*/
float4 totalCapacity;
/*
* ignoredMoves is the number of moves that were ignored because their
* normalized improvement was below the improvement threshold. It is
* compared against MaxRebalancerLoggedIgnoredMoves to limit the number of
* NOTICE lines that are sent.
*/
int64 ignoredMoves;
} RebalanceState;
@ -128,6 +169,7 @@ static RebalanceState * InitRebalanceState(List *workerNodeList, List *shardPlac
static void MoveShardsAwayFromDisallowedNodes(RebalanceState *state);
static bool FindAndMoveShardCost(float4 utilizationLowerBound,
float4 utilizationUpperBound,
float4 improvementThreshold,
RebalanceState *state);
static NodeFillState * FindAllowedTargetFillState(RebalanceState *state, uint64 shardId);
static void MoveShardCost(NodeFillState *sourceFillState, NodeFillState *targetFillState,
@ -163,6 +205,8 @@ PG_FUNCTION_INFO_V1(citus_shard_cost_by_disk_size);
PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions);
PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
int MaxRebalancerLoggedIgnoredMoves = 5;
#ifdef USE_ASSERT_CHECKING
@ -369,6 +413,7 @@ GetRebalanceSteps(RebalanceOptions *options)
options->threshold,
options->maxShardMoves,
options->drainOnly,
options->improvementThreshold,
&rebalancePlanFunctions);
}
@ -711,6 +756,7 @@ rebalance_table_shards(PG_FUNCTION_ARGS)
.excludedShardArray = PG_GETARG_ARRAYTYPE_P(3),
.drainOnly = PG_GETARG_BOOL(5),
.rebalanceStrategy = strategy,
.improvementThreshold = strategy->improvementThreshold,
};
Oid shardTransferModeOid = PG_GETARG_OID(4);
RebalanceTableShards(&options, shardTransferModeOid);
@ -911,6 +957,8 @@ get_rebalance_table_shards_plan(PG_FUNCTION_ARGS)
.excludedShardArray = PG_GETARG_ARRAYTYPE_P(3),
.drainOnly = PG_GETARG_BOOL(4),
.rebalanceStrategy = strategy,
.improvementThreshold = PG_GETARG_FLOAT4_OR_DEFAULT(
6, strategy->improvementThreshold),
};
@ -1228,6 +1276,7 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
double threshold,
int32 maxShardMoves,
bool drainOnly,
float4 improvementThreshold,
RebalancePlanFunctions *functions)
{
List *rebalanceStates = NIL;
@ -1264,9 +1313,11 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
while (list_length(state->placementUpdateList) < maxShardMoves &&
moreMovesAvailable)
{
moreMovesAvailable = FindAndMoveShardCost(utilizationLowerBound,
utilizationUpperBound,
state);
moreMovesAvailable = FindAndMoveShardCost(
utilizationLowerBound,
utilizationUpperBound,
improvementThreshold,
state);
}
placementUpdateList = state->placementUpdateList;
@ -1286,6 +1337,36 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
hash_destroy(state->placementsHash);
}
if (state->ignoredMoves > 0)
{
if (MaxRebalancerLoggedIgnoredMoves == -1 ||
state->ignoredMoves <= MaxRebalancerLoggedIgnoredMoves)
{
ereport(NOTICE, (
errmsg(
"Ignored %ld moves, all of which are shown in notices above",
state->ignoredMoves
),
errhint(
"If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (%g).",
improvementThreshold)
));
}
else
{
ereport(NOTICE, (
errmsg(
"Ignored %ld moves, %d of which are shown in notices above",
state->ignoredMoves,
MaxRebalancerLoggedIgnoredMoves
),
errhint(
"If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (%g).",
improvementThreshold)
));
}
}
return placementUpdateList;
}
@ -1314,8 +1395,8 @@ InitRebalanceState(List *workerNodeList, List *shardPlacementList,
fillState->capacity = functions->nodeCapacity(workerNode, functions->context);
/*
* Set the utilization here although the totalCost is not set yet. This is
* important to set the utilization to INFINITY when the capacity is 0.
* Set the utilization here although the totalCost is not set yet. This
* is needed to set the utilization to INFINITY when the capacity is 0.
*/
fillState->utilization = CalculateUtilization(fillState->totalCost,
fillState->capacity);
@ -1648,13 +1729,39 @@ MoveShardCost(NodeFillState *sourceFillState,
* current state and returns a list with a new move appended that improves the
* balance of shards. The algorithm is greedy and will use the first new move
* that improves the balance. It finds nodes by trying to move a shard from the
* fullest node to the emptiest node. If no moves are possible it will try the
* second emptiest node until it tried all of them. Then it wil try the second
* fullest node. If it was able to find a move it will return true and false if
* it couldn't.
* most utilized node (highest utilization) to the emptiest node (lowest
* utilization). If no moves are possible it will try the second emptiest node
* until it has tried all of them. Then it will try the second fullest node. It
* returns true if it was able to find a move and false if it couldn't.
*
* This algorithm won't necessarily result in the best possible balance. Getting
* the best balance is an NP problem, so it's not feasible to go for the best
* balance. This algorithm was chosen because of the following reasons:
* 1. Literature research showed that similar problems would get within 2X of
* the optimal balance with a greedy algorithm.
* 2. Every move will always improve the balance. So if the user stops a
* rebalance midway through, they will never be in a worse situation than
* before.
* 3. It's pretty easy to reason about.
* 4. It's simple to implement.
*
* utilizationLowerBound and utilizationUpperBound are used to indicate what
* the target utilization range of all nodes is. If they are within this range,
* then balance is good enough. If all nodes are in this range then the cluster
* is considered balanced and no more moves are done. This is mostly useful for
* the by_disk_size rebalance strategy. If we wouldn't have this then the
* rebalancer could become flappy in certain cases.
*
* improvementThreshold is a threshold that can be used to ignore moves when
* they only improve the balance a little relative to the cost of the shard.
* Again this is mostly useful for the by_disk_size rebalance strategy.
* Without this threshold the rebalancer would move a shard of 1TB when this
* move only improves the cluster by 10GB.
*/
static bool
FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound,
FindAndMoveShardCost(float4 utilizationLowerBound,
float4 utilizationUpperBound,
float4 improvementThreshold,
RebalanceState *state)
{
NodeFillState *sourceFillState = NULL;
@ -1727,11 +1834,24 @@ FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound,
}
/*
* Ensure that the cost distrubition is actually better
* after the move, i.e. the new highest utilization of
* source and target is lower than the previous highest, or
* the highest utilization is the same, but the lowest
* increased.
* If the target is still less utilized than the source, then
* this is clearly a good move. And if they are equally
* utilized too.
*/
if (newTargetUtilization <= newSourceUtilization)
{
MoveShardCost(sourceFillState, targetFillState,
shardCost, state);
return true;
}
/*
* The target is now more utilized than the source. So we need
* to determine if the move is a net positive for the overall
* cost distribution. This means that the new highest
* utilization of source and target is lower than the previous
* highest, or the highest utilization is the same, but the
* lowest increased.
*/
if (newTargetUtilization > sourceFillState->utilization)
{
@ -1752,6 +1872,58 @@ FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound,
*/
continue;
}
/*
* fmaxf and fminf here are only needed for cases when nodes
* have different capacities. If they are the same, then both
* arguments are equal.
*/
float4 utilizationImprovement = fmaxf(
sourceFillState->utilization - newTargetUtilization,
newSourceUtilization - targetFillState->utilization
);
float4 utilizationAddedByShard = fminf(
newTargetUtilization - targetFillState->utilization,
sourceFillState->utilization - newSourceUtilization
);
/*
* If the shard causes a lot of utilization, but the
* improvement which is gained by moving it is small, then we
* ignore the move. Probably there are other shards that are
* better candidates, and in any case it's probably not worth
* the effort to move this shard.
*
* One of the main cases this tries to avoid is the rebalancer
* moving a very large shard with the "by_disk_size" strategy
* when that only gives a small benefit in data distribution.
*/
float4 normalizedUtilizationImprovement = utilizationImprovement /
utilizationAddedByShard;
if (normalizedUtilizationImprovement < improvementThreshold)
{
state->ignoredMoves++;
if (MaxRebalancerLoggedIgnoredMoves == -1 ||
state->ignoredMoves <= MaxRebalancerLoggedIgnoredMoves)
{
ereport(NOTICE, (
errmsg(
"Ignoring move of shard %ld from %s:%d to %s:%d, because the move only brings a small improvement relative to the shard its size",
shardCost->shardId,
sourceFillState->node->workerName,
sourceFillState->node->workerPort,
targetFillState->node->workerName,
targetFillState->node->workerPort
),
errdetail(
"The balance improvement of %g is lower than the improvement_threshold of %g",
normalizedUtilizationImprovement,
improvementThreshold
)
));
}
continue;
}
MoveShardCost(sourceFillState, targetFillState,
shardCost, state);
return true;

View File

@ -68,6 +68,7 @@
#include "distributed/time_constants.h"
#include "distributed/query_stats.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shared_library_init.h"
#include "distributed/statistics_collection.h"
#include "distributed/subplan_execution.h"
@ -1190,6 +1191,16 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_KB | GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_rebalancer_logged_ignored_moves",
gettext_noop("Sets the maximum number of ignored moves the rebalance logs"),
NULL,
&MaxRebalancerLoggedIgnoredMoves,
5, -1, INT_MAX,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_shared_pool_size",
gettext_noop("Sets the maximum number of connections allowed per worker node "

View File

@ -5,5 +5,11 @@
#include "udfs/worker_partitioned_relation_total_size/10.1-1.sql"
#include "udfs/worker_partitioned_relation_size/10.1-1.sql"
#include "udfs/worker_partitioned_table_size/10.1-1.sql"
#include "udfs/citus_prepare_pg_upgrade/10.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/10.1-1.sql"
#include "udfs/citus_local_disk_space_stats/10.1-1.sql"
#include "udfs/get_rebalance_table_shards_plan/10.1-1.sql"
#include "udfs/citus_add_rebalance_strategy/10.1-1.sql"
-- Add the improvement_threshold column. Existing strategies get 0 through the
-- column default; only the by_disk_size strategy is switched to 0.5.
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ADD COLUMN improvement_threshold float4 NOT NULL default 0;
UPDATE pg_catalog.pg_dist_rebalance_strategy SET improvement_threshold = 0.5 WHERE name = 'by_disk_size';

View File

@ -21,4 +21,13 @@ DROP FUNCTION pg_catalog.worker_partitioned_relation_size(regclass);
DROP FUNCTION pg_catalog.worker_partitioned_table_size(regclass);
DROP FUNCTION pg_catalog.citus_local_disk_space_stats();
#include "../udfs/citus_prepare_pg_upgrade/9.5-1.sql"
#include "../udfs/citus_finish_pg_upgrade/10.0-1.sql"
#include "../udfs/get_rebalance_table_shards_plan/9.2-1.sql"
-- the migration for citus_add_rebalance_strategy from 9.2-1 was the first one,
-- so it doesn't have a DROP. This is why we DROP manually here.
DROP FUNCTION pg_catalog.citus_add_rebalance_strategy;
#include "../udfs/citus_add_rebalance_strategy/9.2-1.sql"
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DROP COLUMN improvement_threshold;

View File

@ -0,0 +1,30 @@
-- The argument list changes (improvement_threshold is added), so the previous
-- version of the function is dropped before the new signature is created.
DROP FUNCTION pg_catalog.citus_add_rebalance_strategy;
-- citus_add_rebalance_strategy inserts a new row into
-- pg_catalog.pg_dist_rebalance_strategy using the given arguments.
--
-- improvement_threshold is stored in the column of the same name. Before this
-- fix the argument was accepted but not inserted, so every strategy added
-- through this function ended up with the column default instead of the
-- requested value.
--
-- NOTE(review): the body references the improvement_threshold column, so the
-- column has to exist when the function is called. If this script is included
-- before the ALTER TABLE that adds the column, creation only succeeds when
-- check_function_bodies is off (PostgreSQL is expected to disable it while
-- running extension scripts -- confirm for the supported versions, or move
-- the include after the ALTER TABLE).
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy(
    name name,
    shard_cost_function regproc,
    node_capacity_function regproc,
    shard_allowed_on_node_function regproc,
    default_threshold float4,
    minimum_threshold float4 DEFAULT 0,
    improvement_threshold float4 DEFAULT 0
)
    RETURNS VOID AS $$
    INSERT INTO
        pg_catalog.pg_dist_rebalance_strategy(
            name,
            shard_cost_function,
            node_capacity_function,
            shard_allowed_on_node_function,
            default_threshold,
            minimum_threshold,
            improvement_threshold
        ) VALUES (
            name,
            shard_cost_function,
            node_capacity_function,
            shard_allowed_on_node_function,
            default_threshold,
            minimum_threshold,
            improvement_threshold
        );
$$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4, float4)
IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes';

View File

@ -1,10 +1,12 @@
DROP FUNCTION pg_catalog.citus_add_rebalance_strategy;
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy(
name name,
shard_cost_function regproc,
node_capacity_function regproc,
shard_allowed_on_node_function regproc,
default_threshold float4,
minimum_threshold float4 DEFAULT 0
minimum_threshold float4 DEFAULT 0,
improvement_threshold float4 DEFAULT 0
)
RETURNS VOID AS $$
INSERT INTO
@ -24,5 +26,5 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy(
minimum_threshold
);
$$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4)
COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4, float4)
IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes';

View File

@ -31,7 +31,8 @@ BEGIN
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold
minimum_threshold,
improvement_threshold
FROM public.pg_dist_rebalance_strategy;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;

View File

@ -31,7 +31,8 @@ BEGIN
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold
minimum_threshold,
improvement_threshold
FROM public.pg_dist_rebalance_strategy;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;

View File

@ -0,0 +1,54 @@
-- citus_prepare_pg_upgrade copies the Citus catalog tables into backup tables
-- in the public schema. Any backup tables left over from an earlier run are
-- dropped first, so the function can be run repeatedly.
CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
BEGIN
--
-- Drop existing backup tables
--
DROP TABLE IF EXISTS public.pg_dist_partition;
DROP TABLE IF EXISTS public.pg_dist_shard;
DROP TABLE IF EXISTS public.pg_dist_placement;
DROP TABLE IF EXISTS public.pg_dist_node_metadata;
DROP TABLE IF EXISTS public.pg_dist_node;
DROP TABLE IF EXISTS public.pg_dist_local_group;
DROP TABLE IF EXISTS public.pg_dist_transaction;
DROP TABLE IF EXISTS public.pg_dist_colocation;
DROP TABLE IF EXISTS public.pg_dist_authinfo;
DROP TABLE IF EXISTS public.pg_dist_poolinfo;
DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
--
-- backup citus catalog tables
--
CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition;
CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard;
CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement;
CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata;
CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node;
CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group;
CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
-- The rebalance strategy columns are listed explicitly instead of using
-- SELECT *, because the three function columns are stored as the text form of
-- their regprocedure (presumably so that the restore step can cast them back
-- to functions after the upgrade -- confirm in citus_finish_pg_upgrade).
CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
name,
default_strategy,
shard_cost_function::regprocedure::text,
node_capacity_function::regprocedure::text,
shard_allowed_on_node_function::regprocedure::text,
default_threshold,
minimum_threshold,
improvement_threshold
FROM pg_catalog.pg_dist_rebalance_strategy;
-- store upgrade stable identifiers on pg_dist_object catalog
UPDATE citus.pg_dist_object
SET (type, object_names, object_args) = (SELECT * FROM pg_identify_object_as_address(classid, objid, objsubid));
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade()
IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done';

View File

@ -40,7 +40,8 @@ BEGIN
node_capacity_function::regprocedure::text,
shard_allowed_on_node_function::regprocedure::text,
default_threshold,
minimum_threshold
minimum_threshold,
improvement_threshold
FROM pg_catalog.pg_dist_rebalance_strategy;
-- store upgrade stable identifiers on pg_dist_object catalog

View File

@ -0,0 +1,27 @@
-- get_rebalance_table_shards_plan shows the shard placement moves that would be
-- performed if a rebalance operation were run with the same arguments. This
-- allows users to understand the impact of the rebalance on the overall
-- availability of the application and on network traffic.
--
-- improvement_threshold defaults to NULL; in that case the C implementation
-- presumably falls back to the improvement threshold of the chosen rebalance
-- strategy (see the use of PG_GETARG_FLOAT4_OR_DEFAULT in the C function).
--
-- The argument list changes, so the previous version of the function is
-- dropped before the new signature is created.
DROP FUNCTION pg_catalog.get_rebalance_table_shards_plan;
CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan(
relation regclass default NULL,
threshold float4 default NULL,
max_shard_moves int default 1000000,
excluded_shard_list bigint[] default '{}',
drain_only boolean default false,
rebalance_strategy name default NULL,
improvement_threshold float4 DEFAULT NULL
)
RETURNS TABLE (table_name regclass,
shardid bigint,
shard_size bigint,
sourcename text,
sourceport int,
targetname text,
targetport int)
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name, float4)
IS 'returns the list of shard placement moves to be done on a rebalance operation';

View File

@ -10,7 +10,8 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan(
max_shard_moves int default 1000000,
excluded_shard_list bigint[] default '{}',
drain_only boolean default false,
rebalance_strategy name default NULL
rebalance_strategy name default NULL,
improvement_threshold float4 DEFAULT NULL
)
RETURNS TABLE (table_name regclass,
shardid bigint,
@ -21,6 +22,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan(
targetport int)
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name)
COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name, float4)
IS 'returns the list of shard placement moves to be done on a rebalance operation';

View File

@ -89,6 +89,7 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS)
float threshold = PG_GETARG_FLOAT4(2);
int32 maxShardMoves = PG_GETARG_INT32(3);
bool drainOnly = PG_GETARG_BOOL(4);
float utilizationImproventThreshold = PG_GETARG_FLOAT4(5);
List *workerNodeList = NIL;
List *shardPlacementListList = NIL;
@ -143,6 +144,7 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS)
threshold,
maxShardMoves,
drainOnly,
utilizationImproventThreshold,
&rebalancePlanFunctions);
ArrayType *placementUpdateJsonArray = PlacementUpdateListToJsonArray(
placementUpdateList);

View File

@ -25,10 +25,11 @@ typedef struct FormData_pg_dist_rebalance_strategy
NameData name; /* user readable name of the strategy */
bool default_strategy; /* if this strategy is the default strategy */
Oid shardCostFunction; /* function to calculate the shard cost */
Oid nodeCapacityFunction; /* function to get the capacity of a node */
Oid shardAllowedOnNodeFunction; /* function to check if shard is allowed on node */
float4 defaultThreshold; /* default threshold that is used */
float4 minimumThreshold; /* minimum threshold that is allowed */
Oid nodeCapacityFunction; /* function to get the capacity of a node */
Oid shardAllowedOnNodeFunction; /* function to check if shard is allowed on node */
float4 defaultThreshold; /* default threshold that is used */
float4 minimumThreshold; /* minimum threshold that is allowed */
float4 improvementThreshold; /* the shard size threshold that is used */
} FormData_pg_dist_rebalance_strategy;
/* ----------------

View File

@ -112,15 +112,48 @@ typedef struct PlacementUpdateEventProgress
typedef struct NodeFillState
{
/*
* node is the worker node that this fill state describes.
*/
WorkerNode *node;
/*
* capacity is how big this node is, relative to the other nodes in the
* cluster. This has no unit, it can represent whatever the user wants.
* Some examples:
* 1. GBs of RAM
* 2. number of CPUs
* 3. GBs of disk
* 4. relative improvement of new CPU generation in newly added nodes
*/
float4 capacity;
/*
* totalCost is the sum of the costs of all ShardCosts on the node. This
* doesn't have a unit. See the ShardCost->cost comment for some examples.
*/
float4 totalCost;
/*
* utilization is how "full" the node is. This is always totalCost divided
* by capacity. Since neither of those has a unit, this also doesn't have
* one.
*/
float4 utilization;
/*
* shardCostListDesc contains all ShardCosts that are on the current node,
* ordered from high cost to low cost.
*/
List *shardCostListDesc;
} NodeFillState;
typedef struct ShardCost
{
/*
* shardId is the id of the shard that this cost belongs to.
*/
uint64 shardId;
/*
* cost is the cost of the shard. This doesn't have a unit.
* Some examples of what this could represent:
* 1. GBs of data
* 2. number of queries per day
*/
float4 cost;
} ShardCost;
@ -138,6 +171,8 @@ typedef struct RebalancePlanFunctions
void *context;
} RebalancePlanFunctions;
extern int MaxRebalancerLoggedIgnoredMoves;
/* External function declarations */
extern Datum shard_placement_rebalance_array(PG_FUNCTION_ARGS);
extern Datum shard_placement_replication_array(PG_FUNCTION_ARGS);
@ -151,6 +186,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlaceme
double threshold,
int32 maxShardMoves,
bool drainOnly,
float4 utilizationImproventThreshold,
RebalancePlanFunctions *rebalancePlanFunctions);
extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
int shardReplicationFactor);

View File

@ -560,16 +560,20 @@ SELECT * FROM print_extension_changes();
-- Snapshot of state at 10.1-1
ALTER EXTENSION citus UPDATE TO '10.1-1';
SELECT * FROM print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
function citus_internal.columnar_ensure_objects_exist() |
function create_distributed_table(regclass,text,citus.distribution_type,text) |
| function citus_local_disk_space_stats()
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
| function worker_partitioned_relation_size(regclass)
| function worker_partitioned_relation_total_size(regclass)
| function worker_partitioned_table_size(regclass)
(7 rows)
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real) |
function citus_internal.columnar_ensure_objects_exist() |
function create_distributed_table(regclass,text,citus.distribution_type,text) |
function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) |
| function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
| function citus_local_disk_space_stats()
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
| function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real)
| function worker_partitioned_relation_size(regclass)
| function worker_partitioned_relation_total_size(regclass)
| function worker_partitioned_table_size(regclass)
(11 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -6,7 +6,7 @@
-- part of the query so new changes to it won't affect this test.
SELECT attrelid::regclass, attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass)
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass)
ORDER BY attrelid, attname;
attrelid | attname | atthasmissing | attmissingval
---------------------------------------------------------------------

View File

@ -132,7 +132,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
@ -1180,6 +1181,7 @@ CONTEXT: while executing command on localhost:xxxxx
DROP USER testrole;
-- Test costs
set citus.shard_count = 4;
SET citus.next_shard_id TO 123040;
CREATE TABLE tab (x int);
SELECT create_distributed_table('tab','x');
create_distributed_table
@ -1193,6 +1195,7 @@ INSERT INTO tab SELECT 1 from generate_series(1, 30000);
INSERT INTO tab SELECT 2 from generate_series(1, 10000);
INSERT INTO tab SELECT 3 from generate_series(1, 10000);
INSERT INTO tab SELECT 6 from generate_series(1, 10000);
VACUUM FULL tab;
ANALYZE tab;
\c - - - :worker_1_port
SELECT table_schema, table_name, row_estimate, total_bytes
@ -1211,8 +1214,8 @@ WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab_123033 | 30000 | 1114112
public | tab_123035 | 10000 | 393216
public | tab_123040 | 30000 | 1089536
public | tab_123042 | 10000 | 368640
(2 rows)
\c - - - :worker_2_port
@ -1232,8 +1235,8 @@ WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab_123034 | 10000 | 393216
public | tab_123036 | 10000 | 393216
public | tab_123041 | 10000 | 368640
public | tab_123043 | 10000 | 368640
(2 rows)
\c - - - :master_port
@ -1245,7 +1248,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123035 | 0 | localhost | 57637 | localhost | 57638
tab | 123042 | 0 | localhost | 57637 | localhost | 57638
(1 row)
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0);
@ -1253,7 +1256,7 @@ WARNING: the given threshold is lower than the minimum threshold allowed by the
DETAIL: Using threshold of 0.01
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123035 | 0 | localhost | 57637 | localhost | 57638
tab | 123042 | 0 | localhost | 57637 | localhost | 57638
(1 row)
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
@ -1312,6 +1315,7 @@ INSERT INTO tab2 SELECT 1 from generate_series(1, 0);
INSERT INTO tab2 SELECT 2 from generate_series(1, 60000);
INSERT INTO tab2 SELECT 3 from generate_series(1, 10000);
INSERT INTO tab2 SELECT 6 from generate_series(1, 10000);
VACUUM FULL tab, tab2;
ANALYZE tab, tab2;
\c - - - :worker_1_port
SELECT table_schema, table_name, row_estimate, total_bytes
@ -1331,7 +1335,7 @@ WHERE table_schema = 'public'
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123050 | 0 | 0
public | tab_123033 | 30000 | 1114112
public | tab_123040 | 30000 | 1089536
(2 rows)
\c - - - :worker_2_port
@ -1351,25 +1355,43 @@ WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123051 | 10000 | 393216
public | tab2_123052 | 10000 | 393216
public | tab2_123053 | 60000 | 2203648
public | tab_123034 | 10000 | 393216
public | tab_123035 | 10000 | 368640
public | tab_123036 | 10000 | 393216
public | tab2_123051 | 10000 | 368640
public | tab2_123052 | 10000 | 368640
public | tab2_123053 | 60000 | 2179072
public | tab_123041 | 10000 | 368640
public | tab_123042 | 10000 | 368640
public | tab_123043 | 10000 | 368640
(6 rows)
\c - - - :master_port
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size');
NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.151125 is lower than the improvement_threshold of 0.5
NOTICE: Ignored 1 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5).
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123036 | 0 | localhost | 57638 | localhost | 57637
tab | 123041 | 0 | localhost | 57638 | localhost | 57637
tab2 | 123051 | 0 | localhost | 57638 | localhost | 57637
tab | 123042 | 0 | localhost | 57638 | localhost | 57637
tab2 | 123052 | 0 | localhost | 57638 | localhost | 57637
(4 rows)
-- supports improvement_threshold
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0);
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123043 | 0 | localhost | 57638 | localhost | 57637
tab2 | 123053 | 0 | localhost | 57638 | localhost | 57637
tab | 123033 | 0 | localhost | 57637 | localhost | 57638
tab | 123040 | 0 | localhost | 57637 | localhost | 57638
tab2 | 123050 | 0 | localhost | 57637 | localhost | 57638
(4 rows)
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
NOTICE: Ignoring move of shard xxxxx from localhost:xxxxx to localhost:xxxxx, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.151125 is lower than the improvement_threshold of 0.5
NOTICE: Ignored 1 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5).
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
@ -1380,10 +1402,10 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 1
57638 | tab | 3
57637 | tab2 | 1
57638 | tab2 | 3
57637 | tab | 3
57638 | tab | 1
57637 | tab2 | 3
57638 | tab2 | 1
(4 rows)
ANALYZE tab, tab2;
@ -1401,50 +1423,50 @@ SELECT table_schema, table_name, row_estimate, total_bytes
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123050 | 0 | 0
public | tab2_123051 | 10000 | 368640
public | tab2_123052 | 10000 | 368640
public | tab_123040 | 30000 | 1089536
public | tab_123041 | 10000 | 368640
public | tab_123042 | 10000 | 368640
(6 rows)
\c - - - :worker_2_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123053 | 60000 | 2179072
public | tab_123036 | 10000 | 368640
public | tab_123043 | 10000 | 368640
(2 rows)
\c - - - :worker_2_port
SELECT table_schema, table_name, row_estimate, total_bytes
FROM (
SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r'
) a
WHERE table_schema = 'public'
) a ORDER BY table_name;
table_schema | table_name | row_estimate | total_bytes
---------------------------------------------------------------------
public | tab2_123050 | 0 | 0
public | tab2_123051 | 10000 | 393216
public | tab2_123052 | 10000 | 393216
public | tab_123033 | 30000 | 1089536
public | tab_123034 | 10000 | 393216
public | tab_123035 | 10000 | 368640
(6 rows)
\c - - - :master_port
DROP TABLE tab2;
CREATE OR REPLACE FUNCTION capacity_high_worker_1(nodeidarg int)
CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int)
RETURNS real AS $$
SELECT
(CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real
(CASE WHEN nodeport = 57638 THEN 1000 ELSE 1 END)::real
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
SELECT citus_add_rebalance_strategy(
'capacity_high_worker_1',
'capacity_high_worker_2',
'citus_shard_cost_1',
'capacity_high_worker_1',
'capacity_high_worker_2',
'citus_shard_allowed_on_node_true',
0
);
@ -1453,15 +1475,15 @@ SELECT citus_add_rebalance_strategy(
(1 row)
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_1');
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123033 | 0 | localhost | 57638 | localhost | 57637
tab | 123034 | 0 | localhost | 57638 | localhost | 57637
tab | 123035 | 0 | localhost | 57638 | localhost | 57637
tab | 123040 | 0 | localhost | 57637 | localhost | 57638
tab | 123041 | 0 | localhost | 57637 | localhost | 57638
tab | 123042 | 0 | localhost | 57637 | localhost | 57638
(3 rows)
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_1', shard_transfer_mode:='block_writes');
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes');
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
@ -1473,10 +1495,10 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 4
57638 | tab | 4
(1 row)
SELECT citus_set_default_rebalance_strategy('capacity_high_worker_1');
SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2');
citus_set_default_rebalance_strategy
---------------------------------------------------------------------
@ -1496,20 +1518,20 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes')
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57637 | tab | 4
57638 | tab | 4
(1 row)
CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
RETURNS boolean AS $$
SELECT
(CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
(CASE WHEN nodeport = 57637 THEN TRUE ELSE FALSE END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
SELECT citus_add_rebalance_strategy(
'only_worker_2',
'only_worker_1',
'citus_shard_cost_1',
'citus_node_capacity_1',
'only_worker_2',
'only_worker_1',
0
);
citus_add_rebalance_strategy
@ -1517,7 +1539,7 @@ SELECT citus_add_rebalance_strategy(
(1 row)
SELECT citus_set_default_rebalance_strategy('only_worker_2');
SELECT citus_set_default_rebalance_strategy('only_worker_1');
citus_set_default_rebalance_strategy
---------------------------------------------------------------------
@ -1526,10 +1548,10 @@ SELECT citus_set_default_rebalance_strategy('only_worker_2');
SELECT * FROM get_rebalance_table_shards_plan('tab');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123033 | 0 | localhost | 57637 | localhost | 57638
tab | 123034 | 0 | localhost | 57637 | localhost | 57638
tab | 123035 | 0 | localhost | 57637 | localhost | 57638
tab | 123036 | 0 | localhost | 57637 | localhost | 57638
tab | 123040 | 0 | localhost | 57638 | localhost | 57637
tab | 123041 | 0 | localhost | 57638 | localhost | 57637
tab | 123042 | 0 | localhost | 57638 | localhost | 57637
tab | 123043 | 0 | localhost | 57638 | localhost | 57637
(4 rows)
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
@ -1545,7 +1567,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
57638 | tab | 4
57637 | tab | 4
(1 row)
SELECT citus_set_default_rebalance_strategy('by_shard_count');
@ -1557,8 +1579,8 @@ SELECT citus_set_default_rebalance_strategy('by_shard_count');
SELECT * FROM get_rebalance_table_shards_plan('tab');
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
tab | 123033 | 0 | localhost | 57638 | localhost | 57637
tab | 123034 | 0 | localhost | 57638 | localhost | 57637
tab | 123040 | 0 | localhost | 57637 | localhost | 57638
tab | 123041 | 0 | localhost | 57637 | localhost | 57638
(2 rows)
-- Check all the error handling cases
@ -1849,7 +1871,7 @@ UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard
SELECT citus_add_rebalance_strategy(
'default_threshold_too_low',
'citus_shard_cost_1',
'capacity_high_worker_1',
'capacity_high_worker_2',
'citus_shard_allowed_on_node_true',
0,
0.1

View File

@ -3,7 +3,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
@ -500,3 +501,244 @@ NOTICE: Stopped searching before we were out of moves. Please rerun the rebalan
{"updatetype":1,"shardid":8,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname3","targetport":5432}
(5 rows)
-- Don't move a big shards if it doesn't improve the utilization balance much.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":20, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":50, "nodename":"b"}'
]::json[]
));
NOTICE: Ignoring move of shard xxxxx from b:5432 to a:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.1 is lower than the improvement_threshold of 0.5
NOTICE: Ignored 1 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5).
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":4,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
(1 row)
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":40, "nodename":"a"}',
'{"shardid":2, "cost":40, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":100, "nodename":"b"}'
]::json[]
));
NOTICE: Ignoring move of shard xxxxx from b:5432 to a:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.2 is lower than the improvement_threshold of 0.5
NOTICE: Ignoring move of shard xxxxx from b:5432 to a:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.2 is lower than the improvement_threshold of 0.5
NOTICE: Ignored 2 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.5).
unnest
---------------------------------------------------------------------
(0 rows)
-- improvement_threshold can be used to force a move of big shards
-- if needed.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":20, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.1
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":3,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
{"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
(3 rows)
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":40, "nodename":"a"}',
'{"shardid":2, "cost":40, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":100, "nodename":"b"}'
]::json[],
improvement_threshold := 0.2
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":3,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
(2 rows)
-- limits notices about ignored moves
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to b:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to c:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to d:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to e:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignored 6 moves, 5 of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1).
unnest
---------------------------------------------------------------------
(0 rows)
-- limits notices based on GUC
set citus.max_rebalancer_logged_ignored_moves = 1;
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignored 6 moves, 1 of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1).
unnest
---------------------------------------------------------------------
(0 rows)
set citus.max_rebalancer_logged_ignored_moves = 10;
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to b:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to c:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to d:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to e:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to f:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignored 6 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1).
unnest
---------------------------------------------------------------------
(0 rows)
set citus.max_rebalancer_logged_ignored_moves = -1;
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
NOTICE: Ignoring move of shard xxxxx from g:5432 to a:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to b:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to c:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to d:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to e:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignoring move of shard xxxxx from g:5432 to f:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.025641 is lower than the improvement_threshold of 0.1
NOTICE: Ignored 6 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.1).
unnest
---------------------------------------------------------------------
(0 rows)
-- Combining improvement_threshold and capacity works as expected.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b", "capacity": 2}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a"}',
'{"shardid":4, "cost":100, "nodename":"b"}',
'{"shardid":5, "cost":50, "nodename":"b"}',
'{"shardid":6, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.6
));
NOTICE: Ignoring move of shard xxxxx from a:5432 to b:5432, because the move only brings a small improvement relative to the shard its size
DETAIL: The balance improvement of 0.5 is lower than the improvement_threshold of 0.6
NOTICE: Ignored 1 moves, all of which are shown in notices above
HINT: If you do want these moves to happen, try changing improvement_threshold to a lower value than what it is now (0.6).
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":5,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
{"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
(2 rows)

View File

@ -35,7 +35,7 @@ ORDER BY 1;
function citus_add_inactive_node(text,integer,integer,noderole,name)
function citus_add_local_table_to_metadata(regclass,boolean)
function citus_add_node(text,integer,integer,noderole,name)
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real)
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name)
function citus_blocking_pids(integer)
function citus_conninfo_cache_invalidate()
@ -121,7 +121,7 @@ ORDER BY 1;
function get_current_transaction_id()
function get_global_active_transactions()
function get_rebalance_progress()
function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name)
function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real)
function get_shard_id_for_distribution_column(regclass,"any")
function isolate_tenant_to_new_shard(regclass,"any",text)
function json_cat_agg(json)

View File

@ -32,7 +32,7 @@ ORDER BY 1;
function citus_add_inactive_node(text,integer,integer,noderole,name)
function citus_add_local_table_to_metadata(regclass,boolean)
function citus_add_node(text,integer,integer,noderole,name)
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real)
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name)
function citus_blocking_pids(integer)
function citus_conninfo_cache_invalidate()
@ -116,7 +116,7 @@ ORDER BY 1;
function get_current_transaction_id()
function get_global_active_transactions()
function get_rebalance_progress()
function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name)
function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real)
function get_shard_id_for_distribution_column(regclass,"any")
function isolate_tenant_to_new_shard(regclass,"any",text)
function json_cat_agg(json)

View File

@ -1,8 +1,8 @@
SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name;
name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold
name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold
---------------------------------------------------------------------
by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01
by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0
custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2
by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5
by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0
custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0
(3 rows)

View File

@ -15,14 +15,14 @@ CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
(CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
SELECT citus_add_rebalance_strategy(
'custom_strategy',
'shard_cost_2',
'capacity_high_worker_1',
'only_worker_2',
0.5,
0.2
0.2,
0.3
);
citus_add_rebalance_strategy
---------------------------------------------------------------------
@ -35,4 +35,3 @@ SELECT citus_set_default_rebalance_strategy('custom_strategy');
(1 row)
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;

View File

@ -7,5 +7,5 @@
-- part of the query so new changes to it won't affect this test.
SELECT attrelid::regclass, attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass)
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass)
ORDER BY attrelid, attname;

View File

@ -65,7 +65,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
@ -647,6 +648,7 @@ DROP USER testrole;
-- Test costs
set citus.shard_count = 4;
SET citus.next_shard_id TO 123040;
CREATE TABLE tab (x int);
SELECT create_distributed_table('tab','x');
-- The following numbers are chosen such that they are placed on different
@ -655,6 +657,7 @@ INSERT INTO tab SELECT 1 from generate_series(1, 30000);
INSERT INTO tab SELECT 2 from generate_series(1, 10000);
INSERT INTO tab SELECT 3 from generate_series(1, 10000);
INSERT INTO tab SELECT 6 from generate_series(1, 10000);
VACUUM FULL tab;
ANALYZE tab;
\c - - - :worker_1_port
@ -712,6 +715,7 @@ INSERT INTO tab2 SELECT 1 from generate_series(1, 0);
INSERT INTO tab2 SELECT 2 from generate_series(1, 60000);
INSERT INTO tab2 SELECT 3 from generate_series(1, 10000);
INSERT INTO tab2 SELECT 6 from generate_series(1, 10000);
VACUUM FULL tab, tab2;
ANALYZE tab, tab2;
\c - - - :worker_1_port
@ -747,6 +751,8 @@ WHERE table_schema = 'public'
\c - - - :master_port
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size');
-- supports improvement_threshold
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0);
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
SELECT * FROM public.table_placements_per_node;
ANALYZE tab, tab2;
@ -785,46 +791,46 @@ WHERE table_schema = 'public'
DROP TABLE tab2;
CREATE OR REPLACE FUNCTION capacity_high_worker_1(nodeidarg int)
CREATE OR REPLACE FUNCTION capacity_high_worker_2(nodeidarg int)
RETURNS real AS $$
SELECT
(CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real
(CASE WHEN nodeport = 57638 THEN 1000 ELSE 1 END)::real
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
SELECT citus_add_rebalance_strategy(
'capacity_high_worker_1',
'capacity_high_worker_2',
'citus_shard_cost_1',
'capacity_high_worker_1',
'capacity_high_worker_2',
'citus_shard_allowed_on_node_true',
0
);
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_1');
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_1', shard_transfer_mode:='block_writes');
SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2');
SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes');
SELECT * FROM public.table_placements_per_node;
SELECT citus_set_default_rebalance_strategy('capacity_high_worker_1');
SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2');
SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT * FROM public.table_placements_per_node;
CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
RETURNS boolean AS $$
SELECT
(CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
(CASE WHEN nodeport = 57637 THEN TRUE ELSE FALSE END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
SELECT citus_add_rebalance_strategy(
'only_worker_2',
'only_worker_1',
'citus_shard_cost_1',
'citus_node_capacity_1',
'only_worker_2',
'only_worker_1',
0
);
SELECT citus_set_default_rebalance_strategy('only_worker_2');
SELECT citus_set_default_rebalance_strategy('only_worker_1');
SELECT * FROM get_rebalance_table_shards_plan('tab');
SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
SELECT * FROM public.table_placements_per_node;
@ -1012,7 +1018,7 @@ UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard
SELECT citus_add_rebalance_strategy(
'default_threshold_too_low',
'citus_shard_cost_1',
'capacity_high_worker_1',
'capacity_high_worker_2',
'citus_shard_allowed_on_node_true',
0,
0.1

View File

@ -3,7 +3,8 @@ CREATE OR REPLACE FUNCTION shard_placement_rebalance_array(
shard_placement_list json[],
threshold float4 DEFAULT 0,
max_shard_moves int DEFAULT 1000000,
drain_only bool DEFAULT false
drain_only bool DEFAULT false,
improvement_threshold float4 DEFAULT 0.5
)
RETURNS json[]
AS 'citus'
@ -381,3 +382,151 @@ SELECT unnest(shard_placement_rebalance_array(
]::json[],
max_shard_moves := 5
));
-- Don't move a big shard if it doesn't improve the utilization balance much.
-- Neither call passes improvement_threshold, so the default declared on
-- shard_placement_rebalance_array (0.5) applies.
--
-- Case 1: node a holds shards costing 20 + 20, node b holds 100 + 50.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":20, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":50, "nodename":"b"}'
]::json[]
));
-- Case 2: node a holds shards costing 40 + 40, node b holds 100 + 100.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":40, "nodename":"a"}',
'{"shardid":2, "cost":40, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":100, "nodename":"b"}'
]::json[]
));
-- improvement_threshold can be used to force a move of big shards
-- if needed.
--
-- Same placement as the first default-threshold case (a: 20 + 20,
-- b: 100 + 50), but with improvement_threshold lowered to 0.1.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":20, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.1
));
-- Same placement as the second default-threshold case (a: 40 + 40,
-- b: 100 + 100), but with improvement_threshold lowered to 0.2.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":40, "nodename":"a"}',
'{"shardid":2, "cost":40, "nodename":"a"}',
'{"shardid":3, "cost":100, "nodename":"b"}',
'{"shardid":4, "cost":100, "nodename":"b"}'
]::json[],
improvement_threshold := 0.2
));
-- limits notices about ignored moves
--
-- Fixture: seven nodes a-g. Nodes a-f each hold a single shard of cost 39;
-- node g holds two shards (cost 40 and cost 39). improvement_threshold is
-- set to 0.1.
-- citus.max_rebalancer_logged_ignored_moves is not set before this call, so
-- whatever value is already in effect for the session applies.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
-- limits notices based on GUC
--
-- The same seven-node fixture (a-f: one shard of cost 39 each; g: shards of
-- cost 40 and 39; improvement_threshold 0.1) is run three times, changing
-- only citus.max_rebalancer_logged_ignored_moves between runs.
--
-- Run 1: limit set to 1.
set citus.max_rebalancer_logged_ignored_moves = 1;
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
-- Run 2: limit set to 10.
set citus.max_rebalancer_logged_ignored_moves = 10;
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
-- Run 3: limit set to -1.
-- NOTE(review): -1 presumably means "no limit on logged ignored moves";
-- confirm against the GUC definition.
set citus.max_rebalancer_logged_ignored_moves = -1;
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}',
'{"node_name": "d"}',
'{"node_name": "e"}',
'{"node_name": "f"}',
'{"node_name": "g"}'
]::json[],
ARRAY['{"shardid":1, "cost":39, "nodename":"a"}',
'{"shardid":2, "cost":39, "nodename":"b"}',
'{"shardid":3, "cost":39, "nodename":"c"}',
'{"shardid":4, "cost":39, "nodename":"d"}',
'{"shardid":5, "cost":39, "nodename":"e"}',
'{"shardid":6, "cost":39, "nodename":"f"}',
'{"shardid":7, "cost":40, "nodename":"g"}',
'{"shardid":8, "cost":39, "nodename":"g"}'
]::json[],
improvement_threshold := 0.1
));
-- Combining improvement_threshold and capacity works as expected.
--
-- Fixture: node a has no explicit capacity and holds shards costing
-- 20 + 10 + 10; node b is declared with capacity 2 and holds shards costing
-- 100 + 50 + 50. improvement_threshold is raised to 0.6, above the 0.5
-- default.
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b", "capacity": 2}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a"}',
'{"shardid":4, "cost":100, "nodename":"b"}',
'{"shardid":5, "cost":50, "nodename":"b"}',
'{"shardid":6, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.6
));

View File

@ -18,7 +18,6 @@ CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
(CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
SELECT citus_add_rebalance_strategy(
'custom_strategy',
@ -26,7 +25,7 @@ SELECT citus_add_rebalance_strategy(
'capacity_high_worker_1',
'only_worker_2',
0.5,
0.2
0.2,
0.3
);
SELECT citus_set_default_rebalance_strategy('custom_strategy');
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;