diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 5750b9e8d..20004f5fb 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -319,6 +319,7 @@ PG_FUNCTION_INFO_V1(citus_rebalance_wait); bool RunningUnderIsolationTest = false; int MaxRebalancerLoggedIgnoredMoves = 5; +int RebalancerByDiskSizeBaseCost = 100 * 1024 * 1024; bool PropagateSessionSettingsForLoopbackConnection = false; static const char *PlacementUpdateTypeNames[] = { @@ -677,6 +678,8 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldContext); MemoryContextReset(localContext); + colocationSizeInBytes += RebalancerByDiskSizeBaseCost; + if (colocationSizeInBytes <= 0) { PG_RETURN_FLOAT4(1); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 907d8e73e..2fbe2f5f5 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2194,6 +2194,23 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.rebalancer_by_disk_size_base_cost", + gettext_noop( + "When using the by_disk_size rebalance strategy each shard group " + "will get this cost in bytes added to its actual disk size. This " + "is used to avoid creating a bad balance when there's very little " + "data in some of the shards. "
"The assumption is that even empty " "shards have some cost, because of parallelism and because empty " "shard groups will likely grow in the future."), + gettext_noop( + "The main reason this is configurable is so it can be lowered for Citus's regression tests."), + &RebalancerByDiskSizeBaseCost, + 100 * 1024 * 1024, 0, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.recover_2pc_interval", gettext_noop("Sets the time to wait between recovering 2PCs."), diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 705196ad4..38ce4f485 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -188,6 +188,7 @@ typedef struct RebalancePlanFunctions extern char *VariablesToBePassedToNewConnections; extern int MaxRebalancerLoggedIgnoredMoves; +extern int RebalancerByDiskSizeBaseCost; extern bool RunningUnderIsolationTest; extern bool PropagateSessionSettingsForLoopbackConnection; extern int MaxBackgroundTaskExecutorsPerNode; diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 8e1e1c91e..00d5638d9 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -118,6 +118,7 @@ DEPS = { "multi_extension": TestDeps(None, repeatable=False), "multi_test_helpers": TestDeps(None), "multi_insert_select": TestDeps("base_schedule"), + "multi_partitioning": TestDeps("base_schedule"), "multi_mx_create_table": TestDeps( None, [ diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index c0a21d4d5..64a4fd4dc 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -2042,7 +2042,7 @@ SELECT citus_shard_cost_by_disk_size(shardid) FROM pg_dist_shard WHERE logicalre DEBUG: skipping child tables
for relation named: events.Energy Added citus_shard_cost_by_disk_size --------------------------------------------------------------------- - 16384 + 1.04874e+08 (1 row) RESET client_min_messages; diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index e87cd0f97..6a0c4fcd1 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -226,7 +226,7 @@ SET search_path TO shard_move_deferred_delete; SELECT citus_shard_cost_by_disk_size(20000001); citus_shard_cost_by_disk_size --------------------------------------------------------------------- - 8192 + 1.04866e+08 (1 row) -- When there's not enough space the move should fail diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 0ae2193e5..62ae17487 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -3,6 +3,15 @@ -- SET citus.next_shard_id TO 433000; SET citus.propagate_session_settings_for_loopback_connection TO ON; +-- Lower the minimum disk size that a shard group is considered as. Otherwise +-- we need to create shards of more than 100MB.
+ALTER SYSTEM SET citus.rebalancer_by_disk_size_base_cost = 0; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + CREATE TABLE ref_table_test(a int primary key); SELECT create_reference_table('ref_table_test'); create_reference_table --------------------------------------------------------------------- @@ -2182,7 +2191,7 @@ SELECT 1 FROM master_remove_node('localhost', :master_port); 1 (1 row) -SELECT public.wait_until_metadata_sync(30000); +SELECT public.wait_until_metadata_sync(60000); wait_until_metadata_sync --------------------------------------------------------------------- (1 row) @@ -2854,6 +2863,13 @@ select 1 from citus_add_node('localhost', :worker_2_port); select rebalance_table_shards(); ERROR: cannot use logical replication to transfer shards of the relation table_without_primary_key since it doesn't have a REPLICA IDENTITY or PRIMARY KEY DROP TABLE table_with_primary_key, table_without_primary_key; +ALTER SYSTEM RESET citus.rebalancer_by_disk_size_base_cost; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index f524212bf..ba22a8abd 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -5,6 +5,11 @@ SET citus.next_shard_id TO 433000; SET citus.propagate_session_settings_for_loopback_connection TO ON; +-- Lower the minimum disk size that a shard group is considered as. Otherwise +-- we need to create shards of more than 100MB.
+ALTER SYSTEM SET citus.rebalancer_by_disk_size_base_cost = 0; +SELECT pg_reload_conf(); + CREATE TABLE ref_table_test(a int primary key); SELECT create_reference_table('ref_table_test'); CREATE TABLE dist_table_test(a int primary key); @@ -1228,7 +1233,7 @@ DROP TABLE tab; -- we don't need the coordinator on pg_dist_node anymore SELECT 1 FROM master_remove_node('localhost', :master_port); -SELECT public.wait_until_metadata_sync(30000); +SELECT public.wait_until_metadata_sync(60000); -- -- Make sure that rebalance_table_shards() and replicate_table_shards() replicate @@ -1569,6 +1574,8 @@ select 1 from citus_add_node('localhost', :worker_2_port); select rebalance_table_shards(); DROP TABLE table_with_primary_key, table_without_primary_key; +ALTER SYSTEM RESET citus.rebalancer_by_disk_size_base_cost; +SELECT pg_reload_conf(); \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole;