Rebalance shard groups with placement count less than worker count (#6739)

DESCRIPTION: Adds logic to distribute unbalanced shards

If the number of shard placements (for a colocation group) is less than
the number of workers, some of the workers will remain empty. With this
PR, we treat such shard groups as a single colocation group so that they
are distributed as evenly as possible across the cluster.

Example:
```sql
create table t1 (a int primary key);
create table t2 (a int primary key);
create table t3 (a int primary key);
set citus.shard_count = 1;
select create_distributed_table('t1','a');
select create_distributed_table('t2','a',colocate_with=>'t1');
select create_distributed_table('t3','a',colocate_with=>'t2');

create table tb1 (a bigint);
create table tb2 (a bigint);
select create_distributed_table('tb1','a');
select create_distributed_table('tb2','a',colocate_with=>'tb1');

select citus_add_node('localhost',9702);
select rebalance_table_shards();
```

Here we have two colocation groups, each with a single shard group, and
both shard groups are placed on the first worker node. When we add a new
worker node and rebalance, the rebalance planner considers the cluster
already balanced and does nothing. With this PR, the rebalancer
distributes these shard groups across the cluster as evenly as possible;
in this example it moves one of the shard groups to the second worker
node, which can be verified with the query sketched below.
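
As a rough way to check the result (a minimal sketch adapted from the regression
test added in this PR, assuming the example tables and the worker added on port
9702 above), the placements per node can be listed from the Citus catalog:

```sql
-- List which worker holds each shard of the example tables.
-- After rebalancing, one of the two colocation groups is expected
-- to show up on the newly added worker (port 9702 above).
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh
JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('t1', 't2', 't3', 'tb1', 'tb2')
ORDER BY sh.logicalrelid;
```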

fixes: #6715
Ahmet Gedemenli 2023-03-06 14:14:27 +03:00 committed by GitHub
parent ed7cc8f460
commit 03f1bb70b7
6 changed files with 293 additions and 2 deletions


@@ -475,6 +475,7 @@ GetRebalanceSteps(RebalanceOptions *options)
    /* sort the lists to make the function more deterministic */
    List *activeWorkerList = SortedActiveWorkers();
    List *activeShardPlacementListList = NIL;
    List *unbalancedShards = NIL;

    Oid relationId = InvalidOid;
    foreach_oid(relationId, options->relationIdList)
@@ -490,8 +491,29 @@ GetRebalanceSteps(RebalanceOptions *options)
                shardPlacementList, options->workerNode);
        }

        if (list_length(activeShardPlacementListForRelation) >= list_length(
                activeWorkerList))
        {
            activeShardPlacementListList = lappend(activeShardPlacementListList,
                                                   activeShardPlacementListForRelation);
        }
        else
        {
            /*
             * If the number of shard groups are less than the number of worker nodes,
             * at least one of the worker nodes will remain empty. For such cases,
             * we consider those shard groups as a colocation group and try to
             * distribute them across the cluster.
             */
            unbalancedShards = list_concat(unbalancedShards,
                                           activeShardPlacementListForRelation);
        }
    }

    if (list_length(unbalancedShards) > 0)
    {
        activeShardPlacementListList = lappend(activeShardPlacementListList,
                                               unbalancedShards);
    }

    if (options->threshold < options->rebalanceStrategy->minimumThreshold)


@@ -147,6 +147,26 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS)
    shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
    shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);

    List *unbalancedShards = NIL;
    ListCell *shardPlacementListCell = NULL;
    foreach(shardPlacementListCell, shardPlacementListList)
    {
        List *placementList = (List *) lfirst(shardPlacementListCell);

        if (list_length(placementList) < list_length(workerNodeList))
        {
            unbalancedShards = list_concat(unbalancedShards,
                                           placementList);
            shardPlacementListList = foreach_delete_current(shardPlacementListList,
                                                            shardPlacementListCell);
        }
    }

    if (list_length(unbalancedShards) > 0)
    {
        shardPlacementListList = lappend(shardPlacementListList, unbalancedShards);
    }

    rebalancePlanFunctions.context = &context;

    /* sort the lists to make the function more deterministic */


@@ -2626,6 +2626,94 @@ RESET citus.shard_count;
DROP VIEW table_placements_per_node;
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
-- add colocation groups with shard group count < worker count
-- the rebalancer should balance those "unbalanced shards" evenly as much as possible
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
create table single_shard_colocation_1a (a int primary key);
create table single_shard_colocation_1b (a int primary key);
create table single_shard_colocation_1c (a int primary key);
SET citus.shard_replication_factor = 1;
select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1);
 create_distributed_table
---------------------------------------------------------------------

(1 row)
select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)
select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b');
 create_distributed_table
---------------------------------------------------------------------

(1 row)
create table single_shard_colocation_2a (a bigint);
create table single_shard_colocation_2b (a bigint);
select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1);
 create_distributed_table
---------------------------------------------------------------------

(1 row)
select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)
-- all shards are placed on the first worker node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
logicalrelid | nodeport
---------------------------------------------------------------------
single_shard_colocation_1a | 57637
single_shard_colocation_1b | 57637
single_shard_colocation_1c | 57637
single_shard_colocation_2a | 57637
single_shard_colocation_2b | 57637
(5 rows)
-- add the second node back, then rebalance
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
select 1 from citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
select rebalance_table_shards();
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
 rebalance_table_shards
---------------------------------------------------------------------

(1 row)
-- verify some shards are moved to the new node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
logicalrelid | nodeport
---------------------------------------------------------------------
single_shard_colocation_1a | 57638
single_shard_colocation_1b | 57638
single_shard_colocation_1c | 57638
single_shard_colocation_2a | 57637
single_shard_colocation_2b | 57637
(5 rows)
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;


@@ -742,3 +742,75 @@ HINT: If you do want these moves to happen, try changing improvement_threshold
{"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
(2 rows)
-- Test single shard colocation groups
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}'
]::json[],
improvement_threshold := 0.1
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
(2 rows)
-- Test colocation groups with shard count < worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}'
]::json[],
improvement_threshold := 0.1
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
{"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
(3 rows)
-- Test colocation groups with shard count < worker count
-- mixed with a colocation group shard_count > worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}',
'{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}',
'{"shardid":8, "cost":50, "nodename":"b"}',
'{"shardid":9, "cost":50, "nodename":"b"}',
'{"shardid":10, "cost":50, "nodename":"b"}',
'{"shardid":11, "cost":50, "nodename":"b"}',
'{"shardid":12, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.1
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":7,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
{"updatetype":1,"shardid":8,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":9,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
{"updatetype":1,"shardid":10,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
{"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
(7 rows)


@@ -1462,6 +1462,41 @@ DROP VIEW table_placements_per_node;
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
-- add colocation groups with shard group count < worker count
-- the rebalancer should balance those "unbalanced shards" evenly as much as possible
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
create table single_shard_colocation_1a (a int primary key);
create table single_shard_colocation_1b (a int primary key);
create table single_shard_colocation_1c (a int primary key);
SET citus.shard_replication_factor = 1;
select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1);
select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a');
select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b');
create table single_shard_colocation_2a (a bigint);
create table single_shard_colocation_2b (a bigint);
select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1);
select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a');
-- all shards are placed on the first worker node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
-- add the second node back, then rebalance
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
select 1 from citus_add_node('localhost', :worker_2_port);
select rebalance_table_shards();
-- verify some shards are moved to the new node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;


@@ -530,3 +530,57 @@ SELECT unnest(shard_placement_rebalance_array(
    ]::json[],
    improvement_threshold := 0.6
));
-- Test single shard colocation groups
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}'
]::json[],
improvement_threshold := 0.1
));
-- Test colocation groups with shard count < worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}'
]::json[],
improvement_threshold := 0.1
));
-- Test colocation groups with shard count < worker count
-- mixed with a colocation group shard_count > worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}',
'{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}',
'{"shardid":8, "cost":50, "nodename":"b"}',
'{"shardid":9, "cost":50, "nodename":"b"}',
'{"shardid":10, "cost":50, "nodename":"b"}',
'{"shardid":11, "cost":50, "nodename":"b"}',
'{"shardid":12, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.1
));