From fd1427de2c5e9c77e2ffd850741351f90bd9d27f Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 27 Jun 2023 16:37:09 +0200 Subject: [PATCH] Change by_disk_size rebalance strategy to have a base size (#7035) One problem with rebalancing by disk size is that shards in newly created collocation groups are considered extremely small. This can easily result in bad balances if there are some other collocation groups that do have some data. One extremely bad example of this is: 1. You have 2 workers 2. Both contain about 100GB of data, but there's a 70MB difference. 3. You create 100 new distributed schemas with a few empty tables in them 4. You run the rebalancer 5. Now all new distributed schemas are placed on the node with that had 70MB less. 6. You start loading some data in these shards and quickly the balance is completely off To address this edge case, this PR changes the by_disk_size rebalance strategy to add a a base size of 100MB to the actual size of each shard group. This can still result in a bad balance when shard groups are empty, but it solves some of the worst cases. --- .../distributed/operations/shard_rebalancer.c | 3 +++ src/backend/distributed/shared_library_init.c | 17 +++++++++++++++++ src/include/distributed/shard_rebalancer.h | 1 + src/test/regress/citus_tests/run_test.py | 1 + .../regress/expected/multi_partitioning.out | 2 +- .../expected/shard_move_deferred_delete.out | 2 +- src/test/regress/expected/shard_rebalancer.out | 18 +++++++++++++++++- src/test/regress/sql/shard_rebalancer.sql | 9 ++++++++- 8 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 5750b9e8d..20004f5fb 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -319,6 +319,7 @@ PG_FUNCTION_INFO_V1(citus_rebalance_wait); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; +int RebalancerByDiskSizeBaseCost = 100 * 1024 * 1024; bool PropagateSessionSettingsForLoopbackConnection = false; static const char *PlacementUpdateTypeNames[] = { @@ -677,6 +678,8 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldContext); MemoryContextReset(localContext); + colocationSizeInBytes += RebalancerByDiskSizeBaseCost; + if (colocationSizeInBytes <= 0) { PG_RETURN_FLOAT4(1); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 907d8e73e..2fbe2f5f5 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2194,6 +2194,23 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.rebalancer_by_disk_size_base_cost", + gettext_noop( + "When using the by_disk_size rebalance strategy each shard group " + "will get this cost in bytes added to its actual disk size. This " + "is used to avoid creating a bad balance when there's very little " + "data in some of the shards. The assumption is that even empty " + "shards have some cost, because of parallelism and because empty " + "shard groups will likely grow in the future."), + gettext_noop( + "The main reason this is configurable, is so it can be lowered for Citus its regression tests."), + &RebalancerByDiskSizeBaseCost, + 100 * 1024 * 1024, 0, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.recover_2pc_interval", gettext_noop("Sets the time to wait between recovering 2PCs."), diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 705196ad4..38ce4f485 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -188,6 +188,7 @@ typedef struct RebalancePlanFunctions extern char *VariablesToBePassedToNewConnections; extern int MaxRebalancerLoggedIgnoredMoves; +extern int RebalancerByDiskSizeBaseCost; extern bool RunningUnderIsolationTest; extern bool PropagateSessionSettingsForLoopbackConnection; extern int MaxBackgroundTaskExecutorsPerNode; diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 8e1e1c91e..00d5638d9 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -118,6 +118,7 @@ DEPS = { "multi_extension": TestDeps(None, repeatable=False), "multi_test_helpers": TestDeps(None), "multi_insert_select": TestDeps("base_schedule"), + "multi_partitioning": TestDeps("base_schedule"), "multi_mx_create_table": TestDeps( None, [ diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index c0a21d4d5..64a4fd4dc 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -2042,7 +2042,7 @@ SELECT citus_shard_cost_by_disk_size(shardid) FROM pg_dist_shard WHERE logicalre DEBUG: skipping child tables for relation named: events.Energy Added citus_shard_cost_by_disk_size --------------------------------------------------------------------- - 16384 + 1.04874e+08 (1 row) RESET client_min_messages; diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index e87cd0f97..6a0c4fcd1 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -226,7 +226,7 @@ SET search_path TO shard_move_deferred_delete; SELECT citus_shard_cost_by_disk_size(20000001); citus_shard_cost_by_disk_size --------------------------------------------------------------------- - 8192 + 1.04866e+08 (1 row) -- When there's not enough space the move should fail diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 0ae2193e5..62ae17487 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -3,6 +3,15 @@ -- SET citus.next_shard_id TO 433000; SET citus.propagate_session_settings_for_loopback_connection TO ON; +-- Lower the minimum disk size that a shard group is considered as. Otherwise +-- we need to create shards of more than 100MB. +ALTER SYSTEM SET citus.rebalancer_by_disk_size_base_cost = 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + CREATE TABLE ref_table_test(a int primary key); SELECT create_reference_table('ref_table_test'); create_reference_table @@ -2182,7 +2191,7 @@ SELECT 1 FROM master_remove_node('localhost', :master_port); 1 (1 row) -SELECT public.wait_until_metadata_sync(30000); +SELECT public.wait_until_metadata_sync(60000); wait_until_metadata_sync --------------------------------------------------------------------- @@ -2854,6 +2863,13 @@ select 1 from citus_add_node('localhost', :worker_2_port); select rebalance_table_shards(); ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY DROP TABLE table_with_primary_key, table_without_primary_key; +ALTER SYSTEM RESET citus.rebalancer_by_disk_size_base_cost; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index f524212bf..ba22a8abd 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -5,6 +5,11 @@ SET citus.next_shard_id TO 433000; SET citus.propagate_session_settings_for_loopback_connection TO ON; +-- Lower the minimum disk size that a shard group is considered as. Otherwise +-- we need to create shards of more than 100MB. +ALTER SYSTEM SET citus.rebalancer_by_disk_size_base_cost = 0; +SELECT pg_reload_conf(); + CREATE TABLE ref_table_test(a int primary key); SELECT create_reference_table('ref_table_test'); CREATE TABLE dist_table_test(a int primary key); @@ -1228,7 +1233,7 @@ DROP TABLE tab; -- we don't need the coordinator on pg_dist_node anymore SELECT 1 FROM master_remove_node('localhost', :master_port); -SELECT public.wait_until_metadata_sync(30000); +SELECT public.wait_until_metadata_sync(60000); -- -- Make sure that rebalance_table_shards() and replicate_table_shards() replicate @@ -1569,6 +1574,8 @@ select 1 from citus_add_node('localhost', :worker_2_port); select rebalance_table_shards(); DROP TABLE table_with_primary_key, table_without_primary_key; +ALTER SYSTEM RESET citus.rebalancer_by_disk_size_base_cost; +SELECT pg_reload_conf(); \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole;