mirror of https://github.com/citusdata/citus.git
Rebalance unbalanced shards
parent
e2654deeae
commit
e16410d1a6
|
@ -475,6 +475,7 @@ GetRebalanceSteps(RebalanceOptions *options)
|
||||||
/* sort the lists to make the function more deterministic */
|
/* sort the lists to make the function more deterministic */
|
||||||
List *activeWorkerList = SortedActiveWorkers();
|
List *activeWorkerList = SortedActiveWorkers();
|
||||||
List *activeShardPlacementListList = NIL;
|
List *activeShardPlacementListList = NIL;
|
||||||
|
List *unbalancedShards = NIL;
|
||||||
|
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
foreach_oid(relationId, options->relationIdList)
|
foreach_oid(relationId, options->relationIdList)
|
||||||
|
@ -490,8 +491,23 @@ GetRebalanceSteps(RebalanceOptions *options)
|
||||||
shardPlacementList, options->workerNode);
|
shardPlacementList, options->workerNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
activeShardPlacementListList =
|
if (list_length(activeShardPlacementListForRelation) >= list_length(
|
||||||
lappend(activeShardPlacementListList, activeShardPlacementListForRelation);
|
activeWorkerList))
|
||||||
|
{
|
||||||
|
activeShardPlacementListList = lappend(activeShardPlacementListList,
|
||||||
|
activeShardPlacementListForRelation);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
unbalancedShards = list_concat(unbalancedShards,
|
||||||
|
activeShardPlacementListForRelation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (list_length(unbalancedShards) > 0)
|
||||||
|
{
|
||||||
|
activeShardPlacementListList = lappend(activeShardPlacementListList,
|
||||||
|
unbalancedShards);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (options->threshold < options->rebalanceStrategy->minimumThreshold)
|
if (options->threshold < options->rebalanceStrategy->minimumThreshold)
|
||||||
|
|
|
@ -2626,6 +2626,95 @@ RESET citus.shard_count;
|
||||||
DROP VIEW table_placements_per_node;
|
DROP VIEW table_placements_per_node;
|
||||||
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
|
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
|
||||||
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
|
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
|
||||||
|
-- add colocation groups with shard group count < worker count
|
||||||
|
-- the rebalancer should balance those "unbalanced shards" evenly as much as possible
|
||||||
|
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
create table table_with_one_shard_1 (a int primary key);
|
||||||
|
create table table_with_one_shard_2 (a int primary key);
|
||||||
|
create table table_with_one_shard_3 (a int primary key);
|
||||||
|
SET citus.shard_replication_factor = 1;
|
||||||
|
SET citus.shard_count = 1;
|
||||||
|
select create_distributed_table('table_with_one_shard_1','a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select create_distributed_table('table_with_one_shard_2','a',colocate_with=>'table_with_one_shard_1');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select create_distributed_table('table_with_one_shard_3','a',colocate_with=>'table_with_one_shard_2');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
create table table_with_one_shard_4 (a bigint);
|
||||||
|
create table table_with_one_shard_5 (a bigint);
|
||||||
|
select create_distributed_table('table_with_one_shard_4','a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select create_distributed_table('table_with_one_shard_5','a',colocate_with=>'table_with_one_shard_4');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- all shards are placed on the first worker node
|
||||||
|
SELECT sh.logicalrelid, pl.nodeport
|
||||||
|
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
|
||||||
|
WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5')
|
||||||
|
ORDER BY sh.logicalrelid;
|
||||||
|
logicalrelid | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
table_with_one_shard_1 | 57637
|
||||||
|
table_with_one_shard_2 | 57637
|
||||||
|
table_with_one_shard_3 | 57637
|
||||||
|
table_with_one_shard_4 | 57637
|
||||||
|
table_with_one_shard_5 | 57637
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- add the second node back, then rebalance
|
||||||
|
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
|
||||||
|
select 1 from citus_add_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select rebalance_table_shards();
|
||||||
|
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
|
||||||
|
rebalance_table_shards
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify some shards are moved to the new node
|
||||||
|
SELECT sh.logicalrelid, pl.nodeport
|
||||||
|
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
|
||||||
|
WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5')
|
||||||
|
ORDER BY sh.logicalrelid;
|
||||||
|
logicalrelid | nodeport
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
table_with_one_shard_1 | 57638
|
||||||
|
table_with_one_shard_2 | 57638
|
||||||
|
table_with_one_shard_3 | 57638
|
||||||
|
table_with_one_shard_4 | 57637
|
||||||
|
table_with_one_shard_5 | 57637
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
DROP TABLE table_with_one_shard_1, table_with_one_shard_2, table_with_one_shard_3, table_with_one_shard_4, table_with_one_shard_5 CASCADE;
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SET citus.enable_ddl_propagation TO OFF;
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
REVOKE ALL ON SCHEMA public FROM testrole;
|
REVOKE ALL ON SCHEMA public FROM testrole;
|
||||||
|
|
|
@ -1462,6 +1462,42 @@ DROP VIEW table_placements_per_node;
|
||||||
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
|
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
|
||||||
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
|
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
|
||||||
|
|
||||||
|
-- add colocation groups with shard group count < worker count
|
||||||
|
-- the rebalancer should balance those "unbalanced shards" evenly as much as possible
|
||||||
|
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
|
||||||
|
create table table_with_one_shard_1 (a int primary key);
|
||||||
|
create table table_with_one_shard_2 (a int primary key);
|
||||||
|
create table table_with_one_shard_3 (a int primary key);
|
||||||
|
SET citus.shard_replication_factor = 1;
|
||||||
|
SET citus.shard_count = 1;
|
||||||
|
select create_distributed_table('table_with_one_shard_1','a');
|
||||||
|
select create_distributed_table('table_with_one_shard_2','a',colocate_with=>'table_with_one_shard_1');
|
||||||
|
select create_distributed_table('table_with_one_shard_3','a',colocate_with=>'table_with_one_shard_2');
|
||||||
|
|
||||||
|
create table table_with_one_shard_4 (a bigint);
|
||||||
|
create table table_with_one_shard_5 (a bigint);
|
||||||
|
select create_distributed_table('table_with_one_shard_4','a');
|
||||||
|
select create_distributed_table('table_with_one_shard_5','a',colocate_with=>'table_with_one_shard_4');
|
||||||
|
|
||||||
|
-- all shards are placed on the first worker node
|
||||||
|
SELECT sh.logicalrelid, pl.nodeport
|
||||||
|
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
|
||||||
|
WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5')
|
||||||
|
ORDER BY sh.logicalrelid;
|
||||||
|
|
||||||
|
-- add the second node back, then rebalance
|
||||||
|
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
|
||||||
|
select 1 from citus_add_node('localhost', :worker_2_port);
|
||||||
|
select rebalance_table_shards();
|
||||||
|
|
||||||
|
-- verify some shards are moved to the new node
|
||||||
|
SELECT sh.logicalrelid, pl.nodeport
|
||||||
|
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
|
||||||
|
WHERE sh.logicalrelid::text IN ('table_with_one_shard_1', 'table_with_one_shard_2', 'table_with_one_shard_3', 'table_with_one_shard_4', 'table_with_one_shard_5')
|
||||||
|
ORDER BY sh.logicalrelid;
|
||||||
|
|
||||||
|
DROP TABLE table_with_one_shard_1, table_with_one_shard_2, table_with_one_shard_3, table_with_one_shard_4, table_with_one_shard_5 CASCADE;
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SET citus.enable_ddl_propagation TO OFF;
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
REVOKE ALL ON SCHEMA public FROM testrole;
|
REVOKE ALL ON SCHEMA public FROM testrole;
|
||||||
|
|
Loading…
Reference in New Issue