diff --git a/src/backend/distributed/operations/rebalancer_placement_isolation.c b/src/backend/distributed/operations/rebalancer_placement_isolation.c index 61ad3d4a9..7f352c9d3 100644 --- a/src/backend/distributed/operations/rebalancer_placement_isolation.c +++ b/src/backend/distributed/operations/rebalancer_placement_isolation.c @@ -68,24 +68,24 @@ typedef struct * NULL if no shard placement group is assigned yet. */ ShardPlacementGroup *assignedPlacementGroup; -} NodePlacementGroupHashEntry; +} NodeToPlacementGroupHashEntry; /* * Routines to prepare a hash table where each entry is of type - * NodePlacementGroupHashEntry. + * NodeToPlacementGroupHashEntry. */ -static void NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, - List *workerNodeList, - WorkerNode *drainWorkerNode); -static void NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, - List *workerNodeList, - List *shardPlacementList, - FmgrInfo *shardAllowedOnNodeUDF); -static bool NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, - int32 nodeGroupId, - ShardPlacement *shardPlacement, - FmgrInfo *shardAllowedOnNodeUDF); -static NodePlacementGroupHashEntry * NodePlacementGroupHashGetNodeWithGroupId( +static void NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, + List *workerNodeList, + WorkerNode *drainWorkerNode); +static void NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, + List *workerNodeList, + List *shardPlacementList, + FmgrInfo *shardAllowedOnNodeUDF); +static bool NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, + int32 nodeGroupId, + ShardPlacement *shardPlacement, + FmgrInfo *shardAllowedOnNodeUDF); +static NodeToPlacementGroupHashEntry * NodeToPlacementGroupHashGetNodeWithGroupId( HTAB *nodePlacementGroupHash, int32 nodeGroupId); @@ -107,20 +107,20 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList, FmgrInfo *shardAllowedOnNodeUDF) { HTAB *nodePlacementGroupHash = - CreateSimpleHashWithNameAndSize(uint32, NodePlacementGroupHashEntry, - "NodePlacementGroupHash", + CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry, + "NodeToPlacementGroupHash", list_length(activeWorkerNodeList)); activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes); activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements); - NodePlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList, - drainWorkerNode); + NodeToPlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList, + drainWorkerNode); - NodePlacementGroupHashAssignNodes(nodePlacementGroupHash, - activeWorkerNodeList, - activeShardPlacementList, - shardAllowedOnNodeUDF); + NodeToPlacementGroupHashAssignNodes(nodePlacementGroupHash, + activeWorkerNodeList, + activeShardPlacementList, + shardAllowedOnNodeUDF); RebalancerPlacementIsolationContext *context = palloc(sizeof(RebalancerPlacementIsolationContext)); @@ -131,20 +131,20 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList, /* - * NodePlacementGroupHashInit initializes given hash table where each - * entry is of type NodePlacementGroupHashEntry by using given list + * NodeToPlacementGroupHashInit initializes given hash table where each + * entry is of type NodeToPlacementGroupHashEntry by using given list * of worker nodes and the worker node that is being drained, if specified. */ static void -NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList, - WorkerNode *drainWorkerNode) +NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList, + WorkerNode *drainWorkerNode) { bool drainSingleNode = drainWorkerNode != NULL; WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { - NodePlacementGroupHashEntry *nodePlacementGroupHashEntry = + NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry = hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER, NULL); @@ -175,7 +175,7 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList, * shard placements that are stored on D, not a list that contains all * the placements accross the cluster (because we want to limit node * draining to that node in that case). Note that when all shard - * placements in the cluster are provided, NodePlacementGroupHashAssignNodes() + * placements in the cluster are provided, NodeToPlacementGroupHashAssignNodes() * would already be aware of which node is used to separate which shard * placement group or which node is used to store some regular shard * placements. That is why we skip below code if we're not draining a @@ -216,14 +216,14 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList, /* - * NodePlacementGroupHashAssignNodes assigns all active shard placements in + * NodeToPlacementGroupHashAssignNodes assigns all active shard placements in * the cluster that need separate nodes to individual worker nodes. */ static void -NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, - List *workerNodeList, - List *shardPlacementList, - FmgrInfo *shardAllowedOnNodeUDF) +NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, + List *workerNodeList, + List *shardPlacementList, + FmgrInfo *shardAllowedOnNodeUDF) { List *availableWorkerList = list_copy(workerNodeList); List *unassignedShardPlacementList = NIL; @@ -241,15 +241,20 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, continue; } - int32 assignGroupId = shardPlacement->groupId; - if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash, - assignGroupId, - shardPlacement, - shardAllowedOnNodeUDF)) + int32 shardPlacementGroupId = shardPlacement->groupId; + if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash, + shardPlacementGroupId, + shardPlacement, + shardAllowedOnNodeUDF)) { + /* + * NodeToPlacementGroupHashAssignNode() succeeds for each worker node + * once, hence we must not have removed the worker node from the list + * yet, and WorkerNodeListGetNodeWithGroupId() ensures that already. + */ int currentPlacementNodeIdx = WorkerNodeListGetNodeWithGroupId(availableWorkerList, - assignGroupId); + shardPlacementGroupId); availableWorkerList = list_delete_nth_cell(availableWorkerList, currentPlacementNodeIdx); } @@ -275,10 +280,10 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, (WorkerNode *) list_nth(availableWorkerList, availableNodeIdx); availableNodeIdx++; - if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash, - availableWorkerNode->groupId, - unassignedShardPlacement, - shardAllowedOnNodeUDF)) + if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash, + availableWorkerNode->groupId, + unassignedShardPlacement, + shardAllowedOnNodeUDF)) { separated = true; break; @@ -296,35 +301,28 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash, /* - * NodePlacementGroupHashAssignNode is an helper to - * NodePlacementGroupHashAssignNodes that tries to assign given + * NodeToPlacementGroupHashAssignNode is an helper to + * NodeToPlacementGroupHashAssignNodes that tries to assign given * shard placement to given node and returns true if it succeeds. */ static bool -NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, - int32 nodeGroupId, - ShardPlacement *shardPlacement, - FmgrInfo *shardAllowedOnNodeUDF) +NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, + int32 nodeGroupId, + ShardPlacement *shardPlacement, + FmgrInfo *shardAllowedOnNodeUDF) { - NodePlacementGroupHashEntry *nodePlacementGroupHashEntry = - NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId); - - ShardPlacementGroup *placementGroup = - GetShardPlacementGroupForPlacement(shardPlacement->shardId, - shardPlacement->placementId); + NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry = + NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId); if (nodePlacementGroupHashEntry->assignedPlacementGroup) { /* * Right now callers of this function call it once for each distinct - * shard placement group, hence we assume that placementGroup and + * shard placement group, hence we assume that shard placement group + * that given shard placement belongs to and * nodePlacementGroupHashEntry->assignedPlacementGroup cannot be the - * same already, unless no shard placement group is assigned to this - * node yet. + * same, without checking. */ - Assert(!ShardPlacementGroupsSame(placementGroup, - nodePlacementGroupHashEntry-> - assignedPlacementGroup)); return false; } @@ -346,7 +344,9 @@ NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash, return false; } - nodePlacementGroupHashEntry->assignedPlacementGroup = placementGroup; + nodePlacementGroupHashEntry->assignedPlacementGroup = + GetShardPlacementGroupForPlacement(shardPlacement->shardId, + shardPlacement->placementId); return true; } @@ -365,9 +365,9 @@ RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( WorkerNode *workerNode) { HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash; - NodePlacementGroupHashEntry *nodePlacementGroupHashEntry = - NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, - workerNode->groupId); + NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry = + NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, + workerNode->groupId); ShardInterval *shardInterval = LoadShardInterval(shardId); if (!shardInterval->needsSeparateNode) @@ -384,24 +384,30 @@ RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( * Given shard placement needs a separate node. * Check if given worker node is the one that is assigned to separate it. */ + if (nodePlacementGroupHashEntry->assignedPlacementGroup == NULL) + { + /* the node is not supposed to separate a placement group */ + return false; + } + ShardPlacementGroup *placementGroup = GetShardPlacementGroupForPlacement(shardId, placementId); - return nodePlacementGroupHashEntry->assignedPlacementGroup != NULL && - ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup, + return ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup, placementGroup); } /* - * NodePlacementGroupHashGetNodeWithGroupId searches given hash table for - * NodePlacementGroupHashEntry with given node id and returns it. + * NodeToPlacementGroupHashGetNodeWithGroupId searches given hash table for + * NodeToPlacementGroupHashEntry with given node id and returns it. * * Throws an error if no such entry is found. */ -static NodePlacementGroupHashEntry * -NodePlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash, int32 nodeGroupId) +static NodeToPlacementGroupHashEntry * +NodeToPlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash, + int32 nodeGroupId) { - NodePlacementGroupHashEntry *nodePlacementGroupHashEntry = + NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry = hash_search(nodePlacementGroupHash, &nodeGroupId, HASH_FIND, NULL); if (nodePlacementGroupHashEntry == NULL)