stab at having a complete view of the cluster when assigning nodes

tenant-schema-isolation-complete-view
Nils Dijk 2023-11-02 14:07:38 +00:00
parent bcdcd7f455
commit 6a64c23e85
GPG Key ID: CA1177EF9434F241
3 changed files with 105 additions and 37 deletions
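
A note for reading the diff: the new set-building code below revolves around a small ShardgroupPlacement value type whose definition is not part of this commit. Judging from the designated initializers used in the added code, it is a plain three-field struct along these lines (a sketch under that assumption; the colocatationId spelling is copied as-is from the usages in the diff, and the exact integer types may differ):

typedef struct ShardgroupPlacement
{
	uint32 colocatationId;      /* colocation group the shardgroup belongs to */
	int shardIntervalIndex;     /* index of the shard interval within that group */
	int32 nodeGroupId;          /* node group hosting the placement */
} ShardgroupPlacement;

CreateSimpleHashSet(ShardgroupPlacement) creates an HTAB that effectively keys on the whole struct, so two entries denote the same set element exactly when all three fields match; that is what lets the code below use the type both as hash key and as set element.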

View File

@@ -11,9 +11,11 @@
 #include "postgres.h"

+#include "access/genam.h"
 #include "nodes/pg_list.h"
 #include "utils/hsearch.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"

 #include "distributed/colocation_utils.h"
 #include "distributed/hash_helpers.h"
@@ -21,6 +23,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_utility.h"
 #include "distributed/multi_physical_planner.h"
+#include "distributed/pg_dist_placement.h"
 #include "distributed/rebalancer_placement_separation.h"
 #include "distributed/shard_rebalancer.h"
@@ -135,6 +138,75 @@ PrepareRebalancerPlacementSeparationContext(List *activeWorkerNodeList,
 }


+/*
+ * LoadAllShardgroupPlacements loads all shardgroup placements present in the system.
+ */
+static HTAB *
+LoadAllShardgroupPlacements(void)
+{
+	HTAB *shardgroupPlacementSet = CreateSimpleHashSet(ShardgroupPlacement);
+
+	Relation pgDistPlacement = table_open(DistPlacementRelationId(), AccessShareLock);
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistPlacement, InvalidOid, false,
+													NULL, 0, NULL);
+
+	HeapTuple heapTuple = NULL;
+	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
+	{
+		Form_pg_dist_placement placement = (Form_pg_dist_placement) GETSTRUCT(heapTuple);
+		ShardInterval *shardInterval = LoadShardInterval(placement->shardid);
+		Oid citusTableId = shardInterval->relationId;
+		CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(citusTableId);
+		if (cacheEntry == NULL || !IsCitusTableTypeCacheEntry(cacheEntry,
+															  DISTRIBUTED_TABLE))
+		{
+			/* we only want placement entries for distributed tables */
+			continue;
+		}
+
+		ShardgroupPlacement findShardgroupPlacement = {
+			.colocatationId = cacheEntry->colocationId,
+			.shardIntervalIndex = shardInterval->shardIndex,
+			.nodeGroupId = placement->groupid,
+		};
+
+		/* add the ShardgroupPlacement to the set */
+		hash_search(shardgroupPlacementSet, &findShardgroupPlacement, HASH_ENTER, NULL);
+
+		/* there are potentially many shardgroups, so let's clean up the copies */
+		pfree(shardInterval);
+	}
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistPlacement, NoLock);
+
+	return shardgroupPlacementSet;
+}
+
+
+/*
+ * ShardPlacementListToShardgroupPlacementSet converts a list of ShardPlacements
+ * into a set of the ShardgroupPlacements they belong to.
+ */
+static HTAB *
+ShardPlacementListToShardgroupPlacementSet(List *shardPlacementList)
+{
+	HTAB *shardgroupPlacementSet = CreateSimpleHashSet(ShardgroupPlacement);
+
+	ShardPlacement *shardPlacement = NULL;
+	foreach_ptr(shardPlacement, shardPlacementList)
+	{
+		ShardgroupPlacement findShardgroupPlacement = {
+			.colocatationId = shardPlacement->colocationGroupId,
+			.shardIntervalIndex = shardPlacement->shardIndex,
+			.nodeGroupId = shardPlacement->groupId,
+		};
+
+		/* add the ShardgroupPlacement to the set */
+		hash_search(shardgroupPlacementSet, &findShardgroupPlacement, HASH_ENTER, NULL);
+	}
+
+	return shardgroupPlacementSet;
+}
+
+
 /*
  * InitRebalancerPlacementSeparationContext initializes given
  * RebalancerPlacementSeparationContext by using given list
@@ -148,9 +220,12 @@ InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *c
 {
 	HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;

+	/*
 	List *placementListUniqueNodeGroupIds =
 		PlacementListGetUniqueNodeGroupIds(rebalancePlacementList);
+	*/

+	/* create an entry for every worker node so that nodes without placements are present */
 	WorkerNode *workerNode = NULL;
 	foreach_ptr(workerNode, activeWorkerNodeList)
 	{
@@ -162,51 +237,41 @@ InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *c
 		nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway = false;
 		nodePlacementGroupHashEntry->assignedPlacementGroup = NULL;

-		/*
-		 * Lets call set of the nodes that placements in rebalancePlacementList
-		 * are stored on as D and the others as S. In other words, D is the set
-		 * of the nodes that we're allowed to move the placements "from" or
-		 * "to (*)" (* = if we're not draining it) and S is the set of the nodes
-		 * that we're only allowed to move the placements "to" but not "from".
-		 *
-		 * This means that, for a node of type S, the fact that whether the node
-		 * is used to separate a placement group or not cannot be changed in the
-		 * runtime.
-		 *
-		 * For this reason, below we find out the assigned placement groups for
-		 * nodes of type S because we want to avoid from moving the placements
-		 * (if any) from a node of type D to a node that is used to separate a
-		 * placement group within S. We also set hasPlacementsThatCannotBeMovedAway
-		 * to true for the nodes that already have some shard placements within S
-		 * because we want to avoid from moving the placements that need a separate
-		 * node (if any) from node D to node S.
-		 *
-		 * We skip below code for nodes of type D not because optimization purposes
-		 * but because it would be "incorrect" to assume that "current placement
-		 * distribution for a node of type D would be the same" after the rebalancer
-		 * plans the moves.
-		 */
-		if (!workerNode->shouldHaveShards)
+		if (!nodePlacementGroupHashEntry->shouldHaveShards)
 		{
+			/* we can't assign any shardgroup placements to the node anyway */
 			continue;
 		}

-		if (list_member_int(placementListUniqueNodeGroupIds, workerNode->groupId))
-		{
-			/* node is of type D */
-			continue;
-		}
-
-		/* node is of type S */
-		nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway =
-			NodeGroupHasDistributedTableShardPlacements(
-				nodePlacementGroupHashEntry->nodeGroupId);
-
 		nodePlacementGroupHashEntry->assignedPlacementGroup =
 			NodeGroupGetSeparatedShardgroupPlacement(
 				nodePlacementGroupHashEntry->nodeGroupId);
 	}
+
+	HTAB *allShardgroupPlacementsSet = LoadAllShardgroupPlacements();
+	HTAB *balancingShardgroupPlacementsSet =
+		ShardPlacementListToShardgroupPlacementSet(rebalancePlacementList);
+
+	/* iterate over all shardgroup placements to find nodes hosting placements we are not balancing */
+	HASH_SEQ_STATUS status = { 0 };
+	ShardgroupPlacement *entry = NULL;
+	hash_seq_init(&status, allShardgroupPlacementsSet);
+	while ((entry = (ShardgroupPlacement *) hash_seq_search(&status)) != NULL)
+	{
+		bool found = false;
+		hash_search(balancingShardgroupPlacementsSet, entry, HASH_FIND, &found);
+		if (found)
+		{
+			/* we are balancing this shardgroup placement, skip it */
+			continue;
+		}
+
+		/* we found a ShardgroupPlacement we are not balancing; mark its node accordingly */
+		NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
+			hash_search(nodePlacementGroupHash, &entry->nodeGroupId, HASH_ENTER, NULL);
+		nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway = true;
+	}
 }

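The core idea added to InitRebalancerPlacementSeparationContext above is a set difference: every shardgroup placement that exists in the cluster but is absent from the rebalance list pins its node. A minimal standalone model of that logic, with hypothetical data and a linear scan standing in for the HTAB lookups:

#include <stdbool.h>
#include <stdio.h>

/* simplified stand-in for ShardgroupPlacement */
typedef struct Placement
{
	unsigned colocationId;
	int shardIndex;
	int nodeGroupId;
} Placement;

/* field-by-field equality; the real code hashes the struct as the set key */
static bool
PlacementsEqual(const Placement *a, const Placement *b)
{
	return a->colocationId == b->colocationId &&
		   a->shardIndex == b->shardIndex &&
		   a->nodeGroupId == b->nodeGroupId;
}

static bool
InSet(const Placement *set, int size, const Placement *candidate)
{
	for (int i = 0; i < size; i++)
	{
		if (PlacementsEqual(&set[i], candidate))
		{
			return true;
		}
	}
	return false;
}

int
main(void)
{
	/* every shardgroup placement in the cluster (from pg_dist_placement) */
	Placement all[] = { { 1, 0, 10 }, { 1, 1, 11 }, { 2, 0, 11 }, { 2, 1, 12 } };

	/* the placements the rebalancer was asked to move around */
	Placement balancing[] = { { 1, 0, 10 }, { 1, 1, 11 } };

	/* anything in `all` but not in `balancing` pins its node group */
	for (int i = 0; i < 4; i++)
	{
		if (!InSet(balancing, 2, &all[i]))
		{
			printf("node group %d has placements that cannot be moved away\n",
				   all[i].nodeGroupId);
		}
	}
	return 0;
}

This prints node groups 11 and 12. Note that node group 11 gets marked even though another of its placements is being balanced: a single non-balancing placement is enough to set hasPlacementsThatCannotBeMovedAway for its whole node, which is exactly how the loop above behaves.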
View File

@@ -471,6 +471,8 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
 		placement->shardId = groupPlacement->shardId;
 		placement->shardLength = groupPlacement->shardLength;
 		placement->groupId = groupPlacement->groupId;
+		placement->colocationGroupId = citusTableCacheEntry->colocationId;
+		placement->shardIndex = shardInterval->shardIndex;
 		placement->nodeId = worker->nodeId;
 		placement->nodeName = pstrdup(worker->workerName);
 		placement->nodePort = worker->workerPort;

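The two assignments added above feed the first file: ShardPlacementListToShardgroupPlacementSet builds its set keys from colocationGroupId, shardIndex, and groupId, so FullShardPlacementList has to populate them. Previously, colocationGroupId was left unset on the placements built here, and shardIndex did not exist on ShardPlacement at all, hence the header change below.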
View File

@@ -100,6 +100,7 @@ typedef struct ShardPlacement
 	uint32 nodeId;
 	char partitionMethod;
 	uint32 colocationGroupId;
+	int shardIndex;
 	uint32 representativeValue;
 } ShardPlacement;