diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index b5ec9b7ba..082e0a8b5 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -515,6 +515,16 @@ GetRebalanceSteps(RebalanceOptions *options)
 
 	/* sort the lists to make the function more deterministic */
 	List *activeWorkerList = SortedActiveWorkers();
+	int shardAllowedNodeCount = 0;
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, activeWorkerList)
+	{
+		if (workerNode->shouldHaveShards)
+		{
+			shardAllowedNodeCount++;
+		}
+	}
+
 	List *activeShardPlacementListList = NIL;
 	List *unbalancedShards = NIL;
 
@@ -532,8 +542,7 @@ GetRebalanceSteps(RebalanceOptions *options)
 				shardPlacementList, options->workerNode);
 		}
 
-		if (list_length(activeShardPlacementListForRelation) >= list_length(
-				activeWorkerList))
+		if (list_length(activeShardPlacementListForRelation) >= shardAllowedNodeCount)
 		{
 			activeShardPlacementListList = lappend(activeShardPlacementListList,
 												   activeShardPlacementListForRelation);
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index 1dea3b442..23f1f7373 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -20,13 +20,14 @@ SELECT create_distributed_table('dist_table_test', 'a');
 CREATE TABLE postgres_table_test(a int primary key);
 -- make sure that all rebalance operations works fine when
 -- reference tables are replicated to the coordinator
+SET client_min_messages TO ERROR;
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
-NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
  ?column?
 ---------------------------------------------------------------------
         1
 (1 row)
 
+RESET client_min_messages;
 -- should just be noops even if we add the coordinator to the pg_dist_node
 SELECT rebalance_table_shards('dist_table_test');
  rebalance_table_shards
@@ -2713,6 +2714,113 @@ SELECT sh.logicalrelid, pl.nodeport
 (5 rows)
 
 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
+-- test the same with coordinator shouldhaveshards = false and shard_count = 2
+-- so that the shard allowed node count would be 2 when rebalancing
+-- for such cases, we only count the nodes that are allowed for shard placements
+UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
+create table two_shard_colocation_1a (a int primary key);
+create table two_shard_colocation_1b (a int primary key);
+SET citus.shard_replication_factor = 1;
+select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+create table two_shard_colocation_2a (a int primary key);
+create table two_shard_colocation_2b (a int primary key);
+select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- move shards of colocation group 1 to worker1
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
+    AND pl.nodeport = :worker_2_port
+  LIMIT 1;
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- move shards of colocation group 2 to worker2
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
+    AND pl.nodeport = :worker_1_port
+  LIMIT 1;
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- current state:
+-- coordinator: []
+-- worker 1: [1_1, 1_2]
+-- worker 2: [2_1, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+ logicalrelid | nodeport
+---------------------------------------------------------------------
+ two_shard_colocation_1a | 57637
+ two_shard_colocation_1a | 57637
+ two_shard_colocation_1b | 57637
+ two_shard_colocation_1b | 57637
+ two_shard_colocation_2a | 57638
+ two_shard_colocation_2a | 57638
+ two_shard_colocation_2b | 57638
+ two_shard_colocation_2b | 57638
+(8 rows)
+
+-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
+-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
+select rebalance_table_shards(shard_transfer_mode:='block_writes');
+NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+-- final state:
+-- coordinator: []
+-- worker 1: [1_1, 2_1]
+-- worker 2: [1_2, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+ logicalrelid | nodeport
+---------------------------------------------------------------------
+ two_shard_colocation_1a | 57637
+ two_shard_colocation_1a | 57638
+ two_shard_colocation_1b | 57637
+ two_shard_colocation_1b | 57638
+ two_shard_colocation_2a | 57637
+ two_shard_colocation_2a | 57638
+ two_shard_colocation_2b | 57637
+ two_shard_colocation_2b | 57638
+(8 rows)
+
+-- cleanup
+DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
 -- verify we detect if one of the tables do not have a replica identity or primary key
 -- and error out in case of shard transfer mode = auto
 SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index da4259f5b..f524212bf 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -13,7 +13,9 @@ CREATE TABLE postgres_table_test(a int primary key);
 
 -- make sure that all rebalance operations works fine when
 -- reference tables are replicated to the coordinator
+SET client_min_messages TO ERROR;
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
+RESET client_min_messages;
 
 -- should just be noops even if we add the coordinator to the pg_dist_node
 SELECT rebalance_table_shards('dist_table_test');
@@ -1497,6 +1499,61 @@ SELECT sh.logicalrelid, pl.nodeport
 
 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
 
+-- test the same with coordinator shouldhaveshards = false and shard_count = 2
+-- so that the shard allowed node count would be 2 when rebalancing
+-- for such cases, we only count the nodes that are allowed for shard placements
+UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
+
+create table two_shard_colocation_1a (a int primary key);
+create table two_shard_colocation_1b (a int primary key);
+SET citus.shard_replication_factor = 1;
+
+select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
+select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
+
+create table two_shard_colocation_2a (a int primary key);
+create table two_shard_colocation_2b (a int primary key);
+select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
+select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
+
+-- move shards of colocation group 1 to worker1
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
+    AND pl.nodeport = :worker_2_port
+  LIMIT 1;
+-- move shards of colocation group 2 to worker2
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
+    AND pl.nodeport = :worker_1_port
+  LIMIT 1;
+
+-- current state:
+-- coordinator: []
+-- worker 1: [1_1, 1_2]
+-- worker 2: [2_1, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+
+-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
+-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
+select rebalance_table_shards(shard_transfer_mode:='block_writes');
+
+-- final state:
+-- coordinator: []
+-- worker 1: [1_1, 2_1]
+-- worker 2: [1_2, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+
+-- cleanup
+DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
+
 -- verify we detect if one of the tables do not have a replica identity or primary key
 -- and error out in case of shard transfer mode = auto
 SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
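
Reviewer note, outside the diff itself: the new loop in GetRebalanceSteps counts only the active workers whose shouldHaveShards flag is set, and each colocation group's placement count is now compared against that number rather than against the full worker list. As a minimal sketch, the same count can be approximated from psql; this assumes the standard pg_dist_node metadata columns (isactive, noderole, shouldhaveshards) and is only an illustration, not the code path the rebalancer executes.

-- illustrative sanity-check query, mirroring the shardAllowedNodeCount loop above
SELECT count(*) AS shard_allowed_node_count
  FROM pg_dist_node
 WHERE isactive
   AND noderole = 'primary'
   AND shouldhaveshards;

In the new regression test this count is 2 (the two workers), so a colocation group with two shard placements still qualifies for per-group rebalancing even though pg_dist_node also contains the coordinator entry.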