mirror of https://github.com/citusdata/citus.git
Ignore nodes not allowed for shards, when planning rebalance steps (#6887)
Colocation groups whose shard group count is less than the worker node count are handled with a method different from the usual rebalancer (see #6739). When deciding whether to use that method, we should have ignored nodes that are marked `shouldhaveshards = false`. This PR excludes those nodes from the decision.

Adds a test for the following scenario:

coordinator: []
worker 1: [1_1, 1_2]
worker 2: [2_1, 2_2]

(rebalance)

coordinator: []
worker 1: [1_1, 2_1]
worker 2: [1_2, 2_2]

If we took the coordinator into account, the rebalancer would consider the first state balanced and do nothing (because shard_count < worker_count). With this PR, the coordinator is ignored because it has shouldhaveshards = false, so the rebalancer distributes each colocation group across both workers.

Also fixes an unrelated flaky test in the same file.
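For readers skimming the commit, the decision in isolation: a colocation group's placement count should be compared against the number of nodes that may actually hold shards, not against every active node. The sketch below is a minimal standalone illustration, not Citus code; WorkerNodeSketch, ShardAllowedNodeCount, and UseRegularPerGroupRebalance are made-up stand-ins for the real WorkerNode list, foreach_ptr loop, and GetRebalanceSteps check shown in the diff.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* simplified stand-in for a node entry; only the field used here */
typedef struct WorkerNodeSketch
{
	bool shouldHaveShards;
} WorkerNodeSketch;

/*
 * Count only the nodes allowed to hold shards; nodes added with
 * shouldhaveshards = false (e.g. a coordinator that only holds
 * reference tables) must not inflate the threshold.
 */
static int
ShardAllowedNodeCount(const WorkerNodeSketch *nodes, size_t nodeCount)
{
	int shardAllowedNodeCount = 0;
	for (size_t i = 0; i < nodeCount; i++)
	{
		if (nodes[i].shouldHaveShards)
		{
			shardAllowedNodeCount++;
		}
	}
	return shardAllowedNodeCount;
}

/*
 * A colocation group is "big enough" for the regular per-group rebalance
 * only when it has at least one placement per shard-allowed node; smaller
 * groups fall through to the combined handling introduced in #6739.
 */
static bool
UseRegularPerGroupRebalance(int activePlacementCount,
							const WorkerNodeSketch *nodes, size_t nodeCount)
{
	return activePlacementCount >= ShardAllowedNodeCount(nodes, nodeCount);
}

int
main(void)
{
	/* coordinator (no shards) plus two workers, as in the new regression test */
	WorkerNodeSketch nodes[] = { { false }, { true }, { true } };
	int placementsInGroup = 2; /* shard_count => 2, replication factor 1 */

	/* before the fix, the comparison used all 3 nodes and this returned "no" */
	printf("regular per-group rebalance: %s\n",
		   UseRegularPerGroupRebalance(placementsInGroup, nodes, 3) ? "yes" : "no");
	return 0;
}

With the coordinator registered but marked shouldhaveshards = false, a two-shard group now clears the threshold (2 >= 2) and goes through the regular per-group rebalance, which is the scenario exercised by the new regression test below.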
parent 8cb69cfd13
commit 59ccf364df
@@ -515,6 +515,16 @@ GetRebalanceSteps(RebalanceOptions *options)
 	/* sort the lists to make the function more deterministic */
 	List *activeWorkerList = SortedActiveWorkers();
+	int shardAllowedNodeCount = 0;
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, activeWorkerList)
+	{
+		if (workerNode->shouldHaveShards)
+		{
+			shardAllowedNodeCount++;
+		}
+	}
+
 	List *activeShardPlacementListList = NIL;
 	List *unbalancedShards = NIL;

@@ -532,8 +542,7 @@ GetRebalanceSteps(RebalanceOptions *options)
 					shardPlacementList, options->workerNode);
 		}

-		if (list_length(activeShardPlacementListForRelation) >= list_length(
-				activeWorkerList))
+		if (list_length(activeShardPlacementListForRelation) >= shardAllowedNodeCount)
 		{
 			activeShardPlacementListList = lappend(activeShardPlacementListList,
 												   activeShardPlacementListForRelation);
@@ -20,13 +20,14 @@ SELECT create_distributed_table('dist_table_test', 'a');
 CREATE TABLE postgres_table_test(a int primary key);
 -- make sure that all rebalance operations works fine when
 -- reference tables are replicated to the coordinator
+SET client_min_messages TO ERROR;
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
-NOTICE:  localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
  ?column?
 ---------------------------------------------------------------------
         1
 (1 row)

+RESET client_min_messages;
 -- should just be noops even if we add the coordinator to the pg_dist_node
 SELECT rebalance_table_shards('dist_table_test');
  rebalance_table_shards
@@ -2713,6 +2714,113 @@ SELECT sh.logicalrelid, pl.nodeport
 (5 rows)

 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
+-- test the same with coordinator shouldhaveshards = false and shard_count = 2
+-- so that the shard allowed node count would be 2 when rebalancing
+-- for such cases, we only count the nodes that are allowed for shard placements
+UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
+create table two_shard_colocation_1a (a int primary key);
+create table two_shard_colocation_1b (a int primary key);
+SET citus.shard_replication_factor = 1;
+select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+create table two_shard_colocation_2a (a int primary key);
+create table two_shard_colocation_2b (a int primary key);
+select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- move shards of colocation group 1 to worker1
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
+    AND pl.nodeport = :worker_2_port
+  LIMIT 1;
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- move shards of colocation group 2 to worker2
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
+    AND pl.nodeport = :worker_1_port
+  LIMIT 1;
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- current state:
+-- coordinator: []
+-- worker 1: [1_1, 1_2]
+-- worker 2: [2_1, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+      logicalrelid       | nodeport
+---------------------------------------------------------------------
+ two_shard_colocation_1a |    57637
+ two_shard_colocation_1a |    57637
+ two_shard_colocation_1b |    57637
+ two_shard_colocation_1b |    57637
+ two_shard_colocation_2a |    57638
+ two_shard_colocation_2a |    57638
+ two_shard_colocation_2b |    57638
+ two_shard_colocation_2b |    57638
+(8 rows)
+
+-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
+-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
+select rebalance_table_shards(shard_transfer_mode:='block_writes');
+NOTICE:  Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+NOTICE:  Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+-- final state:
+-- coordinator: []
+-- worker 1: [1_1, 2_1]
+-- worker 2: [1_2, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+      logicalrelid       | nodeport
+---------------------------------------------------------------------
+ two_shard_colocation_1a |    57637
+ two_shard_colocation_1a |    57638
+ two_shard_colocation_1b |    57637
+ two_shard_colocation_1b |    57638
+ two_shard_colocation_2a |    57637
+ two_shard_colocation_2a |    57638
+ two_shard_colocation_2b |    57637
+ two_shard_colocation_2b |    57638
+(8 rows)
+
+-- cleanup
+DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
 -- verify we detect if one of the tables do not have a replica identity or primary key
 -- and error out in case of shard transfer mode = auto
 SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
@@ -13,7 +13,9 @@ CREATE TABLE postgres_table_test(a int primary key);

 -- make sure that all rebalance operations works fine when
 -- reference tables are replicated to the coordinator
+SET client_min_messages TO ERROR;
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
+RESET client_min_messages;

 -- should just be noops even if we add the coordinator to the pg_dist_node
 SELECT rebalance_table_shards('dist_table_test');
@@ -1497,6 +1499,61 @@ SELECT sh.logicalrelid, pl.nodeport

 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;

+-- test the same with coordinator shouldhaveshards = false and shard_count = 2
+-- so that the shard allowed node count would be 2 when rebalancing
+-- for such cases, we only count the nodes that are allowed for shard placements
+UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
+
+create table two_shard_colocation_1a (a int primary key);
+create table two_shard_colocation_1b (a int primary key);
+SET citus.shard_replication_factor = 1;
+
+select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
+select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
+
+create table two_shard_colocation_2a (a int primary key);
+create table two_shard_colocation_2b (a int primary key);
+select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
+select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
+
+-- move shards of colocation group 1 to worker1
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
+    AND pl.nodeport = :worker_2_port
+  LIMIT 1;
+-- move shards of colocation group 2 to worker2
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
+    AND pl.nodeport = :worker_1_port
+  LIMIT 1;
+
+-- current state:
+-- coordinator: []
+-- worker 1: [1_1, 1_2]
+-- worker 2: [2_1, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+
+-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
+-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
+select rebalance_table_shards(shard_transfer_mode:='block_writes');
+
+-- final state:
+-- coordinator: []
+-- worker 1: [1_1, 2_1]
+-- worker 2: [1_2, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+
+-- cleanup
+DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
+
 -- verify we detect if one of the tables do not have a replica identity or primary key
 -- and error out in case of shard transfer mode = auto
 SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);