Properly handle the cases where the rebalancer is called for a specific table

tenant-schema-isolation-complete-view
Onur Tirtir 2023-10-06 11:53:52 +03:00
parent 6e9fc45b97
commit 77c5c882de
3 changed files with 201 additions and 48 deletions


@@ -75,11 +75,12 @@ typedef struct
  * NodeToPlacementGroupHashEntry.
  */
 static void NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash,
-                                         List *workerNodeList,
+                                         List *activeWorkerNodeList,
+                                         List *rebalancePlacementList,
                                          WorkerNode *drainWorkerNode);
 static void NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
-                                                List *workerNodeList,
-                                                List *shardPlacementList,
+                                                List *activeWorkerNodeList,
+                                                List *rebalancePlacementList,
                                                 FmgrInfo *shardAllowedOnNodeUDF);
 static bool NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
                                                int32 nodeGroupId,
@@ -92,6 +93,7 @@ static NodeToPlacementGroupHashEntry * NodeToPlacementGroupHashGetNodeWithGroupI
 
 /* other helpers */
+static List * PlacementListGetUniqueNodeGroupIds(List *placementList);
 static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGroupId);
@@ -102,7 +104,7 @@ static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGrou
  */
 RebalancerPlacementIsolationContext *
 PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
-                                           List *activeShardPlacementList,
+                                           List *rebalancePlacementList,
                                            WorkerNode *drainWorkerNode,
                                            FmgrInfo *shardAllowedOnNodeUDF)
 {
@@ -112,14 +114,14 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
                                  list_length(activeWorkerNodeList));
 
     activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes);
-    activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements);
+    rebalancePlacementList = SortList(rebalancePlacementList, CompareShardPlacements);
 
     NodeToPlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList,
-                                 drainWorkerNode);
+                                 rebalancePlacementList, drainWorkerNode);
 
     NodeToPlacementGroupHashAssignNodes(nodePlacementGroupHash,
                                         activeWorkerNodeList,
-                                        activeShardPlacementList,
+                                        rebalancePlacementList,
                                         shardAllowedOnNodeUDF);
 
     RebalancerPlacementIsolationContext *context =
@@ -136,13 +138,14 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
  * of worker nodes and the worker node that is being drained, if specified.
  */
 static void
-NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
-                             WorkerNode *drainWorkerNode)
+NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNodeList,
+                             List *rebalancePlacementList, WorkerNode *drainWorkerNode)
 {
-    bool drainSingleNode = drainWorkerNode != NULL;
+    List *placementListUniqueNodeGroupIds =
+        PlacementListGetUniqueNodeGroupIds(rebalancePlacementList);
 
     WorkerNode *workerNode = NULL;
-    foreach_ptr(workerNode, workerNodeList)
+    foreach_ptr(workerNode, activeWorkerNodeList)
     {
         NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
             hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER,
@@ -162,39 +165,49 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
         nodePlacementGroupHashEntry->assignedPlacementGroup = NULL;
 
         /*
-         * For the rest of the comment, assume that:
-         * Node D: the node we're draining
-         * Node I: a node that is not D and that has a shard placement group
-         *         that needs a separate node
-         * Node R: a node that is not D and that has some regular shard
-         *         placements
+         * Lets call set of the nodes that placements in rebalancePlacementList
+         * are stored on as D and the others as S. In other words, D is the set
+         * of the nodes that we're allowed to move the placements "from" or
+         * "to (*)" (* = if we're not draining it) and S is the set of the nodes
+         * that we're only allowed to move the placements "to" but not "from".
          *
-         * If we're draining a single node, then we don't know whether other
-         * nodes have any regular shard placements or any that need a separate
-         * node because in that case GetRebalanceSteps() would provide a list of
-         * shard placements that are stored on D, not a list that contains all
-         * the placements accross the cluster (because we want to limit node
-         * draining to that node in that case). Note that when all shard
-         * placements in the cluster are provided, NodeToPlacementGroupHashAssignNodes()
-         * would already be aware of which node is used to separate which shard
-         * placement group or which node is used to store some regular shard
-         * placements. That is why we skip below code if we're not draining a
-         * single node. It's not only inefficient to run below code when we're
-         * not draining a single node, but also it's not correct because otherwise
-         * rebalancer might decide to move some shard placements between any
-         * nodes in the cluster and it would be incorrect to assume that current
-         * placement distribution would be the same after the rebalancer plans the
-         * moves.
+         * This means that, for a node of type S, the fact that whether the node
+         * is used to separate a placement group or not cannot be changed in the
+         * runtime.
          *
-         * Below we find out the assigned placement groups for nodes of type
-         * I because we want to avoid from moving the placements (if any) from
-         * node D to node I. We also set allowedToSeparateAnyPlacementGroup to
-         * false for the nodes that already have some shard placements because
-         * we want to avoid from moving the placements that need a separate node
-         * (if any) from node D to node R.
+         * For this reason, below we find out the assigned placement groups for
+         * nodes of type S because we want to avoid from moving the placements
+         * (if any) from a node of type D to S. We also set
+         * allowedToSeparateAnyPlacementGroup to false for the nodes that already
+         * have some shard placements within S because we want to avoid from moving
+         * the placements that need a separate node (if any) from node D to node S.
+         *
+         * We skip below code for nodes of type D not because optimization purposes
+         * but because it would be "incorrect" to assume that "current placement
+         * distribution for a node of type D would be the same" after the rebalancer
+         * plans the moves.
          */
-        if (!(shouldHaveShards && drainSingleNode))
+        if (!shouldHaveShards)
         {
+            /* we can't assing any shard placement groups to the node anyway */
+            continue;
+        }
+
+        if (list_length(placementListUniqueNodeGroupIds) == list_length(
+                activeWorkerNodeList))
+        {
+            /*
+             * list_member_oid() check would return true for all placements then.
+             * This means that all the nodes are of type D.
+             */
+            Assert(list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId));
+            continue;
+        }
+
+        if (list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId))
+        {
+            /* node is of type D */
             continue;
         }
@@ -221,19 +234,19 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
  */
 static void
 NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
-                                    List *workerNodeList,
-                                    List *shardPlacementList,
+                                    List *activeWorkerNodeList,
+                                    List *rebalancePlacementList,
                                     FmgrInfo *shardAllowedOnNodeUDF)
 {
-    List *availableWorkerList = list_copy(workerNodeList);
-    List *unassignedShardPlacementList = NIL;
+    List *availableWorkerList = list_copy(activeWorkerNodeList);
+    List *unassignedPlacementList = NIL;
 
     /*
      * Assign as much as possible shard placement groups to worker nodes where
      * they are stored already.
      */
     ShardPlacement *shardPlacement = NULL;
-    foreach_ptr(shardPlacement, shardPlacementList)
+    foreach_ptr(shardPlacement, rebalancePlacementList)
     {
         ShardInterval *shardInterval = LoadShardInterval(shardPlacement->shardId);
         if (!shardInterval->needsSeparateNode)
@@ -260,8 +273,8 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
         }
         else
         {
-            unassignedShardPlacementList =
-                lappend(unassignedShardPlacementList, shardPlacement);
+            unassignedPlacementList =
+                lappend(unassignedPlacementList, shardPlacement);
         }
     }
@@ -271,7 +284,7 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
      */
     int availableNodeIdx = 0;
     ShardPlacement *unassignedShardPlacement = NULL;
-    foreach_ptr(unassignedShardPlacement, unassignedShardPlacementList)
+    foreach_ptr(unassignedShardPlacement, unassignedPlacementList)
     {
         bool separated = false;
         while (!separated && availableNodeIdx < list_length(availableWorkerList))
@@ -419,6 +432,27 @@ NodeToPlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash,
 }
 
 
+/*
+ * PlacementListGetUniqueNodeGroupIds returns a list of unique node group ids
+ * that are used by given list of shard placements.
+ */
+static List *
+PlacementListGetUniqueNodeGroupIds(List *placementList)
+{
+    List *placementListUniqueNodeGroupIds = NIL;
+
+    ShardPlacement *shardPlacement = NULL;
+    foreach_ptr(shardPlacement, placementList)
+    {
+        placementListUniqueNodeGroupIds =
+            list_append_unique_oid(placementListUniqueNodeGroupIds,
+                                   shardPlacement->groupId);
+    }
+
+    return placementListUniqueNodeGroupIds;
+}
+
+
 /*
  * WorkerNodeListGetNodeWithGroupId returns the index of worker node with given id
  * in given worker node list.
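The new comment in NodeToPlacementGroupHashInit() splits the active workers into two sets: D, the nodes that hold at least one placement from the rebalance set (placements may move off them), and S, every other node (placements may only move onto them), and the loop skips type-D nodes when pre-assigning placement groups. The following standalone sketch illustrates only that classification step; it is not part of the commit, and plain C arrays with a linear membership test stand in for Citus group ids, Postgres Lists, and list_member_oid().

#include <stdbool.h>
#include <stdio.h>

/* returns true when groupId appears in the given set of group ids */
static bool
GroupIdInSet(const int *groupIds, int count, int groupId)
{
    for (int i = 0; i < count; i++)
    {
        if (groupIds[i] == groupId)
        {
            return true;
        }
    }

    return false;
}

int
main(void)
{
    /* group ids of all active worker nodes (hypothetical example values) */
    int activeWorkerGroupIds[] = { 1, 2, 3, 4 };
    int activeWorkerCount = 4;

    /* unique group ids that appear in the placement list being rebalanced */
    int rebalancePlacementGroupIds[] = { 2, 4 };
    int rebalanceGroupCount = 2;

    for (int i = 0; i < activeWorkerCount; i++)
    {
        int groupId = activeWorkerGroupIds[i];
        bool nodeIsTypeD = GroupIdInSet(rebalancePlacementGroupIds,
                                        rebalanceGroupCount, groupId);

        /*
         * Type-D nodes are skipped when pre-assigning placement groups,
         * because their placement distribution may still change once the
         * rebalancer plans its moves; type-S nodes keep whatever separation
         * state they already have.
         */
        printf("node group %d is of type %s\n", groupId, nodeIsTypeD ? "D" : "S");
    }

    return 0;
}

In the patch itself, the membership test runs against the result of PlacementListGetUniqueNodeGroupIds(rebalancePlacementList), and only nodes with shouldHaveShards set ever reach it.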


@@ -1919,6 +1919,90 @@ SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.singl
 DROP TABLE single_shard_1_shardid_nodeid, single_shard_3_shardid_nodeid;
 DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement';
 DROP TABLE single_shard_1, single_shard_3;
+SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE dist_1(a int);
+SELECT create_distributed_table('dist_1', 'a', shard_count=>4);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT shardids[1] AS shardgroup_1_shardid
+FROM public.get_enumerated_shard_groups('isolate_placement.dist_1')
+WHERE shardgroupindex = 1 \gset
+SELECT citus_shard_property_set(:shardgroup_1_shardid, anti_affinity=>true);
+ citus_shard_property_set
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE single_shard_1(a int);
+SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass);
+ citus_shard_property_set
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards(shard_transfer_mode=>'block_writes');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO NOTICE;
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards('isolate_placement.dist_1', shard_transfer_mode=>'block_writes');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO NOTICE;
+-- Make sure that calling the rebalancer specifically for dist_1 doesn't
+-- break the placement separation rules.
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP TABLE dist_1, single_shard_1;
+SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA isolate_placement CASCADE;
 DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint);


@@ -1104,6 +1104,41 @@ DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_place
 DROP TABLE single_shard_1, single_shard_3;
 
+SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+
+CREATE TABLE dist_1(a int);
+SELECT create_distributed_table('dist_1', 'a', shard_count=>4);
+
+SELECT shardids[1] AS shardgroup_1_shardid
+FROM public.get_enumerated_shard_groups('isolate_placement.dist_1')
+WHERE shardgroupindex = 1 \gset
+
+SELECT citus_shard_property_set(:shardgroup_1_shardid, anti_affinity=>true);
+
+CREATE TABLE single_shard_1(a int);
+SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
+
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass);
+
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards(shard_transfer_mode=>'block_writes');
+SET client_min_messages TO NOTICE;
+
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true;
+
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards('isolate_placement.dist_1', shard_transfer_mode=>'block_writes');
+SET client_min_messages TO NOTICE;
+
+-- Make sure that calling the rebalancer specifically for dist_1 doesn't
+-- break the placement separation rules.
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.dist_1', 1) = true;
+
+DROP TABLE dist_1, single_shard_1;
+SELECT citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA isolate_placement CASCADE;
 DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint);