mirror of https://github.com/citusdata/citus.git
improve
parent
d541f64e3c
commit
6e9fc45b97
|
@ -68,24 +68,24 @@ typedef struct
|
||||||
* NULL if no shard placement group is assigned yet.
|
* NULL if no shard placement group is assigned yet.
|
||||||
*/
|
*/
|
||||||
ShardPlacementGroup *assignedPlacementGroup;
|
ShardPlacementGroup *assignedPlacementGroup;
|
||||||
} NodePlacementGroupHashEntry;
|
} NodeToPlacementGroupHashEntry;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Routines to prepare a hash table where each entry is of type
|
* Routines to prepare a hash table where each entry is of type
|
||||||
* NodePlacementGroupHashEntry.
|
* NodeToPlacementGroupHashEntry.
|
||||||
*/
|
*/
|
||||||
static void NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash,
|
static void NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash,
|
||||||
List *workerNodeList,
|
List *workerNodeList,
|
||||||
WorkerNode *drainWorkerNode);
|
WorkerNode *drainWorkerNode);
|
||||||
static void NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
|
static void NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
|
||||||
List *workerNodeList,
|
List *workerNodeList,
|
||||||
List *shardPlacementList,
|
List *shardPlacementList,
|
||||||
FmgrInfo *shardAllowedOnNodeUDF);
|
FmgrInfo *shardAllowedOnNodeUDF);
|
||||||
static bool NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
|
static bool NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
|
||||||
int32 nodeGroupId,
|
int32 nodeGroupId,
|
||||||
ShardPlacement *shardPlacement,
|
ShardPlacement *shardPlacement,
|
||||||
FmgrInfo *shardAllowedOnNodeUDF);
|
FmgrInfo *shardAllowedOnNodeUDF);
|
||||||
static NodePlacementGroupHashEntry * NodePlacementGroupHashGetNodeWithGroupId(
|
static NodeToPlacementGroupHashEntry * NodeToPlacementGroupHashGetNodeWithGroupId(
|
||||||
HTAB *nodePlacementGroupHash,
|
HTAB *nodePlacementGroupHash,
|
||||||
int32
|
int32
|
||||||
nodeGroupId);
|
nodeGroupId);
|
||||||
|
@ -107,17 +107,17 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
|
||||||
FmgrInfo *shardAllowedOnNodeUDF)
|
FmgrInfo *shardAllowedOnNodeUDF)
|
||||||
{
|
{
|
||||||
HTAB *nodePlacementGroupHash =
|
HTAB *nodePlacementGroupHash =
|
||||||
CreateSimpleHashWithNameAndSize(uint32, NodePlacementGroupHashEntry,
|
CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry,
|
||||||
"NodePlacementGroupHash",
|
"NodeToPlacementGroupHash",
|
||||||
list_length(activeWorkerNodeList));
|
list_length(activeWorkerNodeList));
|
||||||
|
|
||||||
activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes);
|
activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes);
|
||||||
activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements);
|
activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements);
|
||||||
|
|
||||||
NodePlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList,
|
NodeToPlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList,
|
||||||
drainWorkerNode);
|
drainWorkerNode);
|
||||||
|
|
||||||
NodePlacementGroupHashAssignNodes(nodePlacementGroupHash,
|
NodeToPlacementGroupHashAssignNodes(nodePlacementGroupHash,
|
||||||
activeWorkerNodeList,
|
activeWorkerNodeList,
|
||||||
activeShardPlacementList,
|
activeShardPlacementList,
|
||||||
shardAllowedOnNodeUDF);
|
shardAllowedOnNodeUDF);
|
||||||
|
@ -131,12 +131,12 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NodePlacementGroupHashInit initializes given hash table where each
|
* NodeToPlacementGroupHashInit initializes given hash table where each
|
||||||
* entry is of type NodePlacementGroupHashEntry by using given list
|
* entry is of type NodeToPlacementGroupHashEntry by using given list
|
||||||
* of worker nodes and the worker node that is being drained, if specified.
|
* of worker nodes and the worker node that is being drained, if specified.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
|
NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
|
||||||
WorkerNode *drainWorkerNode)
|
WorkerNode *drainWorkerNode)
|
||||||
{
|
{
|
||||||
bool drainSingleNode = drainWorkerNode != NULL;
|
bool drainSingleNode = drainWorkerNode != NULL;
|
||||||
|
@ -144,7 +144,7 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
{
|
{
|
||||||
NodePlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
||||||
hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER,
|
hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
|
@ -175,7 +175,7 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
|
||||||
* shard placements that are stored on D, not a list that contains all
|
* shard placements that are stored on D, not a list that contains all
|
||||||
* the placements across the cluster (because we want to limit node
|
* the placements across the cluster (because we want to limit node
|
||||||
* draining to that node in that case). Note that when all shard
|
* draining to that node in that case). Note that when all shard
|
||||||
* placements in the cluster are provided, NodePlacementGroupHashAssignNodes()
|
* placements in the cluster are provided, NodeToPlacementGroupHashAssignNodes()
|
||||||
* would already be aware of which node is used to separate which shard
|
* would already be aware of which node is used to separate which shard
|
||||||
* placement group or which node is used to store some regular shard
|
* placement group or which node is used to store some regular shard
|
||||||
* placements. That is why we skip below code if we're not draining a
|
* placements. That is why we skip below code if we're not draining a
|
||||||
|
@ -216,11 +216,11 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NodePlacementGroupHashAssignNodes assigns all active shard placements in
|
* NodeToPlacementGroupHashAssignNodes assigns all active shard placements in
|
||||||
* the cluster that need separate nodes to individual worker nodes.
|
* the cluster that need separate nodes to individual worker nodes.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
|
NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
|
||||||
List *workerNodeList,
|
List *workerNodeList,
|
||||||
List *shardPlacementList,
|
List *shardPlacementList,
|
||||||
FmgrInfo *shardAllowedOnNodeUDF)
|
FmgrInfo *shardAllowedOnNodeUDF)
|
||||||
|
@ -241,15 +241,20 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32 assignGroupId = shardPlacement->groupId;
|
int32 shardPlacementGroupId = shardPlacement->groupId;
|
||||||
if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash,
|
if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash,
|
||||||
assignGroupId,
|
shardPlacementGroupId,
|
||||||
shardPlacement,
|
shardPlacement,
|
||||||
shardAllowedOnNodeUDF))
|
shardAllowedOnNodeUDF))
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* NodeToPlacementGroupHashAssignNode() succeeds for each worker node
|
||||||
|
* once, hence we must not have removed the worker node from the list
|
||||||
|
* yet, and WorkerNodeListGetNodeWithGroupId() ensures that already.
|
||||||
|
*/
|
||||||
int currentPlacementNodeIdx =
|
int currentPlacementNodeIdx =
|
||||||
WorkerNodeListGetNodeWithGroupId(availableWorkerList,
|
WorkerNodeListGetNodeWithGroupId(availableWorkerList,
|
||||||
assignGroupId);
|
shardPlacementGroupId);
|
||||||
availableWorkerList = list_delete_nth_cell(availableWorkerList,
|
availableWorkerList = list_delete_nth_cell(availableWorkerList,
|
||||||
currentPlacementNodeIdx);
|
currentPlacementNodeIdx);
|
||||||
}
|
}
|
||||||
|
@ -275,7 +280,7 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
|
||||||
(WorkerNode *) list_nth(availableWorkerList, availableNodeIdx);
|
(WorkerNode *) list_nth(availableWorkerList, availableNodeIdx);
|
||||||
availableNodeIdx++;
|
availableNodeIdx++;
|
||||||
|
|
||||||
if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash,
|
if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash,
|
||||||
availableWorkerNode->groupId,
|
availableWorkerNode->groupId,
|
||||||
unassignedShardPlacement,
|
unassignedShardPlacement,
|
||||||
shardAllowedOnNodeUDF))
|
shardAllowedOnNodeUDF))
|
||||||
|
@ -296,35 +301,28 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NodePlacementGroupHashAssignNode is a helper to
|
* NodeToPlacementGroupHashAssignNode is a helper to
|
||||||
* NodePlacementGroupHashAssignNodes that tries to assign given
|
* NodeToPlacementGroupHashAssignNodes that tries to assign given
|
||||||
* shard placement to given node and returns true if it succeeds.
|
* shard placement to given node and returns true if it succeeds.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
|
NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
|
||||||
int32 nodeGroupId,
|
int32 nodeGroupId,
|
||||||
ShardPlacement *shardPlacement,
|
ShardPlacement *shardPlacement,
|
||||||
FmgrInfo *shardAllowedOnNodeUDF)
|
FmgrInfo *shardAllowedOnNodeUDF)
|
||||||
{
|
{
|
||||||
NodePlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
||||||
NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId);
|
NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId);
|
||||||
|
|
||||||
ShardPlacementGroup *placementGroup =
|
|
||||||
GetShardPlacementGroupForPlacement(shardPlacement->shardId,
|
|
||||||
shardPlacement->placementId);
|
|
||||||
|
|
||||||
if (nodePlacementGroupHashEntry->assignedPlacementGroup)
|
if (nodePlacementGroupHashEntry->assignedPlacementGroup)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Right now callers of this function call it once for each distinct
|
* Right now callers of this function call it once for each distinct
|
||||||
* shard placement group, hence we assume that placementGroup and
|
* shard placement group, hence we assume that shard placement group
|
||||||
|
* that given shard placement belongs to and
|
||||||
* nodePlacementGroupHashEntry->assignedPlacementGroup cannot be the
|
* nodePlacementGroupHashEntry->assignedPlacementGroup cannot be the
|
||||||
* same already, unless no shard placement group is assigned to this
|
* same, without checking.
|
||||||
* node yet.
|
|
||||||
*/
|
*/
|
||||||
Assert(!ShardPlacementGroupsSame(placementGroup,
|
|
||||||
nodePlacementGroupHashEntry->
|
|
||||||
assignedPlacementGroup));
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,7 +344,9 @@ NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
nodePlacementGroupHashEntry->assignedPlacementGroup = placementGroup;
|
nodePlacementGroupHashEntry->assignedPlacementGroup =
|
||||||
|
GetShardPlacementGroupForPlacement(shardPlacement->shardId,
|
||||||
|
shardPlacement->placementId);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -365,8 +365,8 @@ RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
|
||||||
WorkerNode *workerNode)
|
WorkerNode *workerNode)
|
||||||
{
|
{
|
||||||
HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;
|
HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;
|
||||||
NodePlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
||||||
NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash,
|
NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash,
|
||||||
workerNode->groupId);
|
workerNode->groupId);
|
||||||
|
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
|
@ -384,24 +384,30 @@ RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
|
||||||
* Given shard placement needs a separate node.
|
* Given shard placement needs a separate node.
|
||||||
* Check if given worker node is the one that is assigned to separate it.
|
* Check if given worker node is the one that is assigned to separate it.
|
||||||
*/
|
*/
|
||||||
|
if (nodePlacementGroupHashEntry->assignedPlacementGroup == NULL)
|
||||||
|
{
|
||||||
|
/* the node is not supposed to separate a placement group */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
ShardPlacementGroup *placementGroup =
|
ShardPlacementGroup *placementGroup =
|
||||||
GetShardPlacementGroupForPlacement(shardId, placementId);
|
GetShardPlacementGroupForPlacement(shardId, placementId);
|
||||||
return nodePlacementGroupHashEntry->assignedPlacementGroup != NULL &&
|
return ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup,
|
||||||
ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup,
|
|
||||||
placementGroup);
|
placementGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NodePlacementGroupHashGetNodeWithGroupId searches given hash table for
|
* NodeToPlacementGroupHashGetNodeWithGroupId searches given hash table for
|
||||||
* NodePlacementGroupHashEntry with given node group id and returns it.
|
* NodeToPlacementGroupHashEntry with given node group id and returns it.
|
||||||
*
|
*
|
||||||
* Throws an error if no such entry is found.
|
* Throws an error if no such entry is found.
|
||||||
*/
|
*/
|
||||||
static NodePlacementGroupHashEntry *
|
static NodeToPlacementGroupHashEntry *
|
||||||
NodePlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash, int32 nodeGroupId)
|
NodeToPlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash,
|
||||||
|
int32 nodeGroupId)
|
||||||
{
|
{
|
||||||
NodePlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
|
||||||
hash_search(nodePlacementGroupHash, &nodeGroupId, HASH_FIND, NULL);
|
hash_search(nodePlacementGroupHash, &nodeGroupId, HASH_FIND, NULL);
|
||||||
|
|
||||||
if (nodePlacementGroupHashEntry == NULL)
|
if (nodePlacementGroupHashEntry == NULL)
|
||||||
|
|
Loading…
Reference in New Issue