address feedback

tenant-schema-isolation-complete-view
Onur Tirtir 2023-10-16 13:21:47 +03:00
parent b0fa3d91bc
commit 2d16b0fd9e
13 changed files with 243 additions and 262 deletions

View File

@@ -95,8 +95,6 @@ char *EnableManualMetadataChangesForUser = "";
int MetadataSyncTransMode = METADATA_SYNC_TRANSACTIONAL;
static Datum citus_internal_add_shard_metadata_internal(PG_FUNCTION_ARGS,
bool expectNeedsSeparateNode);
static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
int colocationId);
static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress);
@@ -169,7 +167,6 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata_legacy);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy);
@@ -3226,33 +3223,6 @@ citus_internal_delete_partition_metadata(PG_FUNCTION_ARGS)
*/
Datum
citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
{
bool expectNeedsSeparateNode = true;
return citus_internal_add_shard_metadata_internal(fcinfo, expectNeedsSeparateNode);
}
/*
* citus_internal_add_shard_metadata is an internal UDF to
* add a row to pg_dist_shard, but without the needs_separate_node
* parameter.
*/
Datum
citus_internal_add_shard_metadata_legacy(PG_FUNCTION_ARGS)
{
bool expectNeedsSeparateNode = false;
return citus_internal_add_shard_metadata_internal(fcinfo, expectNeedsSeparateNode);
}
/*
* citus_internal_add_shard_metadata_internal is a helper function for
* citus_internal_add_shard_metadata and citus_internal_add_shard_metadata_legacy
* functions.
*/
static Datum
citus_internal_add_shard_metadata_internal(PG_FUNCTION_ARGS,
bool expectNeedsSeparateNode)
{
CheckCitusVersion(ERROR);
@@ -3277,12 +3247,8 @@ citus_internal_add_shard_metadata_internal(PG_FUNCTION_ARGS,
shardMaxValue = PG_GETARG_TEXT_P(4);
}
bool needsSeparateNode = false;
if (expectNeedsSeparateNode)
{
PG_ENSURE_ARGNOTNULL(5, "needs separate node");
needsSeparateNode = PG_GETARG_BOOL(5);
}
PG_ENSURE_ARGNOTNULL(5, "needs separate node");
bool needsSeparateNode = PG_GETARG_BOOL(5);
/* only owner of the table (or superuser) is allowed to add the Citus metadata */
EnsureTableOwner(relationId);

View File

@@ -115,7 +115,7 @@ static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shard
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes);
static bool GetLocalDiskSpaceStats(uint64 *availableBytes, uint64 *totalBytes);
static void citus_shard_property_set_anti_affinity(uint64 shardId, bool enabled);
static void CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled);
static void ShardGroupSetNeedsSeparateNodeGlobally(uint64 shardId, bool enabled);
static void ShardSetNeedsSeparateNode(uint64 shardId, bool enabled);
static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor,
@@ -373,17 +373,15 @@ citus_shard_property_set(PG_FUNCTION_ARGS)
PG_ENSURE_ARGNOTNULL(0, "shard_id");
uint64 shardId = PG_GETARG_INT64(0);
if (!ShardExists(shardId))
{
ereport(ERROR, (errmsg("shard %lu does not exist", shardId)));
}
/* RelationIdForShard() first checks whether the shard id is valid */
Oid distributedRelationId = RelationIdForShard(shardId);
List *colocatedTableList = ColocatedTableList(distributedRelationId);
colocatedTableList = SortList(colocatedTableList, CompareOids);
EnsureTableListOwner(colocatedTableList);
AcquirePlacementColocationLock(distributedRelationId, ExclusiveLock,
"set anti affinity property for a shard of");
"set a property for a shard of");
Oid colocatedTableId = InvalidOid;
foreach_oid(colocatedTableId, colocatedTableList)
@@ -394,7 +392,7 @@ citus_shard_property_set(PG_FUNCTION_ARGS)
if (!PG_ARGISNULL(1))
{
bool antiAffinity = PG_GETARG_BOOL(1);
citus_shard_property_set_anti_affinity(shardId, antiAffinity);
CitusShardPropertySetAntiAffinity(shardId, antiAffinity);
}
PG_RETURN_VOID();
@@ -402,19 +400,19 @@ citus_shard_property_set(PG_FUNCTION_ARGS)
/*
* citus_shard_property_set_anti_affinity is an helper function for
* CitusShardPropertySetAntiAffinity is a helper function for
* citus_shard_property_set UDF to set anti_affinity property for given
* shard.
*/
static void
citus_shard_property_set_anti_affinity(uint64 shardId, bool enabled)
CitusShardPropertySetAntiAffinity(uint64 shardId, bool enabled)
{
Oid distributedRelationId = RelationIdForShard(shardId);
if (!IsCitusTableType(distributedRelationId, HASH_DISTRIBUTED) &&
!IsCitusTableType(distributedRelationId, SINGLE_SHARD_DISTRIBUTED))
{
ereport(ERROR, (errmsg("shard isolation is only supported for hash "
"distributed tables")));
ereport(ERROR, (errmsg("setting anti-affinity property is only "
"supported for hash distributed tables")));
}
ShardGroupSetNeedsSeparateNodeGlobally(shardId, enabled);
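For reference, the user-facing entry point for this helper is the citus_shard_property_set UDF; a minimal usage sketch, with a hypothetical shard id and assuming the shard belongs to a hash-distributed table, could look like:

-- enable anti-affinity for a shard (and thereby its shardgroup); shard id is hypothetical
SELECT citus_shard_property_set(102008, anti_affinity => true);
-- clearing the property uses the same call with false
SELECT citus_shard_property_set(102008, anti_affinity => false);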
@@ -1502,18 +1500,17 @@ ShardLength(uint64 shardId)
/*
* NodeGroupGetSeparatedShardPlacementGroup returns the shard placement group
* NodeGroupGetSeparatedShardgroupPlacement returns the shard group placement
* that given node group is used to separate from others. Returns NULL if this
* node is not used to separate a shard placement group.
* node is not used to separate a shard group placement.
*/
ShardPlacementGroup *
NodeGroupGetSeparatedShardPlacementGroup(int32 groupId)
ShardgroupPlacement *
NodeGroupGetSeparatedShardgroupPlacement(int32 groupId)
{
ShardPlacementGroup *nodeShardPlacementGroup = NULL;
bool shardPlacementGroupNeedsSeparateNode = false;
ShardgroupPlacement *nodeShardgroupPlacement = NULL;
bool shardgroupPlacementNeedsSeparateNode = false;
bool indexOK = true;
int scanKeyCount = 1;
ScanKeyData scanKey[1];
Relation pgDistPlacement = table_open(DistPlacementRelationId(),
@@ -1524,10 +1521,10 @@ NodeGroupGetSeparatedShardPlacementGroup(int32 groupId)
SysScanDesc scanDescriptor = systable_beginscan(pgDistPlacement,
DistPlacementGroupidIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
NULL, lengthof(scanKey), scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
@@ -1538,46 +1535,49 @@ NodeGroupGetSeparatedShardPlacementGroup(int32 groupId)
Oid citusTableId = shardInterval->relationId;
if (!IsCitusTableType(citusTableId, DISTRIBUTED_TABLE))
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
ShardPlacementGroup *shardPlacementGroup =
GetShardPlacementGroupForPlacement(placement->shardId,
ShardgroupPlacement *shardgroupPlacement =
GetShardgroupPlacementForPlacement(placement->shardId,
placement->placementId);
if (nodeShardPlacementGroup &&
!ShardPlacementGroupsSame(shardPlacementGroup,
nodeShardPlacementGroup))
if (nodeShardgroupPlacement &&
!ShardgroupPlacementsSame(shardgroupPlacement,
nodeShardgroupPlacement))
{
nodeShardPlacementGroup = NULL;
/*
* If we have more than one shardgroup placement on the node,
* then this means that the node is not actually used to separate
* a shardgroup placement.
*/
nodeShardgroupPlacement = NULL;
shardgroupPlacementNeedsSeparateNode = false;
break;
}
nodeShardPlacementGroup = shardPlacementGroup;
shardPlacementGroupNeedsSeparateNode = shardInterval->needsSeparateNode;
heapTuple = systable_getnext(scanDescriptor);
nodeShardgroupPlacement = shardgroupPlacement;
shardgroupPlacementNeedsSeparateNode = shardInterval->needsSeparateNode;
}
systable_endscan(scanDescriptor);
table_close(pgDistPlacement, NoLock);
if (!shardPlacementGroupNeedsSeparateNode)
if (!shardgroupPlacementNeedsSeparateNode)
{
return NULL;
}
return nodeShardPlacementGroup;
return nodeShardgroupPlacement;
}
/*
* ShardPlacementGroupsSame returns true if two shard placement groups are the same.
* ShardgroupPlacementsSame returns true if two shardgroup placements are the same.
*/
bool
ShardPlacementGroupsSame(const ShardPlacementGroup *leftGroup,
const ShardPlacementGroup *rightGroup)
ShardgroupPlacementsSame(const ShardgroupPlacement *leftGroup,
const ShardgroupPlacement *rightGroup)
{
return leftGroup->colocatationId == rightGroup->colocatationId &&
leftGroup->shardIntervalIndex == rightGroup->shardIntervalIndex &&
@@ -1618,16 +1618,61 @@ NodeGroupHasShardPlacements(int32 groupId)
/*
* GetShardPlacementGroupForPlacement returns ShardPlacementGroup that placement
* NodeGroupHasDistributedTableShardPlacements returns whether any active
* distributed table shards are placed on the group
*/
bool
NodeGroupHasDistributedTableShardPlacements(int32 groupId)
{
bool nodeGroupHasDistributedTableShardPlacements = false;
Relation pgPlacement = table_open(DistPlacementRelationId(), AccessShareLock);
ScanKeyData scanKey[1];
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgPlacement,
DistPlacementGroupidIndexId(),
indexOK,
NULL, lengthof(scanKey), scanKey);
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgPlacement);
GroupShardPlacement *placement =
TupleToGroupShardPlacement(tupleDescriptor, heapTuple);
ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
Oid citusTableId = shardInterval->relationId;
if (IsCitusTableType(citusTableId, DISTRIBUTED_TABLE))
{
nodeGroupHasDistributedTableShardPlacements = true;
break;
}
}
systable_endscan(scanDescriptor);
table_close(pgPlacement, NoLock);
return nodeGroupHasDistributedTableShardPlacements;
}
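Conceptually, the scan above asks whether the node group hosts any placement of a distributed table. A rough catalog-query equivalent is sketched below; the group id is hypothetical and the partmethod filter is only an approximation (for example, single-shard distributed tables with partmethod 'n' would need an additional check):

-- does node group 14 host any placement that belongs to a distributed table?
SELECT EXISTS (
    SELECT 1
    FROM pg_dist_placement p
    JOIN pg_dist_shard s USING (shardid)
    JOIN pg_dist_partition t ON t.logicalrelid = s.logicalrelid
    WHERE p.groupid = 14
      AND t.partmethod IN ('h', 'r', 'a')
);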
/*
* GetShardgroupPlacementForPlacement returns ShardgroupPlacement that placement
* with given shardId & placementId belongs to.
*/
ShardPlacementGroup *
GetShardPlacementGroupForPlacement(uint64 shardId, uint64 placementId)
ShardgroupPlacement *
GetShardgroupPlacementForPlacement(uint64 shardId, uint64 placementId)
{
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
ShardInterval *shardInterval = LoadShardInterval(shardId);
ShardPlacementGroup *placementGroup = palloc(sizeof(ShardPlacementGroup));
ShardgroupPlacement *placementGroup = palloc(sizeof(ShardgroupPlacement));
placementGroup->colocatationId = shardPlacement->colocationGroupId;
placementGroup->shardIntervalIndex = shardInterval->shardIndex;
placementGroup->nodeGroupId = shardPlacement->groupId;

View File

@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* rebalancer_placement_isolation.c
* rebalancer_placement_separation.c
* Routines to determine which worker node should be used to separate
* a colocated set of shard placements that need separate nodes.
*
@@ -10,6 +10,7 @@
*/
#include "postgres.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"
#include "utils/lsyscache.h"
@@ -20,12 +21,17 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/rebalancer_placement_isolation.h"
#include "distributed/rebalancer_placement_separation.h"
#include "distributed/shard_rebalancer.h"
struct RebalancerPlacementIsolationContext
struct RebalancerPlacementSeparationContext
{
/*
* Hash table where each entry is of the form NodeToPlacementGroupHashEntry,
* meaning that each entry maps the node with nodeGroupId to
* a NodeToPlacementGroupHashEntry.
*/
HTAB *nodePlacementGroupHash;
};
@@ -35,7 +41,7 @@ struct RebalancerPlacementIsolationContext
* placement group that is determined to be separated from other shards in
* the cluster via that node.
*/
typedef struct
typedef struct NodeToPlacementGroupHashEntry
{
/* hash key -- group id of the node */
int32 nodeGroupId;
@@ -49,39 +55,37 @@ typedef struct
bool shouldHaveShards;
/*
* Whether given node is allowed to separate any shard placement groups.
* Whether given node is allowed to separate any shardgroup placements.
*/
bool allowedToSeparateAnyPlacementGroup;
/*
* Shard placement group that is assigned to this node to be separated
* Shardgroup placement that is assigned to this node to be separated
* from others in the cluster.
*
* NULL if no shard placement group is assigned yet.
* NULL if no shardgroup placement is assigned yet.
*/
ShardPlacementGroup *assignedPlacementGroup;
ShardgroupPlacement *assignedPlacementGroup;
} NodeToPlacementGroupHashEntry;
/*
* Routines to prepare a hash table where each entry is of type
* NodeToPlacementGroupHashEntry.
* Routines to prepare RebalancerPlacementSeparationContext.
*/
static void NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash,
List *activeWorkerNodeList,
List *rebalancePlacementList,
WorkerNode *drainWorkerNode);
static void NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
List *activeWorkerNodeList,
List *rebalancePlacementList,
FmgrInfo *shardAllowedOnNodeUDF);
static bool NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
int32 nodeGroupId,
ShardPlacement *shardPlacement,
FmgrInfo *shardAllowedOnNodeUDF);
static NodeToPlacementGroupHashEntry * NodeToPlacementGroupHashGetNodeWithGroupId(
HTAB *nodePlacementGroupHash,
int32
nodeGroupId);
static void InitRebalancerPlacementSeparationContext(
RebalancerPlacementSeparationContext *context,
List *activeWorkerNodeList,
List *rebalancePlacementList,
WorkerNode *drainWorkerNode);
static void TryAssignPlacementGroupsToNodeGroups(
RebalancerPlacementSeparationContext *context,
List *activeWorkerNodeList,
List *rebalancePlacementList,
FmgrInfo *shardAllowedOnNodeUDF);
static bool TryAssignPlacementGroupToNodeGroup(
RebalancerPlacementSeparationContext *context,
int32 candidateNodeGroupId,
ShardPlacement *shardPlacement,
FmgrInfo *shardAllowedOnNodeUDF);
/* other helpers */
@@ -90,49 +94,54 @@ static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGrou
/*
* PrepareRebalancerPlacementIsolationContext creates RebalancerPlacementIsolationContext
* that keeps track of which worker nodes are used to separate which shard placement groups
* PrepareRebalancerPlacementSeparationContext creates RebalancerPlacementSeparationContext
* that keeps track of which worker nodes are used to separate which shardgroup placements
* that need separate nodes.
*/
RebalancerPlacementIsolationContext *
PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
List *rebalancePlacementList,
WorkerNode *drainWorkerNode,
FmgrInfo *shardAllowedOnNodeUDF)
RebalancerPlacementSeparationContext *
PrepareRebalancerPlacementSeparationContext(List *activeWorkerNodeList,
List *rebalancePlacementList,
WorkerNode *drainWorkerNode,
FmgrInfo *shardAllowedOnNodeUDF)
{
HTAB *nodePlacementGroupHash =
CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry,
"NodeToPlacementGroupHash",
list_length(activeWorkerNodeList));
RebalancerPlacementSeparationContext *context =
palloc(sizeof(RebalancerPlacementSeparationContext));
context->nodePlacementGroupHash = nodePlacementGroupHash;
activeWorkerNodeList = SortList(activeWorkerNodeList, CompareWorkerNodes);
rebalancePlacementList = SortList(rebalancePlacementList, CompareShardPlacements);
NodeToPlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList,
rebalancePlacementList, drainWorkerNode);
InitRebalancerPlacementSeparationContext(context, activeWorkerNodeList,
rebalancePlacementList, drainWorkerNode);
NodeToPlacementGroupHashAssignNodes(nodePlacementGroupHash,
activeWorkerNodeList,
rebalancePlacementList,
shardAllowedOnNodeUDF);
RebalancerPlacementIsolationContext *context =
palloc(sizeof(RebalancerPlacementIsolationContext));
context->nodePlacementGroupHash = nodePlacementGroupHash;
TryAssignPlacementGroupsToNodeGroups(context,
activeWorkerNodeList,
rebalancePlacementList,
shardAllowedOnNodeUDF);
return context;
}
/*
* NodeToPlacementGroupHashInit initializes given hash table where each
* entry is of type NodeToPlacementGroupHashEntry by using given list
* of worker nodes and the worker node that is being drained, if specified.
* InitRebalancerPlacementSeparationContext initializes given
* RebalancerPlacementSeparationContext by using given list
* of worker nodes and the worker node that is being drained,
* if specified.
*/
static void
NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNodeList,
List *rebalancePlacementList, WorkerNode *drainWorkerNode)
InitRebalancerPlacementSeparationContext(RebalancerPlacementSeparationContext *context,
List *activeWorkerNodeList,
List *rebalancePlacementList,
WorkerNode *drainWorkerNode)
{
HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;
List *placementListUniqueNodeGroupIds =
PlacementListGetUniqueNodeGroupIds(rebalancePlacementList);
@@ -143,8 +152,6 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNod
hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER,
NULL);
nodePlacementGroupHashEntry->nodeGroupId = workerNode->groupId;
bool shouldHaveShards = workerNode->shouldHaveShards;
if (drainWorkerNode && drainWorkerNode->groupId == workerNode->groupId)
{
@@ -183,59 +190,49 @@ NodeToPlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *activeWorkerNod
if (!shouldHaveShards)
{
/* we can't assing any shard placement groups to the node anyway */
/* we can't assign any shardgroup placements to the node anyway */
continue;
}
if (list_length(placementListUniqueNodeGroupIds) == list_length(
activeWorkerNodeList))
{
/*
* list_member_oid() check would return true for all placements then.
* This means that all the nodes are of type D.
*/
Assert(list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId));
continue;
}
if (list_member_oid(placementListUniqueNodeGroupIds, workerNode->groupId))
if (list_member_int(placementListUniqueNodeGroupIds, workerNode->groupId))
{
/* node is of type D */
continue;
}
ShardPlacementGroup *separatedShardPlacementGroup =
NodeGroupGetSeparatedShardPlacementGroup(
ShardgroupPlacement *separatedShardgroupPlacement =
NodeGroupGetSeparatedShardgroupPlacement(
nodePlacementGroupHashEntry->nodeGroupId);
if (separatedShardPlacementGroup)
if (separatedShardgroupPlacement)
{
nodePlacementGroupHashEntry->assignedPlacementGroup =
separatedShardPlacementGroup;
separatedShardgroupPlacement;
}
else
{
nodePlacementGroupHashEntry->allowedToSeparateAnyPlacementGroup =
!NodeGroupHasShardPlacements(nodePlacementGroupHashEntry->nodeGroupId);
!NodeGroupHasDistributedTableShardPlacements(
nodePlacementGroupHashEntry->nodeGroupId);
}
}
}
/*
* NodeToPlacementGroupHashAssignNodes assigns all active shard placements in
* the cluster that need separate nodes to individual worker nodes.
* TryAssignPlacementGroupsToNodeGroups tries to assign placements that need
* separate nodes within given placement list to individual worker nodes.
*/
static void
NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
List *activeWorkerNodeList,
List *rebalancePlacementList,
FmgrInfo *shardAllowedOnNodeUDF)
TryAssignPlacementGroupsToNodeGroups(RebalancerPlacementSeparationContext *context,
List *activeWorkerNodeList,
List *rebalancePlacementList,
FmgrInfo *shardAllowedOnNodeUDF)
{
List *availableWorkerList = list_copy(activeWorkerNodeList);
List *unassignedPlacementList = NIL;
/*
* Assign as much as possible shard placement groups to worker nodes where
* Assign as many shardgroup placements as possible to worker nodes where
* they are stored already.
*/
ShardPlacement *shardPlacement = NULL;
@@ -247,20 +244,20 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
continue;
}
int32 shardPlacementGroupId = shardPlacement->groupId;
if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash,
shardPlacementGroupId,
int32 currentNodeGroupId = shardPlacement->groupId;
if (TryAssignPlacementGroupToNodeGroup(context,
currentNodeGroupId,
shardPlacement,
shardAllowedOnNodeUDF))
{
/*
* NodeToPlacementGroupHashAssignNode() succeeds for each worker node
* TryAssignPlacementGroupToNodeGroup() succeeds for each worker node
* once, hence we must not have removed the worker node from the list
* yet, and WorkerNodeListGetNodeWithGroupId() ensures that already.
*/
int currentPlacementNodeIdx =
WorkerNodeListGetNodeWithGroupId(availableWorkerList,
shardPlacementGroupId);
currentNodeGroupId);
availableWorkerList = list_delete_nth_cell(availableWorkerList,
currentPlacementNodeIdx);
}
@@ -274,7 +271,7 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
bool emitWarning = false;
/*
* For the shard placement groups that could not be assigned to their
* For the shardgroup placements that could not be assigned to their
* current node, assign them to any other node that is available.
*/
ShardPlacement *unassignedShardPlacement = NULL;
@@ -285,7 +282,7 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
WorkerNode *availableWorkerNode = NULL;
foreach_ptr(availableWorkerNode, availableWorkerList)
{
if (NodeToPlacementGroupHashAssignNode(nodePlacementGroupHash,
if (TryAssignPlacementGroupToNodeGroup(context,
availableWorkerNode->groupId,
unassignedShardPlacement,
shardAllowedOnNodeUDF))
@@ -310,24 +307,32 @@ NodeToPlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
/*
* NodeToPlacementGroupHashAssignNode is an helper to
* NodeToPlacementGroupHashAssignNodes that tries to assign given
* TryAssignPlacementGroupToNodeGroup is a helper to
* TryAssignPlacementGroupsToNodeGroups that tries to assign given
* shard placement to given node and returns true if it succeeds.
*/
static bool
NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
int32 nodeGroupId,
TryAssignPlacementGroupToNodeGroup(RebalancerPlacementSeparationContext *context,
int32 candidateNodeGroupId,
ShardPlacement *shardPlacement,
FmgrInfo *shardAllowedOnNodeUDF)
{
HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;
bool found = false;
NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId);
hash_search(nodePlacementGroupHash, &candidateNodeGroupId, HASH_FIND, &found);
if (!found)
{
ereport(ERROR, (errmsg("no such node is found")));
}
if (nodePlacementGroupHashEntry->assignedPlacementGroup)
{
/*
* Right now callers of this function call it once for each distinct
* shard placement group, hence we assume that shard placement group
* shardgroup placement, hence we assume that shardgroup placement
* that given shard placement belongs to and
* nodePlacementGroupHashEntry->assignedPlacementGroup cannot be the
* same, without checking.
@@ -345,7 +350,7 @@ NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
return false;
}
WorkerNode *workerNode = PrimaryNodeForGroup(nodeGroupId, NULL);
WorkerNode *workerNode = PrimaryNodeForGroup(candidateNodeGroupId, NULL);
Datum allowed = FunctionCall2(shardAllowedOnNodeUDF, shardPlacement->shardId,
workerNode->nodeId);
if (!DatumGetBool(allowed))
@@ -354,7 +359,7 @@ NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
}
nodePlacementGroupHashEntry->assignedPlacementGroup =
GetShardPlacementGroupForPlacement(shardPlacement->shardId,
GetShardgroupPlacementForPlacement(shardPlacement->shardId,
shardPlacement->placementId);
return true;
@@ -362,28 +367,34 @@ NodeToPlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
/*
* RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker returns true
* RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker returns true
* if shard placement with given shardId & placementId is allowed to be stored
* on given worker node.
*/
bool
RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
RebalancerPlacementIsolationContext *context,
RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker(
RebalancerPlacementSeparationContext *context,
uint64 shardId,
uint64 placementId,
WorkerNode *workerNode)
{
HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash;
bool found = false;
NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
NodeToPlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash,
workerNode->groupId);
hash_search(nodePlacementGroupHash, &(workerNode->groupId), HASH_FIND, &found);
if (!found)
{
ereport(ERROR, (errmsg("no such node is found")));
}
ShardInterval *shardInterval = LoadShardInterval(shardId);
if (!shardInterval->needsSeparateNode)
{
/*
* It doesn't need a separate node, but is the node used to separate
* a shard placement group? If so, we cannot store it on this node.
* a shardgroup placement? If so, we cannot store it on this node.
*/
return nodePlacementGroupHashEntry->shouldHaveShards &&
nodePlacementGroupHashEntry->assignedPlacementGroup == NULL;
@@ -399,35 +410,13 @@ RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
return false;
}
ShardPlacementGroup *placementGroup =
GetShardPlacementGroupForPlacement(shardId, placementId);
return ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup,
ShardgroupPlacement *placementGroup =
GetShardgroupPlacementForPlacement(shardId, placementId);
return ShardgroupPlacementsSame(nodePlacementGroupHashEntry->assignedPlacementGroup,
placementGroup);
}
/*
* NodeToPlacementGroupHashGetNodeWithGroupId searches given hash table for
* NodeToPlacementGroupHashEntry with given node id and returns it.
*
* Throws an error if no such entry is found.
*/
static NodeToPlacementGroupHashEntry *
NodeToPlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash,
int32 nodeGroupId)
{
NodeToPlacementGroupHashEntry *nodePlacementGroupHashEntry =
hash_search(nodePlacementGroupHash, &nodeGroupId, HASH_FIND, NULL);
if (nodePlacementGroupHashEntry == NULL)
{
ereport(ERROR, (errmsg("no such node is found")));
}
return nodePlacementGroupHashEntry;
}
/*
* PlacementListGetUniqueNodeGroupIds returns a list of unique node group ids
* that are used by given list of shard placements.
@@ -441,7 +430,7 @@ PlacementListGetUniqueNodeGroupIds(List *placementList)
foreach_ptr(shardPlacement, placementList)
{
placementListUniqueNodeGroupIds =
list_append_unique_oid(placementListUniqueNodeGroupIds,
list_append_unique_int(placementListUniqueNodeGroupIds,
shardPlacement->groupId);
}

View File

@@ -44,7 +44,7 @@
#include "distributed/pg_dist_rebalance_strategy.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/rebalancer_placement_isolation.h"
#include "distributed/rebalancer_placement_separation.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_cleaner.h"
@@ -147,7 +147,7 @@ typedef struct RebalanceContext
FmgrInfo nodeCapacityUDF;
FmgrInfo shardAllowedOnNodeUDF;
RebalancerPlacementIsolationContext *rebalancerPlacementGroupIsolationContext;
RebalancerPlacementSeparationContext *shardgroupPlacementSeparationContext;
} RebalanceContext;
/* WorkerHashKey contains hostname and port to be used as a key in a hash */
@@ -594,8 +594,8 @@ GetRebalanceSteps(RebalanceOptions *options)
options->threshold = options->rebalanceStrategy->minimumThreshold;
}
context.rebalancerPlacementGroupIsolationContext =
PrepareRebalancerPlacementIsolationContext(
context.shardgroupPlacementSeparationContext =
PrepareRebalancerPlacementSeparationContext(
activeWorkerList,
FlattenNestedList(activeShardPlacementListList),
options->workerNode,
@@ -625,8 +625,8 @@ ShardAllowedOnNode(uint64 shardId, uint64 placementId, WorkerNode *workerNode,
RebalanceContext *context = voidContext;
if (!RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
context->rebalancerPlacementGroupIsolationContext,
if (!RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker(
context->shardgroupPlacementSeparationContext,
shardId, placementId, workerNode))
{
return false;
@@ -3190,7 +3190,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList
{
WorkerNode *workerNode = list_nth(workerNodeList, workerNodeIndex);
if (!NodeCanBeUsedForNonIsolatedPlacements(workerNode))
if (!NodeCanBeUsedForNonSeparatedPlacements(workerNode))
{
/* never replicate placements to nodes that should not have placements */
continue;
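For context, the per-node checks changed in this file are exercised when the rebalancer computes placement moves; an illustrative way to trigger that path (assuming a cluster where rebalancing is enabled) is:

-- run the shard rebalancer, which consults the separation context for every candidate move
SELECT rebalance_table_shards();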

View File

@@ -273,22 +273,22 @@ ErrorIfCoordinatorNotAddedAsWorkerNode()
/*
* NewDistributedTablePlacementNodeList returns a list of all active, primary
* worker nodes that can store new data, i.e shouldstoreshards is 'true'
* and that is not used to isolate a shard placement group.
* and that is not used to isolate a shardgroup placement.
*/
List *
NewDistributedTablePlacementNodeList(LOCKMODE lockMode)
{
EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, NodeCanBeUsedForNonIsolatedPlacements);
return FilterActiveNodeListFunc(lockMode, NodeCanBeUsedForNonSeparatedPlacements);
}
/*
* NodeCanBeUsedForNonIsolatedPlacements returns true if given node can be
* NodeCanBeUsedForNonSeparatedPlacements returns true if given node can be
* used to store shard placements that don't need separate nodes.
*/
bool
NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node)
NodeCanBeUsedForNonSeparatedPlacements(WorkerNode *node)
{
if (!NodeIsPrimary(node))
{
@@ -300,7 +300,7 @@ NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node)
return false;
}
return NodeGroupGetSeparatedShardPlacementGroup(node->groupId) == NULL;
return NodeGroupGetSeparatedShardgroupPlacement(node->groupId) == NULL;
}

View File

@@ -6,7 +6,9 @@
ALTER TABLE pg_dist_shard ADD COLUMN needsseparatenode boolean NOT NULL DEFAULT false;
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
#include "udfs/citus_internal_add_shard_metadata/12.2-1.sql"
#include "udfs/citus_internal_shard_group_set_needsseparatenode/12.2-1.sql"
#include "udfs/citus_shard_property_set/12.2-1.sql"

View File

@@ -2,22 +2,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text, needs_separate_node boolean
shard_max_value text,
needs_separate_node boolean default false
)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, boolean) IS
'Inserts into pg_dist_shard with user checks';
-- replace the old one so it would call the old C function without needs_separate_node
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text
)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata_legacy$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
'Inserts into pg_dist_shard with user checks';

View File

@@ -2,22 +2,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text, needs_separate_node boolean
shard_max_value text,
needs_separate_node boolean default false
)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, boolean) IS
'Inserts into pg_dist_shard with user checks';
-- replace the old one so it would call the old C function without needs_separate_node
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text
)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$citus_internal_add_shard_metadata_legacy$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
'Inserts into pg_dist_shard with user checks';
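As a usage sketch only (table name and shard values are hypothetical, and this internal UDF is normally invoked by Citus itself during metadata sync), the consolidated signature can be called with or without the trailing argument now that needs_separate_node defaults to false:

SELECT pg_catalog.citus_internal_add_shard_metadata('dist_table'::regclass, 102010, 't', '-2147483648', '-1', true);
SELECT pg_catalog.citus_internal_add_shard_metadata('dist_table'::regclass, 102011, 't', '0', '2147483647');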

View File

@@ -109,7 +109,7 @@ typedef struct
uint32 colocatationId;
int shardIntervalIndex;
int32 nodeGroupId;
} ShardPlacementGroup;
} ShardgroupPlacement;
typedef enum CascadeToColocatedOption
@@ -331,11 +331,12 @@ extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId);
extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern ShardPlacementGroup * NodeGroupGetSeparatedShardPlacementGroup(int32 groupId);
extern bool ShardPlacementGroupsSame(const ShardPlacementGroup *leftGroup,
const ShardPlacementGroup *rightGroup);
extern ShardgroupPlacement * NodeGroupGetSeparatedShardgroupPlacement(int32 groupId);
extern bool ShardgroupPlacementsSame(const ShardgroupPlacement *leftGroup,
const ShardgroupPlacement *rightGroup);
extern bool NodeGroupHasShardPlacements(int32 groupId);
extern ShardPlacementGroup * GetShardPlacementGroupForPlacement(uint64 shardId,
extern bool NodeGroupHasDistributedTableShardPlacements(int32 groupId);
extern ShardgroupPlacement * GetShardgroupPlacementForPlacement(uint64 shardId,
uint64 placementId);
extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);
extern bool IsRemoteShardPlacement(ShardPlacement *shardPlacement);

View File

@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* rebalancer_placement_isolation.h
* rebalancer_placement_separation.h
* Routines to determine which worker node should be used to separate
* a colocated set of shard placements that need separate nodes.
*
@@ -18,10 +18,10 @@
#include "distributed/metadata_utility.h"
struct RebalancerPlacementIsolationContext;
typedef struct RebalancerPlacementIsolationContext RebalancerPlacementIsolationContext;
struct RebalancerPlacementSeparationContext;
typedef struct RebalancerPlacementSeparationContext RebalancerPlacementSeparationContext;
extern RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolationContext(
extern RebalancerPlacementSeparationContext * PrepareRebalancerPlacementSeparationContext(
List *activeWorkerNodeList,
List
*
@@ -32,8 +32,8 @@ extern RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolation
FmgrInfo
*
shardAllowedOnNodeUDF);
extern bool RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
RebalancerPlacementIsolationContext *context,
extern bool RebalancerPlacementSeparationContextPlacementIsAllowedOnWorker(
RebalancerPlacementSeparationContext *context,
uint64 shardId,
uint64
placementId,

View File

@@ -78,7 +78,7 @@ extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode);
extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void);
extern void ErrorIfCoordinatorNotAddedAsWorkerNode(void);
extern List * NewDistributedTablePlacementNodeList(LOCKMODE lockMode);
extern bool NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node);
extern bool NodeCanBeUsedForNonSeparatedPlacements(WorkerNode *node);
extern List * ActiveReadableNonCoordinatorNodeList(void);
extern List * ActiveReadableNodeList(void);
extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort);

View File

@@ -812,14 +812,14 @@ ORDER BY result;
SELECT citus_shard_property_set(NULL, anti_affinity=>true);
ERROR: shard_id cannot be NULL
SELECT citus_shard_property_set(0, anti_affinity=>true);
ERROR: shard xxxxx does not exist
ERROR: could not find valid entry for shard xxxxx
SELECT citus_shard_property_set(NULL, anti_affinity=>false);
ERROR: shard_id cannot be NULL
SELECT citus_shard_property_set(0, anti_affinity=>false);
ERROR: shard xxxxx does not exist
ERROR: could not find valid entry for shard xxxxx
-- we verify whether shard exists even if anti_affinity is not provided
SELECT citus_shard_property_set(0, anti_affinity=>NULL);
ERROR: shard xxxxx does not exist
ERROR: could not find valid entry for shard xxxxx
CREATE TABLE append_table (a int, b int);
SELECT create_distributed_table('append_table', 'a', 'append');
create_distributed_table
@@ -858,21 +858,21 @@ SELECT citus_add_local_table_to_metadata('local_table');
-- all should fail
SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'range_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid = 'local_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'append_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'range_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'ref_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
SELECT citus_shard_property_set(shardid, anti_affinity=>false) FROM pg_dist_shard WHERE logicalrelid = 'local_table'::regclass LIMIT 1;
ERROR: shard isolation is only supported for hash distributed tables
ERROR: setting anti-affinity property is only supported for hash distributed tables
DROP TABLE range_table;
DROP TYPE composite_key_type;
SET client_min_messages TO WARNING;

View File

@@ -173,7 +173,7 @@ step s1_set-shard-property:
FROM pg_dist_shard WHERE logicalrelid = 'table_to_distribute'::regclass
ORDER BY shardid LIMIT 1;
ERROR: could not acquire the lock required to set anti affinity property for a shard of public.table_to_distribute
ERROR: could not acquire the lock required to set a property for a shard of public.table_to_distribute
step s1-rollback:
ROLLBACK;