Change by_disk_size rebalance strategy to have a base size (#7035)

One problem with rebalancing by disk size is that shards in newly
created colocation groups are considered extremely small. This can
easily result in bad balances if there are some other colocation groups
that do have some data. One extremely bad example of this is:
1. You have 2 workers
2. Both contain about 100GB of data, but there's a 70MB difference.
3. You create 100 new distributed schemas with a few empty tables in
   them
4. You run the rebalancer
5. Now all new distributed schemas are placed on the node that had
   70MB less.
6. You start loading some data in these shards and quickly the balance
   is completely off

To address this edge case, this PR changes the by_disk_size rebalance
strategy to add a base size of 100MB to the actual size of each
shard group. This can still result in a bad balance when shard groups
are empty, but it solves some of the worst cases.
pull/7033/head
Jelte Fennema 2023-06-27 16:37:09 +02:00 committed by GitHub
parent 03a4769c3a
commit fd1427de2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 49 additions and 4 deletions

View File

@ -319,6 +319,7 @@ PG_FUNCTION_INFO_V1(citus_rebalance_wait);
bool RunningUnderIsolationTest = false;
int MaxRebalancerLoggedIgnoredMoves = 5;
int RebalancerByDiskSizeBaseCost = 100 * 1024 * 1024;
bool PropagateSessionSettingsForLoopbackConnection = false;
static const char *PlacementUpdateTypeNames[] = {
@ -677,6 +678,8 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldContext);
MemoryContextReset(localContext);
colocationSizeInBytes += RebalancerByDiskSizeBaseCost;
if (colocationSizeInBytes <= 0)
{
PG_RETURN_FLOAT4(1);

View File

@ -2194,6 +2194,23 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.rebalancer_by_disk_size_base_cost",
gettext_noop(
"When using the by_disk_size rebalance strategy each shard group "
"will get this cost in bytes added to its actual disk size. This "
"is used to avoid creating a bad balance when there's very little "
"data in some of the shards. The assumption is that even empty "
"shards have some cost, because of parallelism and because empty "
"shard groups will likely grow in the future."),
gettext_noop(
"The main reason this is configurable, is so it can be lowered for Citus its regression tests."),
&RebalancerByDiskSizeBaseCost,
100 * 1024 * 1024, 0, INT_MAX,
PGC_USERSET,
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.recover_2pc_interval",
gettext_noop("Sets the time to wait between recovering 2PCs."),

View File

@ -188,6 +188,7 @@ typedef struct RebalancePlanFunctions
extern char *VariablesToBePassedToNewConnections;
extern int MaxRebalancerLoggedIgnoredMoves;
extern int RebalancerByDiskSizeBaseCost;
extern bool RunningUnderIsolationTest;
extern bool PropagateSessionSettingsForLoopbackConnection;
extern int MaxBackgroundTaskExecutorsPerNode;

View File

@ -118,6 +118,7 @@ DEPS = {
"multi_extension": TestDeps(None, repeatable=False),
"multi_test_helpers": TestDeps(None),
"multi_insert_select": TestDeps("base_schedule"),
"multi_partitioning": TestDeps("base_schedule"),
"multi_mx_create_table": TestDeps(
None,
[

View File

@ -2042,7 +2042,7 @@ SELECT citus_shard_cost_by_disk_size(shardid) FROM pg_dist_shard WHERE logicalre
DEBUG: skipping child tables for relation named: events.Energy Added
citus_shard_cost_by_disk_size
---------------------------------------------------------------------
16384
1.04874e+08
(1 row)
RESET client_min_messages;

View File

@ -226,7 +226,7 @@ SET search_path TO shard_move_deferred_delete;
SELECT citus_shard_cost_by_disk_size(20000001);
citus_shard_cost_by_disk_size
---------------------------------------------------------------------
8192
1.04866e+08
(1 row)
-- When there's not enough space the move should fail

View File

@ -3,6 +3,15 @@
--
SET citus.next_shard_id TO 433000;
SET citus.propagate_session_settings_for_loopback_connection TO ON;
-- Lower the minimum disk size that a shard group is considered as. Otherwise
-- we need to create shards of more than 100MB.
ALTER SYSTEM SET citus.rebalancer_by_disk_size_base_cost = 0;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE ref_table_test(a int primary key);
SELECT create_reference_table('ref_table_test');
create_reference_table
@ -2182,7 +2191,7 @@ SELECT 1 FROM master_remove_node('localhost', :master_port);
1
(1 row)
SELECT public.wait_until_metadata_sync(30000);
SELECT public.wait_until_metadata_sync(60000);
wait_until_metadata_sync
---------------------------------------------------------------------
@ -2854,6 +2863,13 @@ select 1 from citus_add_node('localhost', :worker_2_port);
select rebalance_table_shards();
ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
DROP TABLE table_with_primary_key, table_without_primary_key;
ALTER SYSTEM RESET citus.rebalancer_by_disk_size_base_cost;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;

View File

@ -5,6 +5,11 @@
SET citus.next_shard_id TO 433000;
SET citus.propagate_session_settings_for_loopback_connection TO ON;
-- Lower the minimum disk size that a shard group is considered as. Otherwise
-- we need to create shards of more than 100MB.
ALTER SYSTEM SET citus.rebalancer_by_disk_size_base_cost = 0;
SELECT pg_reload_conf();
CREATE TABLE ref_table_test(a int primary key);
SELECT create_reference_table('ref_table_test');
CREATE TABLE dist_table_test(a int primary key);
@ -1228,7 +1233,7 @@ DROP TABLE tab;
-- we don't need the coordinator on pg_dist_node anymore
SELECT 1 FROM master_remove_node('localhost', :master_port);
SELECT public.wait_until_metadata_sync(30000);
SELECT public.wait_until_metadata_sync(60000);
--
-- Make sure that rebalance_table_shards() and replicate_table_shards() replicate
@ -1569,6 +1574,8 @@ select 1 from citus_add_node('localhost', :worker_2_port);
select rebalance_table_shards();
DROP TABLE table_with_primary_key, table_without_primary_key;
ALTER SYSTEM RESET citus.rebalancer_by_disk_size_base_cost;
SELECT pg_reload_conf();
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;