mirror of https://github.com/citusdata/citus.git
Take the shardAllowedOnNode UDF into account when planning placement separation
parent d1a1ad0147
commit faffeccc76
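In short: when the rebalancer plans which node should host a placement group that needs to be separated from other placements, it now also consults the rebalance strategy's shard_allowed_on_node_function instead of checking node availability alone. The function follows the (shardid, nodeid) -> boolean contract used by the regression test further below; as a minimal, hedged sketch (the function name and body here are illustrative and not part of this commit):

-- Illustrative only: the shape of a shard_allowed_on_node_function that a
-- rebalance strategy can register in pg_dist_rebalance_strategy; the
-- rebalancer calls it once per candidate (shard, node) pair.
CREATE FUNCTION my_shard_allowed_on_node(p_shardid bigint, p_nodeid int)
RETURNS boolean AS $$
    SELECT true;  -- permit every shard on every node
$$ LANGUAGE sql;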
@@ -79,10 +79,12 @@ static void NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash,
                                        WorkerNode *drainWorkerNode);
 static void NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
                                               List *workerNodeList,
-                                              List *shardPlacementList);
+                                              List *shardPlacementList,
+                                              FmgrInfo *shardAllowedOnNodeUDF);
 static bool NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
                                              int32 nodeGroupId,
-                                             ShardPlacement *shardPlacement);
+                                             ShardPlacement *shardPlacement,
+                                             FmgrInfo *shardAllowedOnNodeUDF);
 static NodePlacementGroupHashEntry * NodePlacementGroupHashGetNodeWithGroupId(
     HTAB *nodePlacementGroupHash,
     int32
@@ -101,7 +103,8 @@ static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGrou
 RebalancerPlacementIsolationContext *
 PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
                                            List *activeShardPlacementList,
-                                           WorkerNode *drainWorkerNode)
+                                           WorkerNode *drainWorkerNode,
+                                           FmgrInfo *shardAllowedOnNodeUDF)
 {
     HTAB *nodePlacementGroupHash =
         CreateSimpleHashWithNameAndSize(uint32, NodePlacementGroupHashEntry,
@@ -116,7 +119,8 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
 
     NodePlacementGroupHashAssignNodes(nodePlacementGroupHash,
                                       activeWorkerNodeList,
-                                      activeShardPlacementList);
+                                      activeShardPlacementList,
+                                      shardAllowedOnNodeUDF);
 
     RebalancerPlacementIsolationContext *context =
         palloc(sizeof(RebalancerPlacementIsolationContext));
@@ -218,7 +222,8 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
 static void
 NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
                                   List *workerNodeList,
-                                  List *shardPlacementList)
+                                  List *shardPlacementList,
+                                  FmgrInfo *shardAllowedOnNodeUDF)
 {
     List *availableWorkerList = list_copy(workerNodeList);
     List *unassignedShardPlacementList = NIL;
@@ -239,7 +244,8 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
         int32 assignGroupId = shardPlacement->groupId;
         if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash,
                                              assignGroupId,
-                                             shardPlacement))
+                                             shardPlacement,
+                                             shardAllowedOnNodeUDF))
         {
             int currentPlacementNodeIdx =
                 WorkerNodeListGetNodeWithGroupId(availableWorkerList,
@@ -271,7 +277,8 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 
             if (NodePlacementGroupHashAssignNode(nodePlacementGroupHash,
                                                  availableWorkerNode->groupId,
-                                                 unassignedShardPlacement))
+                                                 unassignedShardPlacement,
+                                                 shardAllowedOnNodeUDF))
             {
                 separated = true;
                 break;
@@ -296,7 +303,8 @@ NodePlacementGroupHashAssignNodes(HTAB *nodePlacementGroupHash,
 static bool
 NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
                                  int32 nodeGroupId,
-                                 ShardPlacement *shardPlacement)
+                                 ShardPlacement *shardPlacement,
+                                 FmgrInfo *shardAllowedOnNodeUDF)
 {
     NodePlacementGroupHashEntry *nodePlacementGroupHashEntry =
         NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId);
@@ -330,6 +338,14 @@ NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
         return false;
     }
 
+    WorkerNode *workerNode = PrimaryNodeForGroup(nodeGroupId, NULL);
+    Datum allowed = FunctionCall2(shardAllowedOnNodeUDF, shardPlacement->shardId,
+                                  workerNode->nodeId);
+    if (!DatumGetBool(allowed))
+    {
+        return false;
+    }
+
     nodePlacementGroupHashEntry->assignedPlacementGroup = placementGroup;
 
     return true;

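The block added above is the core of the change: before an isolation-needing placement is pinned to a node group, the group's primary worker is looked up and the strategy's UDF is asked whether that shard may live there; a false answer makes NodePlacementGroupHashAssignNode() return false, so the callers shown earlier fall through to the next available worker. A hedged sketch of the same call wrapped as a standalone helper with explicit Datum conversions (the helper name is illustrative and not taken from this commit; the include path for WorkerNode is an assumption):

#include "postgres.h"
#include "fmgr.h"
#include "distributed/worker_manager.h"   /* WorkerNode (assumed include path) */

/*
 * Illustrative helper: returns whether the strategy's
 * shard_allowed_on_node_function permits placing shardId on workerNode,
 * mirroring the FunctionCall2() pattern used in the hunk above.
 */
static bool
PlacementAllowedOnWorker(FmgrInfo *shardAllowedOnNodeUDF, uint64 shardId,
                         WorkerNode *workerNode)
{
    Datum allowed = FunctionCall2(shardAllowedOnNodeUDF,
                                  UInt64GetDatum(shardId),
                                  UInt32GetDatum(workerNode->nodeId));
    return DatumGetBool(allowed);
}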
@@ -598,7 +598,8 @@ GetRebalanceSteps(RebalanceOptions *options)
         PrepareRebalancerPlacementIsolationContext(
             activeWorkerList,
             FlattenNestedList(activeShardPlacementListList),
-            options->workerNode);
+            options->workerNode,
+            &context.shardAllowedOnNodeUDF);
 
     return RebalancePlacementUpdates(activeWorkerList,
                                      activeShardPlacementListList,

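For &context.shardAllowedOnNodeUDF to be callable during planning, the FmgrInfo has to be populated from the selected strategy's shard_allowed_on_node_function before the isolation context is built. A hedged sketch of that setup (the struct and field names below are assumptions based on the surrounding code, not quoted from this commit):

#include "postgres.h"
#include "fmgr.h"

/*
 * Stand-in for the planner's per-run context (assumed); the real context
 * passed around by GetRebalanceSteps() holds one FmgrInfo per strategy UDF.
 */
typedef struct RebalanceUDFContext
{
    FmgrInfo shardAllowedOnNodeUDF;
} RebalanceUDFContext;

/*
 * Illustrative setup: resolve the strategy's shard_allowed_on_node_function
 * OID into a callable FmgrInfo once per rebalance run.
 */
static void
InitShardAllowedOnNodeUDF(Oid shardAllowedOnNodeFunctionId,
                          RebalanceUDFContext *context)
{
    fmgr_info(shardAllowedOnNodeFunctionId, &context->shardAllowedOnNodeUDF);
}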
@@ -28,7 +28,10 @@ extern RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolation
     activeShardPlacementList,
     WorkerNode
     *
-    drainWorkerNode);
+    drainWorkerNode,
+    FmgrInfo
+    *
+    shardAllowedOnNodeUDF);
 extern bool RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
     RebalancerPlacementIsolationContext *context,
     uint64 shardId,

@@ -1668,7 +1668,7 @@ VALUES (
     0,
     0
 );
-SET client_min_messages TO ERROR;
+SET client_min_messages TO WARNING;
 SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
  rebalance_table_shards
 ---------------------------------------------------------------------
@@ -1801,6 +1801,109 @@ SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.singl
 DROP TABLE single_shard_3_shardid_nodeid;
 DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement';
 DROP TABLE single_shard_1, single_shard_3;
+CREATE TABLE single_shard_1(a int);
+SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE single_shard_2(a int);
+SELECT create_distributed_table('single_shard_2', null, colocate_with=>'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- this would be placed on the same node as single_shard_1
+CREATE TABLE single_shard_3(a int);
+SELECT create_distributed_table('single_shard_3', null, colocate_with=>'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+DROP TABLE single_shard_2;
+SELECT shardid, nodeid INTO single_shard_1_shardid_nodeid
+FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) JOIN pg_dist_node USING (groupid)
+WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass AND noderole = 'primary';
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass);
+ citus_shard_property_set
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_3'::regclass);
+ citus_shard_property_set
+---------------------------------------------------------------------
+
+(1 row)
+
+-- tell the rebalancer that single_shard_1 cannot be placed on the node where it is currently placed
+CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(p_shardid bigint, p_nodeid int)
+RETURNS boolean AS
+$$
+    SELECT
+        CASE
+            WHEN (p_shardid = shardid and p_nodeid = nodeid) THEN false
+            ELSE true
+        END
+    FROM single_shard_1_shardid_nodeid;
+$$ LANGUAGE sql;
+INSERT INTO pg_catalog.pg_dist_rebalance_strategy(
+    name,
+    default_strategy,
+    shard_cost_function,
+    node_capacity_function,
+    shard_allowed_on_node_function,
+    default_threshold,
+    minimum_threshold,
+    improvement_threshold
+)
+VALUES (
+    'test_isolate_placement',
+    false,
+    'citus_shard_cost_1',
+    'citus_node_capacity_1',
+    'isolate_placement.test_shard_allowed_on_node',
+    0,
+    0,
+    0
+);
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+SET client_min_messages TO NOTICE;
+-- This time, test_shard_allowed_on_node() didn't cause rebalance_table_shards() to
+-- emit a warning.
+--
+-- Right now single_shard_1 & single_shard_3 are placed on the same node. Due to
+-- the order we follow when assigning nodes to placement groups that need an
+-- isolated node, we first try placing single_shard_1 on the node where it is
+-- currently placed, but this is not possible due to test_shard_allowed_on_node().
+-- This is not a problem because we take the specified rebalance strategy into
+-- account when assigning nodes to placements that need separate nodes, so we
+-- place it on a different node instead. We then try placing single_shard_3 on
+-- the node where it is currently placed, and this is allowed.
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_3', 1) = true;
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+DROP TABLE single_shard_1_shardid_nodeid;
+DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement';
+DROP TABLE single_shard_1, single_shard_3;
 SET client_min_messages TO WARNING;
 DROP SCHEMA isolate_placement CASCADE;
 DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint);

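Beyond verify_placements_in_shard_group_isolated(), the outcome described in the comments above can be inspected directly; a hedged sketch (not part of the test) that reuses the same catalog join the test uses to record the original placement:

-- Illustrative check: list the node that each of the two anti-affinity
-- single-shard placements ended up on after rebalance_table_shards().
SELECT logicalrelid, shardid, nodename, nodeport
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE logicalrelid IN ('isolate_placement.single_shard_1'::regclass,
                       'isolate_placement.single_shard_3'::regclass)
  AND noderole = 'primary'
ORDER BY logicalrelid::text;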
@@ -919,7 +919,7 @@ VALUES (
     0
 );
 
-SET client_min_messages TO ERROR;
+SET client_min_messages TO WARNING;
 SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
 SET client_min_messages TO NOTICE;
 
@@ -1013,6 +1013,81 @@ DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_place
 
 DROP TABLE single_shard_1, single_shard_3;
 
+CREATE TABLE single_shard_1(a int);
+SELECT create_distributed_table('single_shard_1', null, colocate_with=>'none');
+
+CREATE TABLE single_shard_2(a int);
+SELECT create_distributed_table('single_shard_2', null, colocate_with=>'none');
+
+-- this would be placed on the same node as single_shard_1
+CREATE TABLE single_shard_3(a int);
+SELECT create_distributed_table('single_shard_3', null, colocate_with=>'none');
+
+DROP TABLE single_shard_2;
+
+SELECT shardid, nodeid INTO single_shard_1_shardid_nodeid
+FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) JOIN pg_dist_node USING (groupid)
+WHERE logicalrelid = 'isolate_placement.single_shard_1'::regclass AND noderole = 'primary';
+
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_1'::regclass);
+SELECT citus_shard_property_set(shardid, anti_affinity=>true) FROM pg_dist_shard WHERE logicalrelid IN ('single_shard_3'::regclass);
+
+-- tell the rebalancer that single_shard_1 cannot be placed on the node where it is currently placed
+CREATE OR REPLACE FUNCTION test_shard_allowed_on_node(p_shardid bigint, p_nodeid int)
+RETURNS boolean AS
+$$
+    SELECT
+        CASE
+            WHEN (p_shardid = shardid and p_nodeid = nodeid) THEN false
+            ELSE true
+        END
+    FROM single_shard_1_shardid_nodeid;
+$$ LANGUAGE sql;
+
+INSERT INTO pg_catalog.pg_dist_rebalance_strategy(
+    name,
+    default_strategy,
+    shard_cost_function,
+    node_capacity_function,
+    shard_allowed_on_node_function,
+    default_threshold,
+    minimum_threshold,
+    improvement_threshold
+)
+VALUES (
+    'test_isolate_placement',
+    false,
+    'citus_shard_cost_1',
+    'citus_node_capacity_1',
+    'isolate_placement.test_shard_allowed_on_node',
+    0,
+    0,
+    0
+);
+
+SET client_min_messages TO WARNING;
+SELECT rebalance_table_shards(rebalance_strategy := 'test_isolate_placement', shard_transfer_mode=>'block_writes');
+SET client_min_messages TO NOTICE;
+
+-- This time, test_shard_allowed_on_node() didn't cause rebalance_table_shards() to
+-- emit a warning.
+--
+-- Right now single_shard_1 & single_shard_3 are placed on the same node. Due to
+-- the order we follow when assigning nodes to placement groups that need an
+-- isolated node, we first try placing single_shard_1 on the node where it is
+-- currently placed, but this is not possible due to test_shard_allowed_on_node().
+-- This is not a problem because we take the specified rebalance strategy into
+-- account when assigning nodes to placements that need separate nodes, so we
+-- place it on a different node instead. We then try placing single_shard_3 on
+-- the node where it is currently placed, and this is allowed.
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_1', 1) = true;
+SELECT public.verify_placements_in_shard_group_isolated('isolate_placement.single_shard_3', 1) = true;
+
+DROP TABLE single_shard_1_shardid_nodeid;
+DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='test_isolate_placement';
+
+DROP TABLE single_shard_1, single_shard_3;
+
 SET client_min_messages TO WARNING;
 DROP SCHEMA isolate_placement CASCADE;
 DROP FUNCTION public.verify_placements_in_shard_group_isolated(text, bigint);