diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index d24936925..baed8b0d5 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -475,6 +475,7 @@ GetRebalanceSteps(RebalanceOptions *options) /* sort the lists to make the function more deterministic */ List *activeWorkerList = SortedActiveWorkers(); List *activeShardPlacementListList = NIL; + List *unbalancedShards = NIL; Oid relationId = InvalidOid; foreach_oid(relationId, options->relationIdList) @@ -490,8 +491,29 @@ GetRebalanceSteps(RebalanceOptions *options) shardPlacementList, options->workerNode); } - activeShardPlacementListList = - lappend(activeShardPlacementListList, activeShardPlacementListForRelation); + if (list_length(activeShardPlacementListForRelation) >= list_length( + activeWorkerList)) + { + activeShardPlacementListList = lappend(activeShardPlacementListList, + activeShardPlacementListForRelation); + } + else + { + /* + * If the number of shard groups are less than the number of worker nodes, + * at least one of the worker nodes will remain empty. For such cases, + * we consider those shard groups as a colocation group and try to + * distribute them across the cluster. + */ + unbalancedShards = list_concat(unbalancedShards, + activeShardPlacementListForRelation); + } + } + + if (list_length(unbalancedShards) > 0) + { + activeShardPlacementListList = lappend(activeShardPlacementListList, + unbalancedShards); } if (options->threshold < options->rebalanceStrategy->minimumThreshold) diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index 60603f091..56a063982 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -147,6 +147,26 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS) shardPlacementList = SortList(shardPlacementList, CompareShardPlacements); shardPlacementListList = lappend(shardPlacementListList, shardPlacementList); + List *unbalancedShards = NIL; + ListCell *shardPlacementListCell = NULL; + foreach(shardPlacementListCell, shardPlacementListList) + { + List *placementList = (List *) lfirst(shardPlacementListCell); + + if (list_length(placementList) < list_length(workerNodeList)) + { + unbalancedShards = list_concat(unbalancedShards, + placementList); + shardPlacementListList = foreach_delete_current(shardPlacementListList, + shardPlacementListCell); + } + } + + if (list_length(unbalancedShards) > 0) + { + shardPlacementListList = lappend(shardPlacementListList, unbalancedShards); + } + rebalancePlanFunctions.context = &context; /* sort the lists to make the function more deterministic */ diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 9eec2cee3..2146d67f1 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -2626,6 +2626,94 @@ RESET citus.shard_count; DROP VIEW table_placements_per_node; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2'; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1'; +-- add colocation groups with shard group count < worker count +-- the rebalancer should balance those "unbalanced shards" evenly as much as possible +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +create table single_shard_colocation_1a (a int primary key); +create table single_shard_colocation_1b (a int primary key); +create table single_shard_colocation_1c (a int primary key); +SET citus.shard_replication_factor = 1; +select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +create table single_shard_colocation_2a (a bigint); +create table single_shard_colocation_2b (a bigint); +select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- all shards are placed on the first worker node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY sh.logicalrelid; + logicalrelid | nodeport +--------------------------------------------------------------------- + single_shard_colocation_1a | 57637 + single_shard_colocation_1b | 57637 + single_shard_colocation_1c | 57637 + single_shard_colocation_2a | 57637 + single_shard_colocation_2b | 57637 +(5 rows) + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +select rebalance_table_shards(); +NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
+ rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +-- verify some shards are moved to the new node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY sh.logicalrelid; + logicalrelid | nodeport +--------------------------------------------------------------------- + single_shard_colocation_1a | 57638 + single_shard_colocation_1b | 57638 + single_shard_colocation_1c | 57638 + single_shard_colocation_2a | 57637 + single_shard_colocation_2b | 57637 +(5 rows) + +DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE; \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/expected/shard_rebalancer_unit.out b/src/test/regress/expected/shard_rebalancer_unit.out index 156edfc6b..9ebd6f942 100644 --- a/src/test/regress/expected/shard_rebalancer_unit.out +++ b/src/test/regress/expected/shard_rebalancer_unit.out @@ -742,3 +742,75 @@ HINT: If you do want these moves to happen, try changing improvement_threshold {"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} (2 rows) +-- Test single shard colocation groups +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}' + ]::json[], + improvement_threshold := 0.1 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} +(2 rows) + +-- Test colocation groups with shard count < worker count +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}' + ]::json[], + improvement_threshold := 0.1 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} + {"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} +(3 rows) + +-- Test colocation groups with shard count < worker count +-- mixed with a colocation group shard_count > worker count +SELECT 
unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}', + '{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}', + '{"shardid":8, "cost":50, "nodename":"b"}', + '{"shardid":9, "cost":50, "nodename":"b"}', + '{"shardid":10, "cost":50, "nodename":"b"}', + '{"shardid":11, "cost":50, "nodename":"b"}', + '{"shardid":12, "cost":50, "nodename":"b"}' + ]::json[], + improvement_threshold := 0.1 +)); + unnest +--------------------------------------------------------------------- + {"updatetype":1,"shardid":7,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432} + {"updatetype":1,"shardid":8,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":9,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432} + {"updatetype":1,"shardid":10,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432} + {"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} + {"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432} +(7 rows) + diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index 02a6df666..dbbc94732 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -1462,6 +1462,41 @@ DROP VIEW table_placements_per_node; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2'; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1'; +-- add colocation groups with shard group count < worker count +-- the rebalancer should balance those "unbalanced shards" evenly as much as possible +SELECT 1 FROM citus_remove_node('localhost', :worker_2_port); +create table single_shard_colocation_1a (a int primary key); +create table single_shard_colocation_1b (a int primary key); +create table single_shard_colocation_1c (a int primary key); +SET citus.shard_replication_factor = 1; +select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1); +select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a'); +select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b'); + +create table single_shard_colocation_2a (a bigint); +create table single_shard_colocation_2b (a bigint); +select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1); +select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a'); + +-- all shards are placed on the first worker node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY 
sh.logicalrelid; + +-- add the second node back, then rebalance +ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16; +select 1 from citus_add_node('localhost', :worker_2_port); +select rebalance_table_shards(); + +-- verify some shards are moved to the new node +SELECT sh.logicalrelid, pl.nodeport + FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid + WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b') + ORDER BY sh.logicalrelid; + +DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE; + \c - - - :worker_1_port SET citus.enable_ddl_propagation TO OFF; REVOKE ALL ON SCHEMA public FROM testrole; diff --git a/src/test/regress/sql/shard_rebalancer_unit.sql b/src/test/regress/sql/shard_rebalancer_unit.sql index 51293a227..607be4710 100644 --- a/src/test/regress/sql/shard_rebalancer_unit.sql +++ b/src/test/regress/sql/shard_rebalancer_unit.sql @@ -530,3 +530,57 @@ SELECT unnest(shard_placement_rebalance_array( ]::json[], improvement_threshold := 0.6 )); + + +-- Test single shard colocation groups +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}' + ]::json[], + improvement_threshold := 0.1 +)); + + +-- Test colocation groups with shard count < worker count +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}' + ]::json[], + improvement_threshold := 0.1 +)); + + +-- Test colocation groups with shard count < worker count +-- mixed with a colocation group shard_count > worker count +SELECT unnest(shard_placement_rebalance_array( + ARRAY['{"node_name": "a"}', + '{"node_name": "b"}', + '{"node_name": "c"}']::json[], + ARRAY['{"shardid":1, "cost":20, "nodename":"a"}', + '{"shardid":2, "cost":10, "nodename":"a"}', + '{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}', + '{"shardid":4, "cost":100, "nodename":"a"}', + '{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}', + '{"shardid":6, "cost":50, "nodename":"a"}', + '{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}', + '{"shardid":8, "cost":50, "nodename":"b"}', + '{"shardid":9, "cost":50, "nodename":"b"}', + '{"shardid":10, "cost":50, "nodename":"b"}', + '{"shardid":11, "cost":50, "nodename":"b"}', + '{"shardid":12, "cost":50, "nodename":"b"}' + ]::json[], + improvement_threshold := 0.1 +));
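
Reviewer note (not part of the patch): the heart of the GetRebalanceSteps() change above is to pool every colocation group that has fewer shard groups than there are active workers into one artificial colocation group, and to append that pool to activeShardPlacementListList so the existing per-colocation-group balancing code can spread those small (often single-shard) tables across the cluster; shard_placement_rebalance_array() applies the same pooling so the unit tests can exercise it. The standalone C sketch below only illustrates that pooling decision under simplified assumptions: it uses plain arrays and hypothetical names (ColocationGroupStats, PartitionGroups) rather than the PostgreSQL List API, and it is not code from the patch.

#include <stdio.h>

/*
 * Simplified illustration (hypothetical types and names, not Citus code):
 * each colocation group is reduced to the number of shard groups it
 * contains. Groups with at least workerCount shard groups are balanced
 * on their own, exactly as before the patch; all smaller groups are
 * pooled into one combined "unbalanced" group that the balancer then
 * spreads across the workers.
 */
typedef struct ColocationGroupStats
{
	int groupId;
	int shardGroupCount;
} ColocationGroupStats;

static void
PartitionGroups(ColocationGroupStats *groups, int groupCount, int workerCount)
{
	int pooledShardGroups = 0;

	for (int i = 0; i < groupCount; i++)
	{
		if (groups[i].shardGroupCount >= workerCount)
		{
			/* enough shard groups to occupy every worker: balance it alone */
			printf("group %d: balance on its own (%d shard groups)\n",
				   groups[i].groupId, groups[i].shardGroupCount);
		}
		else
		{
			/* too small to occupy every worker: add it to the pool */
			pooledShardGroups += groups[i].shardGroupCount;
		}
	}

	if (pooledShardGroups > 0)
	{
		/* the pool is treated as if it were one more colocation group */
		printf("pooled group: balance %d shard groups together\n",
			   pooledShardGroups);
	}
}

int
main(void)
{
	/* two single-shard colocation groups and one 32-shard group, 3 workers */
	ColocationGroupStats groups[] = { { 1, 1 }, { 2, 1 }, { 3, 32 } };

	PartitionGroups(groups, 3, 3);
	return 0;
}

Compiled with any C compiler (for example cc sketch.c), this prints which groups are balanced independently and how many shard groups land in the pooled group, mirroring the list_length() comparison and list_concat() calls in the patch. The design point is that no new balancing logic is needed: once the small groups are concatenated into one list and appended like a regular colocation group, the existing per-group algorithm distributes them, which is what the single_shard_colocation_* regression test and the new shard_placement_rebalance_array() unit tests verify.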