diff --git a/src/backend/distributed/operations/rebalancer_placement_separation.c b/src/backend/distributed/operations/rebalancer_placement_separation.c
index e7658fc80..82fec2949 100644
--- a/src/backend/distributed/operations/rebalancer_placement_separation.c
+++ b/src/backend/distributed/operations/rebalancer_placement_separation.c
@@ -11,9 +11,11 @@
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "nodes/pg_list.h"
 #include "utils/hsearch.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"
 
 #include "distributed/colocation_utils.h"
 #include "distributed/hash_helpers.h"
@@ -21,6 +23,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_utility.h"
 #include "distributed/multi_physical_planner.h"
+#include "distributed/pg_dist_placement.h"
 #include "distributed/rebalancer_placement_separation.h"
 #include "distributed/shard_rebalancer.h"
 
@@ -135,6 +138,79 @@ PrepareRebalancerPlacementSeparationContext(List *activeWorkerNodeList,
 }
 
 
+/*
+ * LoadAllShardgroupPlacements loads all shardgroup placements present in the
+ * system by scanning pg_dist_placement.
+ */
+static HTAB *
+LoadAllShardgroupPlacements(void)
+{
+	HTAB *shardgroupPlacementSet = CreateSimpleHashSet(ShardgroupPlacement);
+
+	Relation pgDistPlacement = table_open(DistPlacementRelationId(), AccessShareLock);
+	SysScanDesc scanDescriptor = systable_beginscan(pgDistPlacement, InvalidOid, false,
+													NULL, 0, NULL);
+
+	HeapTuple heapTuple = NULL;
+	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
+	{
+		Form_pg_dist_placement placement = (Form_pg_dist_placement) GETSTRUCT(heapTuple);
+
+		ShardInterval *shardInterval = LoadShardInterval(placement->shardid);
+		Oid citusTableId = shardInterval->relationId;
+		CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(citusTableId);
+		if (cacheEntry == NULL || !IsCitusTableTypeCacheEntry(cacheEntry,
+															  DISTRIBUTED_TABLE))
+		{
+			/* we only want placement entries for distributed tables */
+			continue;
+		}
+
+		ShardgroupPlacement findShardgroupPlacement = {
+			.colocatationId = cacheEntry->colocationId,
+			.shardIntervalIndex = shardInterval->shardIndex,
+			.nodeGroupId = placement->groupid,
+		};
+
+		/* add the ShardgroupPlacement to the set */
+		hash_search(shardgroupPlacementSet, &findShardgroupPlacement, HASH_ENTER, NULL);
+
+		/* there are potentially many shardgroups, let's clean up the copies */
+		pfree(shardInterval);
+	}
+
+	systable_endscan(scanDescriptor);
+	table_close(pgDistPlacement, NoLock);
+
+	return shardgroupPlacementSet;
+}
+
+
+/*
+ * ShardPlacementListToShardgroupPlacementSet converts a list of ShardPlacements
+ * into the set of ShardgroupPlacements they belong to.
+ */
+static HTAB *
+ShardPlacementListToShardgroupPlacementSet(List *shardPlacementList)
+{
+	HTAB *shardgroupPlacementSet = CreateSimpleHashSet(ShardgroupPlacement);
+
+	ShardPlacement *shardPlacement = NULL;
+	foreach_ptr(shardPlacement, shardPlacementList)
+	{
+		ShardgroupPlacement findShardgroupPlacement = {
+			.colocatationId = shardPlacement->colocationGroupId,
+			.shardIntervalIndex = shardPlacement->shardIndex,
+			.nodeGroupId = shardPlacement->groupId,
+		};
+
+		/* add the ShardgroupPlacement to the set */
+		hash_search(shardgroupPlacementSet, &findShardgroupPlacement, HASH_ENTER, NULL);
+	}
+
+	return shardgroupPlacementSet;
+}
+
+
 /*
  * InitRebalancerPlacementSeparationContext initializes given
  * RebalancerPlacementSeparationContext by using given list
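Both helpers above key their hash set on the entire ShardgroupPlacement struct. Below is a minimal sketch of what CreateSimpleHashSet(ShardgroupPlacement) is assumed to expand to; the real macro lives in distributed/hash_helpers.h and may differ in details:

/*
 * Sketch only: assumed rough equivalent of
 * CreateSimpleHashSet(ShardgroupPlacement).  HASH_BLOBS hashes and compares
 * the key as raw bytes, so HASH_ENTER deduplicates entries on the full
 * (colocatationId, shardIntervalIndex, nodeGroupId) triple.  This assumes
 * the struct carries no uninitialized padding between otherwise-equal keys.
 */
HASHCTL info = { 0 };
info.keysize = sizeof(ShardgroupPlacement);
info.entrysize = sizeof(ShardgroupPlacement);
HTAB *shardgroupPlacementSet =
	hash_create("ShardgroupPlacement set", 32, &info, HASH_ELEM | HASH_BLOBS);
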
@@ -148,9 +224,7 @@ InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *c
 {
 	HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;
 
-	List *placementListUniqueNodeGroupIds =
-		PlacementListGetUniqueNodeGroupIds(rebalancePlacementList);
-
+	/* create an entry for every worker node so nodes without placements are present */
 	WorkerNode *workerNode = NULL;
 	foreach_ptr(workerNode, activeWorkerNodeList)
 	{
@@ -162,51 +236,41 @@ InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *c
 		nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway = false;
 		nodePlacementGroupHashEntry->assignedPlacementGroup = NULL;
 
-		/*
-		 * Lets call set of the nodes that placements in rebalancePlacementList
-		 * are stored on as D and the others as S. In other words, D is the set
-		 * of the nodes that we're allowed to move the placements "from" or
-		 * "to (*)" (* = if we're not draining it) and S is the set of the nodes
-		 * that we're only allowed to move the placements "to" but not "from".
-		 *
-		 * This means that, for a node of type S, the fact that whether the node
-		 * is used to separate a placement group or not cannot be changed in the
-		 * runtime.
-		 *
-		 * For this reason, below we find out the assigned placement groups for
-		 * nodes of type S because we want to avoid from moving the placements
-		 * (if any) from a node of type D to a node that is used to separate a
-		 * placement group within S. We also set hasPlacementsThatCannotBeMovedAway
-		 * to true for the nodes that already have some shard placements within S
-		 * because we want to avoid from moving the placements that need a separate
-		 * node (if any) from node D to node S.
-		 *
-		 * We skip below code for nodes of type D not because optimization purposes
-		 * but because it would be "incorrect" to assume that "current placement
-		 * distribution for a node of type D would be the same" after the rebalancer
-		 * plans the moves.
-		 */
-
-		if (!workerNode->shouldHaveShards)
+		if (!nodePlacementGroupHashEntry->shouldHaveShards)
 		{
-			/* we can't assing any shardgroup placements to the node anyway */
 			continue;
 		}
 
-		if (list_member_int(placementListUniqueNodeGroupIds, workerNode->groupId))
-		{
-			/* node is of type D */
-			continue;
-		}
-
-		/* node is of type S */
-		nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway =
-			NodeGroupHasDistributedTableShardPlacements(
-				nodePlacementGroupHashEntry->nodeGroupId);
 		nodePlacementGroupHashEntry->assignedPlacementGroup =
 			NodeGroupGetSeparatedShardgroupPlacement(
 				nodePlacementGroupHashEntry->nodeGroupId);
 	}
+
+	HTAB *allShardgroupPlacementsSet = LoadAllShardgroupPlacements();
+	HTAB *balancingShardgroupPlacementsSet =
+		ShardPlacementListToShardgroupPlacementSet(rebalancePlacementList);
+
+	/* walk all shardgroup placements to find ones that are not being rebalanced */
+	HASH_SEQ_STATUS status = { 0 };
+	ShardgroupPlacement *entry = NULL;
+
+	hash_seq_init(&status, allShardgroupPlacementsSet);
+	while ((entry = (ShardgroupPlacement *) hash_seq_search(&status)) != NULL)
+	{
+		bool found = false;
+		hash_search(balancingShardgroupPlacementsSet, entry, HASH_FIND, &found);
+		if (found)
+		{
+			/* we are rebalancing this shardgroup placement, skip it */
+			continue;
+		}
+
+		/* this shardgroup placement is not being rebalanced, mark its node accordingly */
+		NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
+			hash_search(nodePlacementGroupHash, &entry->nodeGroupId, HASH_ENTER, NULL);
+
+		nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway = true;
+	}
 }
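The hunk above replaces the old D/S node classification with a plain set difference: any shardgroup placement that exists in the catalog but is not part of rebalancePlacementList pins the node it lives on. A generic sketch of that pattern under the same dynahash assumptions; MarkNodeGroupAsPinned() is a hypothetical stand-in for the hasPlacementsThatCannotBeMovedAway assignment:

static void
MarkShardgroupPlacementsNotBeingRebalanced(HTAB *allSet, HTAB *balancingSet)
{
	HASH_SEQ_STATUS status;
	hash_seq_init(&status, allSet);

	ShardgroupPlacement *entry = NULL;
	while ((entry = (ShardgroupPlacement *) hash_seq_search(&status)) != NULL)
	{
		bool found = false;
		hash_search(balancingSet, entry, HASH_FIND, &found);
		if (!found)
		{
			/* present in allSet but not in balancingSet: pin its node */
			MarkNodeGroupAsPinned(entry->nodeGroupId);
		}
	}

	/*
	 * No hash_seq_term() call is needed here: the scan always runs to
	 * completion, and dynahash ends a completed scan by itself.
	 */
}

Note that the loop probes balancingSet with the whole entry as the key, which is why the changes below fill in colocationGroupId and shardIndex on ShardPlacement: without them the probe key could never match.
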
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index 06c188bb1..b52527d67 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -471,6 +471,8 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
 		placement->shardId = groupPlacement->shardId;
 		placement->shardLength = groupPlacement->shardLength;
 		placement->groupId = groupPlacement->groupId;
+		placement->colocationGroupId = citusTableCacheEntry->colocationId;
+		placement->shardIndex = shardInterval->shardIndex;
 		placement->nodeId = worker->nodeId;
 		placement->nodeName = pstrdup(worker->workerName);
 		placement->nodePort = worker->workerPort;
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 1fbce7534..84c71e477 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -100,6 +100,7 @@ typedef struct ShardPlacement
 	uint32 nodeId;
 	char partitionMethod;
 	uint32 colocationGroupId;
+	int shardIndex;
 	uint32 representativeValue;
 } ShardPlacement;
 
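The two FullShardPlacementList assignments and the new ShardPlacement.shardIndex field exist so that ShardPlacementListToShardgroupPlacementSet can build exactly the key that LoadAllShardgroupPlacements derives from the catalog. The ShardgroupPlacement struct itself is not part of this diff; the following is a sketch of its assumed shape, reconstructed from the field names used above (the actual definition, including the colocatationId spelling, lives in rebalancer_placement_separation.c and may differ):

/* assumed shape, reconstructed from usage in the hunks above */
typedef struct ShardgroupPlacement
{
	uint32 colocatationId;    /* colocation group the shardgroup belongs to */
	int shardIntervalIndex;   /* position of the shard within its colocation group */
	int32 nodeGroupId;        /* node group hosting this placement */
} ShardgroupPlacement;

With those fields populated on both sides, membership tests between the "all placements" set and the "being rebalanced" set compare like with like, so a miss reliably means the placement is not part of the current rebalance.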