Limit citus_drain_node to drain the specified node only (#6361)

DESCRIPTION: Fixes citus_drain_node to drain the specified worker only.

Fixes #6267
pull/6413/head
Gokhan Gulbiz 2022-10-09 13:33:08 +03:00 committed by GitHub
parent 86e186f671
commit 1776bdf654
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 234 additions and 16 deletions

View File

@ -1386,6 +1386,21 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
}
/*
 * IsPlacementOnWorkerNode reports whether the given shard placement is located
 * on the given worker node. A placement matches when its node name (compared
 * over at most WORKER_LENGTH characters) and its node port both equal those of
 * the worker node.
 */
bool
IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode)
{
	bool sameNodeName = strncmp(workerNode->workerName, placement->nodeName,
								WORKER_LENGTH) == 0;

	/* the port is only relevant once the names are known to match */
	return sameNodeName && workerNode->workerPort == placement->nodePort;
}
/*
* FilterShardPlacementList filters a list of shard placements based on a filter.
* Keep only the shard for which the filter function returns true.
@ -1409,6 +1424,30 @@ FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement
}
/*
 * FilterActiveShardPlacementListByNode returns the entries of
 * shardPlacementList that are active and located on the given worker node.
 *
 * The input list is first narrowed down with IsActiveShardPlacement, and each
 * remaining placement is then checked against workerNode via
 * IsPlacementOnWorkerNode. NIL is returned when nothing matches.
 */
List *
FilterActiveShardPlacementListByNode(List *shardPlacementList, WorkerNode *workerNode)
{
	List *placementsOnNode = NIL;
	List *activePlacements = FilterShardPlacementList(shardPlacementList,
													  IsActiveShardPlacement);

	ShardPlacement *candidate = NULL;
	foreach_ptr(candidate, activePlacements)
	{
		if (!IsPlacementOnWorkerNode(candidate, workerNode))
		{
			/* placement belongs to some other node, leave it out */
			continue;
		}

		placementsOnNode = lappend(placementsOnNode, candidate);
	}

	return placementsOnNode;
}
/*
* ActiveShardPlacementListOnGroup returns a list of active shard placements
* that are sitting on group with groupId for given shardId.

View File

@ -77,6 +77,7 @@ typedef struct RebalanceOptions
float4 improvementThreshold;
Form_pg_dist_rebalance_strategy rebalanceStrategy;
const char *operationName;
WorkerNode *workerNode;
} RebalanceOptions;
@ -209,7 +210,6 @@ static bool WorkerNodeListContains(List *workerNodeList, const char *workerName,
uint32 workerPort);
static void UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName,
int sourcePort, uint64 progress);
static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
static NodeFillState * FindFillStateForPlacement(RebalanceState *state,
ShardPlacement *placement);
static RebalanceState * InitRebalanceState(List *workerNodeList, List *shardPlacementList,
@ -469,6 +469,13 @@ GetRebalanceSteps(RebalanceOptions *options)
options->excludedShardArray);
List *activeShardPlacementListForRelation =
FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement);
if (options->workerNode != NULL)
{
activeShardPlacementListForRelation = FilterActiveShardPlacementListByNode(
shardPlacementList, options->workerNode);
}
activeShardPlacementListList =
lappend(activeShardPlacementListList, activeShardPlacementListForRelation);
}
@ -1052,6 +1059,7 @@ citus_drain_node(PG_FUNCTION_ARGS)
};
char *nodeName = text_to_cstring(nodeNameText);
options.workerNode = FindWorkerNodeOrError(nodeName, nodePort);
/*
* This is done in a separate session. This way it's not undone if the
@ -2294,21 +2302,6 @@ FindFillStateForPlacement(RebalanceState *state, ShardPlacement *placement)
}
/*
 * IsPlacementOnWorkerNode reports whether the given shard placement is located
 * on the given worker node. A placement matches when its node name (compared
 * over at most WORKER_LENGTH characters) and its node port both equal those of
 * the worker node.
 */
static bool
IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode)
{
	bool sameNodeName = strncmp(workerNode->workerName, placement->nodeName,
								WORKER_LENGTH) == 0;

	/* the port is only relevant once the names are known to match */
	return sameNodeName && workerNode->workerPort == placement->nodePort;
}
/*
* CompareNodeFillStateAsc can be used to sort fill states from empty to full.
*/

View File

@ -289,8 +289,11 @@ extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId,
bool onlyConsiderActivePlacements);
extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);

/* true when the placement's node name and node port match those of workerNode */
extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);

/* keeps only the placements for which the filter function returns true */
extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
ShardPlacement *));

/* active placements from shardPlacementList that are located on workerNode */
extern List * FilterActiveShardPlacementListByNode(List *shardPlacementList,
WorkerNode *workerNode);
extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
extern List * ActiveShardPlacementList(uint64 shardId);
extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);

View File

@ -0,0 +1,120 @@
CREATE SCHEMA citus_drain_node;
SET search_path TO citus_drain_node;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 974653;
SET client_min_messages TO ERROR;
SELECT * FROM citus_set_coordinator_host('localhost', :master_port);
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
SELECT * FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
CREATE TABLE test (x INT, y INT);
SELECT create_distributed_table('test','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
nodename | nodeport | count
---------------------------------------------------------------------
localhost | 57636 | 2
localhost | 57637 | 1
localhost | 57638 | 1
(3 rows)
SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * from citus_drain_node('localhost', :worker_1_port, shard_transfer_mode :='force_logical');
citus_drain_node
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
nodename | nodeport | count
---------------------------------------------------------------------
localhost | 57636 | 3
localhost | 57638 | 1
(2 rows)
SELECT * FROM citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
nodename | nodeport | count
---------------------------------------------------------------------
localhost | 57636 | 2
localhost | 57637 | 1
localhost | 57638 | 1
(3 rows)
SELECT * FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
nodename | nodeport | count
---------------------------------------------------------------------
localhost | 57637 | 2
localhost | 57638 | 2
(2 rows)
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA citus_drain_node CASCADE;

View File

@ -12,3 +12,4 @@ test: multi_colocated_shard_rebalance
test: ignoring_orphaned_shards
test: cpu_priority
test: check_mx
test: citus_drain_node

View File

@ -0,0 +1,62 @@
CREATE SCHEMA citus_drain_node;
SET search_path TO citus_drain_node;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 974653;
SET client_min_messages TO ERROR;
SELECT * FROM citus_set_coordinator_host('localhost', :master_port);
SELECT * FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
CREATE TABLE test (x INT, y INT);
SELECT create_distributed_table('test','x');
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
SELECT * from citus_drain_node('localhost', :worker_1_port, shard_transfer_mode :='force_logical');
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
SELECT * FROM citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
SELECT * FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
CALL citus_cleanup_orphaned_shards();
SELECT nodename, nodeport, COUNT(*)
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA citus_drain_node CASCADE;