diff --git a/src/backend/distributed/operations/rebalancer_placement_isolation.c b/src/backend/distributed/operations/rebalancer_placement_isolation.c
index 9cbec0a45..61ad3d4a9 100644
--- a/src/backend/distributed/operations/rebalancer_placement_isolation.c
+++ b/src/backend/distributed/operations/rebalancer_placement_isolation.c
@@ -79,10 +79,12 @@ static void NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash,
 									   WorkerNode *drainWorkerNode);
 static void NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 											  List *workerNodeList,
-											  List *shardPlacementList);
+											  List *shardPlacementList,
+											  FmgrInfo *shardAllowedOnNodeUDF);
 static bool NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
 											 int32 nodeGroupId,
-											 ShardPlacement *shardPlacement);
+											 ShardPlacement *shardPlacement,
+											 FmgrInfo *shardAllowedOnNodeUDF);
 static NodePlacementGroupHashEntry * NodePlacementGroupHashGetNodeWithGroupId(
 	HTAB *nodePlacementGroupHash,
 	int32
@@ -101,7 +103,8 @@ static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGrou
 RebalancerPlacementIsolationContext *
 PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
 										   List *activeShardPlacementList,
-										   WorkerNode *drainWorkerNode)
+										   WorkerNode *drainWorkerNode,
+										   FmgrInfo *shardAllowedOnNodeUDF)
 {
 	HTAB *nodePlacementGroupHash =
 		CreateSimpleHashWithNameAndSize(uint32, NodePlacementGroupHashEntry,
@@ -116,7 +119,8 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
 
 	NodePlacementGroupHashAssignNodes(nodePlacementGroupHash,
 									  activeWorkerNodeList,
-									  activeShardPlacementList);
+									  activeShardPlacementList,
+									  shardAllowedOnNodeUDF);
 
 	RebalancerPlacementIsolationContext *context =
 		palloc(sizeof(RebalancerPlacementIsolationContext));
@@ -218,7 +222,8 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
 static void
 NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 								  List *workerNodeList,
-								  List *shardPlacementList)
+								  List *shardPlacementList,
+								  FmgrInfo *shardAllowedOnNodeUDF)
 {
 	List *availableWorkerList = list_copy(workerNodeList);
 	List *unassignedShardPlacementList = NIL;
@@ -239,7 +244,8 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 		int32 assignGroupId = shardPlacement->groupId;
 		if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash,
 											 assignGroupId,
-											 shardPlacement))
+											 shardPlacement,
+											 shardAllowedOnNodeUDF))
 		{
 			int currentPlacementNodeIdx =
 				WorkerNodeListGetNodeWithGroupId(availableWorkerList,
@@ -271,7 +277,8 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 
 			if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash,
 												 availableWorkerNode->groupId,
-												 unassignedShardPlacement))
+												 unassignedShardPlacement,
+												 shardAllowedOnNodeUDF))
 			{
 				separated = true;
 				break;
@@ -296,7 +303,8 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 static bool
 NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
 								 int32 nodeGroupId,
-								 ShardPlacement *shardPlacement)
+								 ShardPlacement *shardPlacement,
+								 FmgrInfo *shardAllowedOnNodeUDF)
 {
 	NodePlacementGroupHashEntry *nodePlacementGroupHashEntry =
 		NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId);
@@ -330,6 +338,21 @@ NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
 		return false;
 	}
 
+	/*
+	 * Also consult the rebalance strategy's shard_allowed_on_node_function:
+	 * if it disallows this shard on the node, the node cannot be used to
+	 * separate this placement group.
+	 */
+	WorkerNode *workerNode = PrimaryNodeForGroup(nodeGroupId, NULL);
+	Datum allowed = FunctionCall2(shardAllowedOnNodeUDF,
+								  UInt64GetDatum(shardPlacement->shardId),
+								  UInt32GetDatum(workerNode->nodeId));
+	if (!DatumGetBool(allowed))
+	{
+		return false;
+	}
+
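+	/* All checks passed; assign this placement group to the node. */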
 	nodePlacementGroupHashEntry->assignedPlacementGroup = placementGroup;
 
 	return true;
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index 7192b545b..391c34bad 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -598,7 +598,8 @@ GetRebalanceSteps(RebalanceOptions *options)
 		PrepareRebalancerPlacementIsolationContext(
 			activeWorkerList,
 			FlattenNestedList(activeShardPlacementListList),
-			options->workerNode);
+			options->workerNode,
+			&context.shardAllowedOnNodeUDF);
 
 	return RebalancePlacementUpdates(activeWorkerList,
 									 activeShardPlacementListList,
diff --git a/src/include/distributed/rebalancer_placement_isolation.h b/src/include/distributed/rebalancer_placement_isolation.h
index 0b8e3ddc7..7d4c4253e 100644
--- a/src/include/distributed/rebalancer_placement_isolation.h
+++ b/src/include/distributed/rebalancer_placement_isolation.h
@@ -28,7 +28,9 @@ extern RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolation
 																					activeShardPlacementList,
 																					WorkerNode *
-																					drainWorkerNode);
+																					drainWorkerNode,
+																					FmgrInfo *
+																					shardAllowedOnNodeUDF);
 extern bool RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
 	RebalancerPlacementIsolationContext *context,
 	uint64 shardId,
diff --git a/src/test/regress/expected/isolate_placement.out b/src/test/regress/expected/isolate_placement.out
index a0eea6e55..2371feb96 100644
--- a/src/test/regress/expected/isolate_placement.out
+++ b/src/test/regress/expected/isolate_placement.out
@@ -1668,9 +1668,9 @@ VALUES (
     0,
     0
 );
-SET client_min_messages TO ERROR;
+SET client_min_messages TO WARNING;
 SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
  rebalance_table_shards
 ---------------------------------------------------------------------
 
 (1 row)
@@ -1801,6 +1801,109 @@ SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.singl
 DROP TABLE single_shard_3_shardid_nodeid;
 DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement';
 DROP TABLE single_shard_1, single_shard_3;
+CREATE TABLE single_shard_1(a int);
+SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE single_shard_2(a int);
+SELECT create_distributed_table('single_shard_2', null, colocate_with=>'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- this would be placed on the same node as single_shard_1
+CREATE TABLE single_shard_3(a int);
+SELECT create_distributed_table('single_shard_3', null, colocate_with=>'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+DROP TABLE single_shard_2;
+SELECT shardid, nodeid INTO single_shard_1_shardid_nodeid
+FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) JOIN pg_dist_node USING (groupid)
+WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass AND noderole = 'primary';
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass);
+ citus_shard_property_set
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_3'::regclass);
+ citus_shard_property_set
+---------------------------------------------------------------------
+
+(1 row)
+
+-- tell rebalancer that single_shard_1 cannot be placed on the node where it is currently placed
+CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(p_shardid bigint, p_nodeid int)
+    RETURNS boolean AS
+$$
+    SELECT
+        CASE
+            WHEN (p_shardid = shardid and p_nodeid = nodeid) THEN false
+            ELSE true
+        END
+    FROM single_shard_1_shardid_nodeid;
+$$ LANGUAGE sql;
+INSERT INTO pg_catalog.pg_dist_rebalance_strategy(
+    name,
+    default_strategy,
+    shard_cost_function,
+    node_capacity_function,
+    shard_allowed_on_node_function,
+    default_threshold,
+    minimum_threshold,
+    improvement_threshold
+)
+VALUES (
+    'test_isolate_placement',
+    false,
+    'citus_shard_cost_1',
+    'citus_node_capacity_1',
+    'isolate_placement.test_shard_allowed_on_node',
+    0,
+    0,
+    0
+);
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO NOTICE;
+-- This time, test_shard_allowed_on_node() didn't cause rebalance_table_shards()
+-- to emit a warning.
+--
+-- Right now single_shard_1 & single_shard_3 are placed on the same node. Due to
+-- the order we follow when assigning nodes to placement groups that need an
+-- isolated node, we first try placing single_shard_1 on the node where it is
+-- currently placed, but test_shard_allowed_on_node() disallows that. This is
+-- not a problem because we take the specified rebalance strategy into account
+-- when assigning nodes to placements that need separate nodes, so we pick a
+-- different node for single_shard_1. We then place single_shard_3 on the node
+-- where it is currently placed, which test_shard_allowed_on_node() allows.
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_3', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP TABLE single_shard_1_shardid_nodeid;
+DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement';
+DROP TABLE single_shard_1, single_shard_3;
 SET client_min_messages TO WARNING;
 DROP SCHEMA isolate_placement CASCADE;
 DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint);
diff --git a/src/test/regress/sql/isolate_placement.sql b/src/test/regress/sql/isolate_placement.sql
index dad3b23c8..0d98bb1bd 100644
--- a/src/test/regress/sql/isolate_placement.sql
+++ b/src/test/regress/sql/isolate_placement.sql
@@ -919,7 +919,7 @@ VALUES (
     0
 );
 
-SET client_min_messages TO ERROR;
+SET client_min_messages TO WARNING;
 SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
 SET client_min_messages TO NOTICE;
 
@@ -1013,6 +1013,81 @@ DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_place
 
 DROP TABLE single_shard_1, single_shard_3;
 
+CREATE TABLE single_shard_1(a int);
+SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
+
+CREATE TABLE single_shard_2(a int);
+SELECT create_distributed_table('single_shard_2', null, colocate_with=>'none');
+
+-- this would be placed on the same node as single_shard_1
+CREATE TABLE single_shard_3(a int);
+SELECT create_distributed_table('single_shard_3', null, colocate_with=>'none');
+
+DROP TABLE single_shard_2;
+
+SELECT shardid, nodeid INTO single_shard_1_shardid_nodeid
+FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) JOIN pg_dist_node USING (groupid)
+WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass AND noderole = 'primary';
+
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass);
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_3'::regclass);
+
+-- tell rebalancer that single_shard_1 cannot be placed on the node where it is currently placed
+CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(p_shardid bigint, p_nodeid int)
+    RETURNS boolean AS
+$$
+    SELECT
+        CASE
+            WHEN (p_shardid = shardid and p_nodeid = nodeid) THEN false
+            ELSE true
+        END
+    FROM single_shard_1_shardid_nodeid;
+$$ LANGUAGE sql;
+
+INSERT INTO pg_catalog.pg_dist_rebalance_strategy(
+    name,
+    default_strategy,
+    shard_cost_function,
+    node_capacity_function,
+    shard_allowed_on_node_function,
+    default_threshold,
+    minimum_threshold,
+    improvement_threshold
+)
+VALUES (
+    'test_isolate_placement',
+    false,
+    'citus_shard_cost_1',
+    'citus_node_capacity_1',
+    'isolate_placement.test_shard_allowed_on_node',
+    0,
+    0,
+    0
+);
+
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
+SET client_min_messages TO NOTICE;
+
+-- This time, test_shard_allowed_on_node() didn't cause rebalance_table_shards()
+-- to emit a warning.
+--
+-- Right now single_shard_1 & single_shard_3 are placed on the same node. Due to
+-- the order we follow when assigning nodes to placement groups that need an
+-- isolated node, we first try placing single_shard_1 on the node where it is
+-- currently placed, but test_shard_allowed_on_node() disallows that. This is
+-- not a problem because we take the specified rebalance strategy into account
+-- when assigning nodes to placements that need separate nodes, so we pick a
+-- different node for single_shard_1. We then place single_shard_3 on the node
+-- where it is currently placed, which test_shard_allowed_on_node() allows.
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_3', 1) = true;
+
+DROP TABLE single_shard_1_shardid_nodeid;
+DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement';
+
+DROP TABLE single_shard_1, single_shard_3;
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA isolate_placement CASCADE;
 DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint);