From e16410d1a6e0b8491d47f70ee35792af3a1a234a Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Wed, 1 Mar 2023 15:45:55 +0300 Subject: [PATCH] Rebalance unbalanced shards --- .../distributed/operations/shard_rebalancer.c | 20 ++++- .../regress/expected/shard_rebalancer.out | 89 +++++++++++++++++++ src/test/regress/sql/shard_rebalancer.sql | 36 ++++++++ 3 files changed, 143 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index d24936925..d549eced8 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -475,6 +475,7 @@ GetRebalanceSteps(RebalanceOptions *options) /* sort the lists to make the function more deterministic */ List *activeWorkerList = SortedActiveWorkers(); List *activeShardPlacementListList = NIL; + List *unbalancedShards = NIL; Oid relationId = InvalidOid; foreach_oid(relationId, options->relationIdList) @@ -490,8 +491,23 @@ GetRebalanceSteps(RebalanceOptions *options) shardPlacementList, options->workerNode); } - activeShardPlacementListList = - lappend(activeShardPlacementListList, activeShardPlacementListForRelation); + if (list_length(activeShardPlacementListForRelation) >= list_length( + activeWorkerList)) + { + activeShardPlacementListList = lappend(activeShardPlacementListList, + activeShardPlacementListForRelation); + } + else + { + unbalancedShards = list_concat(unbalancedShards, + activeShardPlacementListForRelation); + } + } + + if (list_length(unbalancedShards) > 0) + { + activeShardPlacementListList = lappend(activeShardPlacementListList, + unbalancedShards); } if (options->threshold < options->rebalanceStrategy->minimumThreshold) diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 9eec2cee3..90530e79b 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -2626,6 +2626,95 @@ RESET citus.shard_count; DROP VIEW table_placements_per_node; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2'; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1'; +-- add colocation groups with shard group count < worker count +-- the rebalancer should balance those "unbalanced shards" evenly as much as possible +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +create table table_with_one_shard_1 (a int primary key); +create table table_with_one_shard_2 (a int primary key); +create table table_with_one_shard_3 (a int primary key); +SET citus.shard_replication_factor = 1; +SET citus.shard_count = 1; +select create_distributed_table('table_with_one_shard_1','a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('table_with_one_shard_2','a',colocate_with=>'table_with_one_shard_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('table_with_one_shard_3','a',colocate_with=>'table_with_one_shard_2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +create table table_with_one_shard_4 (a bigint); +create table table_with_one_shard_5 (a bigint); +select create_distributed_table('table_with_one_shard_4','a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('table_with_one_shard_5','a',colocate_with=>'table_with_one_shard_4'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- all shards are placed on the first worker node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5') + ORDER BY sh.logicalrelid; + logicalrelid | nodeport +--------------------------------------------------------------------- + table_with_one_shard_1 | 57637 + table_with_one_shard_2 | 57637 + table_with_one_shard_3 | 57637 + table_with_one_shard_4 | 57637 + table_with_one_shard_5 | 57637 +(5 rows) + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +select rebalance_table_shards(); +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... + rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +-- verify some shards are moved to the new node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5') + ORDER BY sh.logicalrelid; + logicalrelid | nodeport +--------------------------------------------------------------------- + table_with_one_shard_1 | 57638 + table_with_one_shard_2 | 57638 + table_with_one_shard_3 | 57638 + table_with_one_shard_4 | 57637 + table_with_one_shard_5 | 57637 +(5 rows) + +DROP TABLE table_with_one_shard_1, table_with_one_shard_2, table_with_one_shard_3, table_with_one_shard_4, table_with_one_shard_5 CASCADE; \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index 02a6df666..08ef37273 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -1462,6 +1462,42 @@ DROP VIEW table_placements_per_node; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2'; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1'; +-- add colocation groups with shard group count < worker count +-- the rebalancer should balance those "unbalanced shards" evenly as much as possible +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); +create table table_with_one_shard_1 (a int primary key); +create table table_with_one_shard_2 (a int primary key); +create table table_with_one_shard_3 (a int primary key); +SET citus.shard_replication_factor = 1; +SET citus.shard_count = 1; +select create_distributed_table('table_with_one_shard_1','a'); +select create_distributed_table('table_with_one_shard_2','a',colocate_with=>'table_with_one_shard_1'); +select create_distributed_table('table_with_one_shard_3','a',colocate_with=>'table_with_one_shard_2'); + +create table table_with_one_shard_4 (a bigint); +create table table_with_one_shard_5 (a bigint); +select create_distributed_table('table_with_one_shard_4','a'); +select create_distributed_table('table_with_one_shard_5','a',colocate_with=>'table_with_one_shard_4'); + +-- all shards are placed on the first worker node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5') + ORDER BY sh.logicalrelid; + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); +select rebalance_table_shards(); + +-- verify some shards are moved to the new node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5') + ORDER BY sh.logicalrelid; + +DROP TABLE table_with_one_shard_1, table_with_one_shard_2, table_with_one_shard_3, table_with_one_shard_4, table_with_one_shard_5 CASCADE; + \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole;