From 77c5c882dec24a4c29ac17f5ab8d7d35e6e2a4f3 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Fri, 6 Oct 2023 11:53:52 +0300 Subject: [PATCH] properly handle the cases where rebalancer is called for specific table --- .../rebalancer_placement_isolation.c | 130 +++++++++++------- .../regress/expected/isolate_placement.out | 84 +++++++++++ src/test/regress/sql/isolate_placement.sql | 35 +++++ 3 files changed, 201 insertions(+), 48 deletions(-) diff --git a/src/backend/distributed/operations/rebalancer_placement_isolation.c b/src/backend/distributed/operations/rebalancer_placement_isolation.c index 7f352c9d3..415e2f074 100644 --- a/src/backend/distributed/operations/rebalancer_placement_isolation.c +++ b/src/backend/distributed/operations/rebalancer_placement_isolation.c @@ -75,11 +75,12 @@ typedef struct * NodeToPlacementGroupHashEntry. */ static void NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, - List *workerNodeList, + List *activeWorkerNodeList, + List *rebalancePlacementList, WorkerNode *drainWorkerNode); static void NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, - List *workerNodeList, - List *shardPlacementList, + List *activeWorkerNodeList, + List *rebalancePlacementList, FmgrInfo *shardAllowedOnNodeUDF); static bool NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, int32 nodeGroupId, @@ -92,6 +93,7 @@ static NodeToPlacementGroupHashEntry * NodeToPlacementGroupHashGetNodeWithGroupI /* other helpers */ +static List * PlacementListGetUniqueNodeGroupIds(List *placementList); static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGroupId); @@ -102,7 +104,7 @@ static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGrou */ RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList, - List *activeShardPlacementList, + List *rebalancePlacementList, WorkerNode *drainWorkerNode, FmgrInfo *shardAllowedOnNodeUDF) { @@ -112,14 +114,14 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList, list_length(activeWorkerNodeList)); activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes); - activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements); + rebalancePlacementList = SortList(rebalancePlacementList, CompareShardPlacements); NodeToPlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList, - drainWorkerNode); + rebalancePlacementList, drainWorkerNode); NodeToPlacementGroupHashAssignNodes(nodePlacementGroupHash, activeWorkerNodeList, - activeShardPlacementList, + rebalancePlacementList, shardAllowedOnNodeUDF); RebalancerPlacementIsolationContext *context = @@ -136,13 +138,14 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList, * of worker nodes and the worker node that is being drained, if specified. 
 */
 static void
-NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
-							 WorkerNode *drainWorkerNode)
+NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNodeList,
+							 List *rebalancePlacementList, WorkerNode *drainWorkerNode)
 {
-	bool drainSingleNode = drainWorkerNode != NULL;
+	List *placementListUniqueNodeGroupIds =
+		PlacementListGetUniqueNodeGroupIds(rebalancePlacementList);
 
 	WorkerNode *workerNode = NULL;
-	foreach_ptr(workerNode, workerNodeList)
+	foreach_ptr(workerNode, activeWorkerNodeList)
 	{
 		NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
 			hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER,
@@ -162,39 +165,49 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
 		nodePlacementGroupHashEntry->assignedPlacementGroup = NULL;
 
 		/*
-		 * For the rest of the comment, assume that:
-		 *   Node D: the node we're draining
-		 *   Node I: a node that is not D and that has a shard placement group
-		 *           that needs a separate node
-		 *   Node R: a node that is not D and that has some regular shard
-		 *           placements
+		 * Let's call the set of nodes that the placements in
+		 * rebalancePlacementList are stored on D, and the others S. In other
+		 * words, D is the set of nodes that we're allowed to move placements
+		 * "from" or "to" (the latter only if we're not draining the node), and
+		 * S is the set of nodes that we're only allowed to move placements "to".
 		 *
-		 * If we're draining a single node, then we don't know whether other
-		 * nodes have any regular shard placements or any that need a separate
-		 * node because in that case GetRebalanceSteps() would provide a list of
-		 * shard placements that are stored on D, not a list that contains all
-		 * the placements accross the cluster (because we want to limit node
-		 * draining to that node in that case). Note that when all shard
-		 * placements in the cluster are provided, NodeToPlacementGroupHashAssignNodes()
-		 * would already be aware of which node is used to separate which shard
-		 * placement group or which node is used to store some regular shard
-		 * placements. That is why we skip below code if we're not draining a
-		 * single node. It's not only inefficient to run below code when we're
-		 * not draining a single node, but also it's not correct because otherwise
-		 * rebalancer might decide to move some shard placements between any
-		 * nodes in the cluster and it would be incorrect to assume that current
-		 * placement distribution would be the same after the rebalancer plans the
-		 * moves.
+		 * This means that, for a node of type S, whether or not the node is
+		 * used to separate a placement group cannot change at runtime: we
+		 * never move placements away from such a node.
 		 *
-		 * Below we find out the assigned placement groups for nodes of type
-		 * I because we want to avoid from moving the placements (if any) from
-		 * node D to node I. We also set allowedToSeparateAnyPlacementGroup to
-		 * false for the nodes that already have some shard placements because
-		 * we want to avoid from moving the placements that need a separate node
-		 * (if any) from node D to node R.
+		 * For this reason, below we find out the assigned placement groups
+		 * for nodes of type S, because we want to avoid moving the placements
+		 * (if any) from a node of type D to a node of type S. We also set
+		 * allowedToSeparateAnyPlacementGroup to false for the nodes in S that
+		 * already have some shard placements, because we want to avoid moving
+		 * the placements that need a separate node (if any) from D to S.
+		 *
+		 * We skip the code below for nodes of type D, not for optimization
+		 * purposes but because it would be incorrect to assume that the
+		 * current placement distribution of a node of type D stays the same
+		 * after the rebalancer plans the moves.
 		 */
-		if (!(shouldHaveShards && drainSingleNode))
+
+		if (!shouldHaveShards)
 		{
+			/* we can't assign any shard placement groups to the node anyway */
+			continue;
+		}
+
+		if (list_length(placementListUniqueNodeGroupIds) == list_length(
+				activeWorkerNodeList))
+		{
+			/*
+			 * The list_member_oid() check below would then return true for
+			 * every worker node, i.e., all the nodes are of type D.
+			 */
+			Assert(list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId));
+			continue;
+		}
+
+		if (list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId))
+		{
+			/* node is of type D */
 			continue;
 		}
 
@@ -221,19 +234,19 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
  */
 static void
 NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
-									List *workerNodeList,
-									List *shardPlacementList,
+									List *activeWorkerNodeList,
+									List *rebalancePlacementList,
 									FmgrInfo *shardAllowedOnNodeUDF)
 {
-	List *availableWorkerList = list_copy(workerNodeList);
-	List *unassignedShardPlacementList = NIL;
+	List *availableWorkerList = list_copy(activeWorkerNodeList);
+	List *unassignedPlacementList = NIL;
 
 	/*
 	 * Assign as much as possible shard placement groups to worker nodes where
 	 * they are stored already.
 	 */
 	ShardPlacement *shardPlacement = NULL;
-	foreach_ptr(shardPlacement, shardPlacementList)
+	foreach_ptr(shardPlacement, rebalancePlacementList)
 	{
 		ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
 		if (!shardInterval->needsSeparateNode)
@@ -260,8 +273,8 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 		}
 		else
 		{
-			unassignedShardPlacementList =
-				lappend(unassignedShardPlacementList, shardPlacement);
+			unassignedPlacementList =
+				lappend(unassignedPlacementList, shardPlacement);
 		}
 	}
 
@@ -271,7 +284,7 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 	 */
 	int availableNodeIdx = 0;
 	ShardPlacement *unassignedShardPlacement = NULL;
-	foreach_ptr(unassignedShardPlacement, unassignedShardPlacementList)
+	foreach_ptr(unassignedShardPlacement, unassignedPlacementList)
 	{
 		bool separated = false;
 		while (!separated && availableNodeIdx < list_length(availableWorkerList))
@@ -419,6 +432,27 @@ NodeToPlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash,
 }
 
 
+/*
+ * PlacementListGetUniqueNodeGroupIds returns a list of the unique node group
+ * ids that are used by the given list of shard placements.
+ */
+static List *
+PlacementListGetUniqueNodeGroupIds(List *placementList)
+{
+	List *placementListUniqueNodeGroupIds = NIL;
+
+	ShardPlacement *shardPlacement = NULL;
+	foreach_ptr(shardPlacement, placementList)
+	{
+		placementListUniqueNodeGroupIds =
+			list_append_unique_oid(placementListUniqueNodeGroupIds,
+								   shardPlacement->groupId);
+	}
+
+	return placementListUniqueNodeGroupIds;
+}
+
+
 /*
  * WorkerNodeListGetNodeWithGroupId returns the index of worker node with given id
  * in given worker node list.
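
The D/S classification that the new comment in NodeToPlacementGroupHashInit() describes is easier to see with concrete values. The following is a minimal standalone C sketch of that classification only; the PlacementStub type, the array-based helpers, and the shard/group ids are hypothetical stand-ins for the ShardPlacement/WorkerNode lists and the list_append_unique_oid()/list_member_oid() calls that the patch actually uses.

#include <stdbool.h>
#include <stdio.h>

typedef struct PlacementStub
{
	int shardId;
	int nodeGroupId;	/* group id of the node storing this placement */
} PlacementStub;

/* collect unique node group ids, mirroring PlacementListGetUniqueNodeGroupIds() */
static int
UniqueGroupIds(const PlacementStub *placements, int placementCount, int *uniqueIds)
{
	int uniqueCount = 0;
	for (int i = 0; i < placementCount; i++)
	{
		bool seen = false;
		for (int j = 0; j < uniqueCount; j++)
		{
			if (uniqueIds[j] == placements[i].nodeGroupId)
			{
				seen = true;
				break;
			}
		}
		if (!seen)
		{
			uniqueIds[uniqueCount++] = placements[i].nodeGroupId;
		}
	}
	return uniqueCount;
}

static bool
GroupIdListContains(const int *groupIds, int count, int groupId)
{
	for (int i = 0; i < count; i++)
	{
		if (groupIds[i] == groupId)
		{
			return true;
		}
	}
	return false;
}

int
main(void)
{
	/* made-up ids: rebalance_table_shards('dist_1') only passes dist_1's placements */
	PlacementStub rebalancePlacements[] = { { 102008, 1 }, { 102009, 2 } };
	int activeWorkerGroupIds[] = { 1, 2, 3 };

	int uniqueIds[2] = { 0 };
	int uniqueCount = UniqueGroupIds(rebalancePlacements, 2, uniqueIds);

	/*
	 * If uniqueCount equaled the number of active workers, every worker
	 * would be of type D and the type-S handling could be skipped entirely.
	 */
	for (int i = 0; i < 3; i++)
	{
		bool typeD = GroupIdListContains(uniqueIds, uniqueCount,
										 activeWorkerGroupIds[i]);

		/*
		 * Type D nodes are skipped because their placement distribution can
		 * change while moves are being planned; type S nodes keep whatever
		 * placement group they currently separate.
		 */
		printf("node group %d is of type %s\n", activeWorkerGroupIds[i],
			   typeD ? "D" : "S");
	}

	return 0;
}

With only dist_1's placements in the list, node group 3 comes out as type S, which is what keeps a per-table rebalance from moving placements onto a node that already separates another table's shard group.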
diff --git a/src/test/regress/expected/isolate_placement.out b/src/test/regress/expected/isolate_placement.out index 3044098db..2a7e75d5d 100644 --- a/src/test/regress/expected/isolate_placement.out +++ b/src/test/regress/expected/isolate_placement.out @@ -1919,6 +1919,90 @@ SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.singl DROP TABLE single_shard_1_shardid_nodeid, single_shard_3_shardid_nodeid; DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement'; DROP TABLE single_shard_1, single_shard_3; +SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_1(a int); +SELECT create_distributed_table('dist_1', 'a', shard_count=>4); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT shardids[1] AS shardgroup_1_shardid +FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') +WHERE shardgroupindex = 1 \gset +SELECT citus_shard_property_set(:shardgroup_1_shardid, anti_affinity=>true); + citus_shard_property_set +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE single_shard_1(a int); +SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass); + citus_shard_property_set +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO WARNING; +SELECT rebalance_table_shards(shard_transfer_mode=>'block_writes'); + rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO NOTICE; +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SET client_min_messages TO WARNING; +SELECT rebalance_table_shards('isolate_placement.dist_1', shard_transfer_mode=>'block_writes'); + rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO NOTICE; +-- Make sure that calling the rebalancer specifically for dist_1 doesn't +-- break the placement separation rules. +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +DROP TABLE dist_1, single_shard_1; +SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + SET client_min_messages TO WARNING; DROP SCHEMA isolate_placement CASCADE; DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint); diff --git a/src/test/regress/sql/isolate_placement.sql b/src/test/regress/sql/isolate_placement.sql index 928c22e1a..eb3bd5961 100644 --- a/src/test/regress/sql/isolate_placement.sql +++ b/src/test/regress/sql/isolate_placement.sql @@ -1104,6 +1104,41 @@ DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_place DROP TABLE single_shard_1, single_shard_3; +SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + +CREATE TABLE dist_1(a int); +SELECT create_distributed_table('dist_1', 'a', shard_count=>4); + +SELECT shardids[1] AS shardgroup_1_shardid +FROM public.get_enumerated_shard_groups('isolate_placement.dist_1') +WHERE shardgroupindex = 1 \gset + +SELECT citus_shard_property_set(:shardgroup_1_shardid, anti_affinity=>true); + +CREATE TABLE single_shard_1(a int); +SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none'); + +SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass); + +SET client_min_messages TO WARNING; +SELECT rebalance_table_shards(shard_transfer_mode=>'block_writes'); +SET client_min_messages TO NOTICE; + +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true; +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true; + +SET client_min_messages TO WARNING; +SELECT rebalance_table_shards('isolate_placement.dist_1', shard_transfer_mode=>'block_writes'); +SET client_min_messages TO NOTICE; + +-- Make sure that calling the rebalancer specifically for dist_1 doesn't +-- break the placement separation rules. +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true; +SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true; + +DROP TABLE dist_1, single_shard_1; +SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + SET client_min_messages TO WARNING; DROP SCHEMA isolate_placement CASCADE; DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint);
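
verify_placements_in_shard_group_isolated() is defined elsewhere in the regression suite, so the sketch below is only a hypothetical, simplified C model (with made-up placement data) of the property the SELECTs above assert: a shard group counts as isolated when none of its placements shares a node with a placement from another shard group.

#include <stdbool.h>
#include <stdio.h>

typedef struct Placement
{
	int shardGroupIndex;	/* which colocated shard group the placement belongs to */
	int nodeGroupId;		/* node storing the placement */
} Placement;

static bool
ShardGroupIsolated(const Placement *placements, int count, int shardGroupIndex)
{
	for (int i = 0; i < count; i++)
	{
		if (placements[i].shardGroupIndex != shardGroupIndex)
		{
			continue;
		}

		/* every other placement on the same node must belong to the same group */
		for (int j = 0; j < count; j++)
		{
			if (placements[j].nodeGroupId == placements[i].nodeGroupId &&
				placements[j].shardGroupIndex != shardGroupIndex)
			{
				return false;
			}
		}
	}

	return true;
}

int
main(void)
{
	/* made-up layout: group 1 alone on node 3, groups 2..4 sharing nodes 1 and 2 */
	Placement placements[] = { { 1, 3 }, { 2, 1 }, { 3, 2 }, { 4, 1 } };

	printf("shard group 1 isolated: %s\n",
		   ShardGroupIsolated(placements, 4, 1) ? "t" : "f");
	return 0;
}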