From 59ccf364dfd820c5645c112bcb5828bd7df1e541 Mon Sep 17 00:00:00 2001
From: Ahmet Gedemenli
Date: Mon, 1 May 2023 13:21:08 +0300
Subject: [PATCH] Ignore nodes not allowed for shards, when planning rebalance steps (#6887)

We handle colocation groups whose shard group count is less than the
worker node count with a method different from the usual rebalancer
(see #6739). When deciding whether to use that method, we should have
ignored the nodes that are marked `shouldhaveshards = false`. This PR
excludes those nodes from the decision.

Adds a test for the following scenario:

  coordinator: []
  worker 1: [1_1, 1_2]
  worker 2: [2_1, 2_2]

  (rebalance)

  coordinator: []
  worker 1: [1_1, 2_1]
  worker 2: [1_2, 2_2]

If the coordinator is taken into account, the rebalancer considers the
first state balanced and does nothing (because shard_count < worker_count).
With this PR, the coordinator is ignored because its shouldhaveshards is
false, so the rebalancer distributes each colocation group across both
workers.

Also fixes an unrelated flaky test in the same file.
---
 .../distributed/operations/shard_rebalancer.c |  13 ++-
 .../regress/expected/shard_rebalancer.out     | 110 +++++++++++++++++-
 src/test/regress/sql/shard_rebalancer.sql     |  57 +++++++++
 3 files changed, 177 insertions(+), 3 deletions(-)

diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index b5ec9b7ba..082e0a8b5 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -515,6 +515,16 @@ GetRebalanceSteps(RebalanceOptions *options)
     /* sort the lists to make the function more deterministic */
     List *activeWorkerList = SortedActiveWorkers();
 
+    int shardAllowedNodeCount = 0;
+    WorkerNode *workerNode = NULL;
+    foreach_ptr(workerNode, activeWorkerList)
+    {
+        if (workerNode->shouldHaveShards)
+        {
+            shardAllowedNodeCount++;
+        }
+    }
+
     List *activeShardPlacementListList = NIL;
     List *unbalancedShards = NIL;
 
@@ -532,8 +542,7 @@ GetRebalanceSteps(RebalanceOptions *options)
                 shardPlacementList, options->workerNode);
         }
 
-        if (list_length(activeShardPlacementListForRelation) >= list_length(
-            activeWorkerList))
+        if (list_length(activeShardPlacementListForRelation) >= shardAllowedNodeCount)
         {
             activeShardPlacementListList = lappend(activeShardPlacementListList,
                                                    activeShardPlacementListForRelation);
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index 1dea3b442..23f1f7373 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -20,13 +20,14 @@ SELECT create_distributed_table('dist_table_test', 'a');
 CREATE TABLE postgres_table_test(a int primary key);
 -- make sure that all rebalance operations works fine when
 -- reference tables are replicated to the coordinator
+SET client_min_messages TO ERROR;
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
-NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
  ?column?
 ---------------------------------------------------------------------
         1
 (1 row)
 
+RESET client_min_messages;
 -- should just be noops even if we add the coordinator to the pg_dist_node
 SELECT rebalance_table_shards('dist_table_test');
  rebalance_table_shards
@@ -2713,6 +2714,113 @@ SELECT sh.logicalrelid, pl.nodeport
 (5 rows)
 
 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
+-- test the same with coordinator shouldhaveshards = false and shard_count = 2
+-- so that the shard allowed node count would be 2 when rebalancing
+-- for such cases, we only count the nodes that are allowed for shard placements
+UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
+create table two_shard_colocation_1a (a int primary key);
+create table two_shard_colocation_1b (a int primary key);
+SET citus.shard_replication_factor = 1;
+select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+create table two_shard_colocation_2a (a int primary key);
+create table two_shard_colocation_2b (a int primary key);
+select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- move shards of colocation group 1 to worker1
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
+    AND pl.nodeport = :worker_2_port
+  LIMIT 1;
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- move shards of colocation group 2 to worker2
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
+    AND pl.nodeport = :worker_1_port
+  LIMIT 1;
+ citus_move_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+-- current state:
+-- coordinator: []
+-- worker 1: [1_1, 1_2]
+-- worker 2: [2_1, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+ logicalrelid | nodeport
+---------------------------------------------------------------------
+ two_shard_colocation_1a | 57637
+ two_shard_colocation_1a | 57637
+ two_shard_colocation_1b | 57637
+ two_shard_colocation_1b | 57637
+ two_shard_colocation_2a | 57638
+ two_shard_colocation_2a | 57638
+ two_shard_colocation_2b | 57638
+ two_shard_colocation_2b | 57638
+(8 rows)
+
+-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
+-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
+select rebalance_table_shards(shard_transfer_mode:='block_writes');
+NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+-- final state:
+-- coordinator: []
+-- worker 1: [1_1, 2_1]
+-- worker 2: [1_2, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+ logicalrelid | nodeport
+---------------------------------------------------------------------
+ two_shard_colocation_1a | 57637
+ two_shard_colocation_1a | 57638
+ two_shard_colocation_1b | 57637
+ two_shard_colocation_1b | 57638
+ two_shard_colocation_2a | 57637
+ two_shard_colocation_2a | 57638
+ two_shard_colocation_2b | 57637
+ two_shard_colocation_2b | 57638
+(8 rows)
+
+-- cleanup
+DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
 -- verify we detect if one of the tables do not have a replica identity or primary key
 -- and error out in case of shard transfer mode = auto
 SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index da4259f5b..f524212bf 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -13,7 +13,9 @@ CREATE TABLE postgres_table_test(a int primary key);
 
 -- make sure that all rebalance operations works fine when
 -- reference tables are replicated to the coordinator
+SET client_min_messages TO ERROR;
 SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
+RESET client_min_messages;
 
 -- should just be noops even if we add the coordinator to the pg_dist_node
 SELECT rebalance_table_shards('dist_table_test');
@@ -1497,6 +1499,61 @@ SELECT sh.logicalrelid, pl.nodeport
 
 DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
 
+-- test the same with coordinator shouldhaveshards = false and shard_count = 2
+-- so that the shard allowed node count would be 2 when rebalancing
+-- for such cases, we only count the nodes that are allowed for shard placements
+UPDATE pg_dist_node SET shouldhaveshards=false WHERE nodeport = :master_port;
+
+create table two_shard_colocation_1a (a int primary key);
+create table two_shard_colocation_1b (a int primary key);
+SET citus.shard_replication_factor = 1;
+
+select create_distributed_table('two_shard_colocation_1a','a', colocate_with => 'none', shard_count => 2);
+select create_distributed_table('two_shard_colocation_1b','a',colocate_with=>'two_shard_colocation_1a');
+
+create table two_shard_colocation_2a (a int primary key);
+create table two_shard_colocation_2b (a int primary key);
+select create_distributed_table('two_shard_colocation_2a','a', colocate_with => 'none', shard_count => 2);
+select create_distributed_table('two_shard_colocation_2b','a',colocate_with=>'two_shard_colocation_2a');
+
+-- move shards of colocation group 1 to worker1
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_1a'::regclass
+    AND pl.nodeport = :worker_2_port
+  LIMIT 1;
+-- move shards of colocation group 2 to worker2
+SELECT citus_move_shard_placement(sh.shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port)
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid = 'two_shard_colocation_2a'::regclass
+    AND pl.nodeport = :worker_1_port
+  LIMIT 1;
+
+-- current state:
+-- coordinator: []
+-- worker 1: [1_1, 1_2]
+-- worker 2: [2_1, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+
+-- If we take the coordinator into account, the rebalancer considers this as balanced and does nothing (shard_count < worker_count)
+-- but because the coordinator is not allowed for shards, rebalancer will distribute each colocation group to both workers
+select rebalance_table_shards(shard_transfer_mode:='block_writes');
+
+-- final state:
+-- coordinator: []
+-- worker 1: [1_1, 2_1]
+-- worker 2: [1_2, 2_2]
+SELECT sh.logicalrelid, pl.nodeport
+  FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
+  WHERE sh.logicalrelid::text IN ('two_shard_colocation_1a', 'two_shard_colocation_1b', 'two_shard_colocation_2a', 'two_shard_colocation_2b')
+  ORDER BY sh.logicalrelid, pl.nodeport;
+
+-- cleanup
+DROP TABLE two_shard_colocation_1a, two_shard_colocation_1b, two_shard_colocation_2a, two_shard_colocation_2b CASCADE;
+
 -- verify we detect if one of the tables do not have a replica identity or primary key
 -- and error out in case of shard transfer mode = auto
 SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
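
For readers who want the decision in isolation, the standalone sketch below restates the comparison this patch changes. It is plain illustrative C, not Citus source: SketchWorkerNode, ShardAllowedNodeCount, and the hard-coded three-node cluster are assumptions made for the example; in Citus the equivalent counting happens over activeWorkerList inside GetRebalanceSteps, as shown in the diff above.

#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-in for Citus' WorkerNode; only the field used here. */
typedef struct SketchWorkerNode
{
    const char *name;
    bool shouldHaveShards;
} SketchWorkerNode;

/* Count only the nodes that are allowed to hold shard placements. */
static int
ShardAllowedNodeCount(const SketchWorkerNode *nodes, int nodeCount)
{
    int allowedNodeCount = 0;
    for (int nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++)
    {
        if (nodes[nodeIndex].shouldHaveShards)
        {
            allowedNodeCount++;
        }
    }
    return allowedNodeCount;
}

int
main(void)
{
    /* coordinator is in pg_dist_node but marked shouldhaveshards = false */
    SketchWorkerNode nodes[] = {
        { "coordinator", false },
        { "worker1", true },
        { "worker2", true },
    };
    int activeNodeCount = 3;
    int placementCount = 2;    /* shard_count = 2, replication factor 1 */

    int allowedNodeCount = ShardAllowedNodeCount(nodes, activeNodeCount);

    /*
     * Before the patch: placementCount (2) < activeNodeCount (3), so the
     * colocation group was routed to the special "unbalanced shards" path
     * and left where it was. After the patch: placementCount (2) >=
     * allowedNodeCount (2), so the group goes through the regular
     * rebalancer and is spread across both workers.
     */
    printf("regular rebalancer used: %s\n",
           placementCount >= allowedNodeCount ? "yes" : "no");

    return 0;
}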