diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index c09ad358c..dce66965a 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -1386,6 +1386,21 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
 }
 
 
+/*
+ * IsPlacementOnWorkerNode checks whether the shard placement is on the given
+ * worker node.
+ */
+bool
+IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode)
+{
+	if (strncmp(workerNode->workerName, placement->nodeName, WORKER_LENGTH) != 0)
+	{
+		return false;
+	}
+	return workerNode->workerPort == placement->nodePort;
+}
+
+
 /*
  * FilterShardPlacementList filters a list of shard placements based on a filter.
  * Keep only the shard for which the filter function returns true.
@@ -1409,6 +1424,30 @@ FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement
 }
 
 
+/*
+ * FilterActiveShardPlacementListByNode returns the active shard placements from the given list that are on the given worker node.
+ */
+List *
+FilterActiveShardPlacementListByNode(List *shardPlacementList, WorkerNode *workerNode)
+{
+	List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
+															   IsActiveShardPlacement);
+	List *filteredShardPlacementList = NIL;
+	ShardPlacement *shardPlacement = NULL;
+
+	foreach_ptr(shardPlacement, activeShardPlacementList)
+	{
+		if (IsPlacementOnWorkerNode(shardPlacement, workerNode))
+		{
+			filteredShardPlacementList = lappend(filteredShardPlacementList,
+												 shardPlacement);
+		}
+	}
+
+	return filteredShardPlacementList;
+}
+
+
 /*
  * ActiveShardPlacementListOnGroup returns a list of active shard placements
  * that are sitting on group with groupId for given shardId.
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index 8290260d1..45f4a07f2 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -77,6 +77,7 @@ typedef struct RebalanceOptions
 	float4 improvementThreshold;
 	Form_pg_dist_rebalance_strategy rebalanceStrategy;
 	const char *operationName;
+	WorkerNode *workerNode;
 } RebalanceOptions;
 
 
@@ -209,7 +210,6 @@ static bool WorkerNodeListContains(List *workerNodeList, const char *workerName,
 									   uint32 workerPort);
 static void UpdateColocatedShardPlacementProgress(uint64 shardId, char *sourceName,
 												  int sourcePort, uint64 progress);
-static bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
 static NodeFillState * FindFillStateForPlacement(RebalanceState *state,
 												 ShardPlacement *placement);
 static RebalanceState * InitRebalanceState(List *workerNodeList, List *shardPlacementList,
@@ -469,6 +469,13 @@ GetRebalanceSteps(RebalanceOptions *options)
 														  options->excludedShardArray);
 		List *activeShardPlacementListForRelation =
 			FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement);
+
+		if (options->workerNode != NULL)
+		{
+			activeShardPlacementListForRelation = FilterActiveShardPlacementListByNode(
+				shardPlacementList, options->workerNode);
+		}
+
 		activeShardPlacementListList = lappend(activeShardPlacementListList,
 											   activeShardPlacementListForRelation);
 	}
@@ -1052,6 +1059,7 @@ citus_drain_node(PG_FUNCTION_ARGS)
 	};
 
 	char *nodeName = text_to_cstring(nodeNameText);
+	options.workerNode = FindWorkerNodeOrError(nodeName, nodePort);
 
 	/*
 	 * This is done in a separate session. This way it's not undone if the
@@ -2294,21 +2302,6 @@ FindFillStateForPlacement(RebalanceState *state, ShardPlacement *placement)
 }
 
 
-/*
- * IsPlacementOnWorkerNode checks if the shard placement is for to the given
- * workenode.
- */
-static bool
-IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode)
-{
-	if (strncmp(workerNode->workerName, placement->nodeName, WORKER_LENGTH) != 0)
-	{
-		return false;
-	}
-	return workerNode->workerPort == placement->nodePort;
-}
-
-
 /*
  * CompareNodeFillStateAsc can be used to sort fill states from empty to full.
  */
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 5376dd858..06ec8a484 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -289,8 +289,11 @@ extern uint64 ShardLength(uint64 shardId);
 extern bool NodeGroupHasShardPlacements(int32 groupId,
 										bool onlyConsiderActivePlacements);
 extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);
+extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
 extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
 										   ShardPlacement *));
+extern List * FilterActiveShardPlacementListByNode(List *shardPlacementList,
+													WorkerNode *workerNode);
 extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
 extern List * ActiveShardPlacementList(uint64 shardId);
 extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);
diff --git a/src/test/regress/expected/citus_drain_node.out b/src/test/regress/expected/citus_drain_node.out
new file mode 100644
index 000000000..48d60d45a
--- /dev/null
+++ b/src/test/regress/expected/citus_drain_node.out
@@ -0,0 +1,120 @@
+CREATE SCHEMA citus_drain_node;
+SET search_path TO citus_drain_node;
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SET citus.next_shard_id TO 974653;
+SET client_min_messages TO ERROR;
+SELECT * FROM citus_set_coordinator_host('localhost', :master_port);
+ citus_set_coordinator_host
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+ master_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE test (x INT, y INT);
+SELECT create_distributed_table('test','x');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL citus_cleanup_orphaned_shards();
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+ nodename  | nodeport | count
+---------------------------------------------------------------------
+ localhost |    57636 |     2
+ localhost |    57637 |     1
+ localhost |    57638 |     1
+(3 rows)
+
+SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_drain_node('localhost', :worker_1_port, shard_transfer_mode :='force_logical');
+ citus_drain_node
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL citus_cleanup_orphaned_shards();
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+ nodename  | nodeport | count
+---------------------------------------------------------------------
+ localhost |    57636 |     3
+ localhost |    57638 |     1
+(2 rows)
+
+SELECT * FROM citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL citus_cleanup_orphaned_shards();
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+ nodename  | nodeport | count
+---------------------------------------------------------------------
+ localhost |    57636 |     2
+ localhost |    57637 |     1
+ localhost |    57638 |     1
+(3 rows)
+
+SELECT * FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
+ citus_set_node_property
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL citus_cleanup_orphaned_shards();
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+ nodename  | nodeport | count
+---------------------------------------------------------------------
+ localhost |    57637 |     2
+ localhost |    57638 |     2
+(2 rows)
+
+RESET search_path;
+SET client_min_messages TO WARNING;
+DROP SCHEMA citus_drain_node CASCADE;
diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule
index 96eafc3ea..2c767eb42 100644
--- a/src/test/regress/operations_schedule
+++ b/src/test/regress/operations_schedule
@@ -12,3 +12,4 @@ test: multi_colocated_shard_rebalance
 test: ignoring_orphaned_shards
 test: cpu_priority
 test: check_mx
+test: citus_drain_node
diff --git a/src/test/regress/sql/citus_drain_node.sql b/src/test/regress/sql/citus_drain_node.sql
new file mode 100644
index 000000000..6ee16a29f
--- /dev/null
+++ b/src/test/regress/sql/citus_drain_node.sql
@@ -0,0 +1,62 @@
+CREATE SCHEMA citus_drain_node;
+SET search_path TO citus_drain_node;
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SET citus.next_shard_id TO 974653;
+
+SET client_min_messages TO ERROR;
+
+SELECT * FROM citus_set_coordinator_host('localhost', :master_port);
+SELECT * FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+
+CREATE TABLE test (x INT, y INT);
+SELECT create_distributed_table('test','x');
+
+CALL citus_cleanup_orphaned_shards();
+
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+
+SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
+SELECT * FROM citus_drain_node('localhost', :worker_1_port, shard_transfer_mode :='force_logical');
+
+CALL citus_cleanup_orphaned_shards();
+
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+
+SELECT * FROM citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
+SELECT * FROM citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
+
+SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
+
+CALL citus_cleanup_orphaned_shards();
+
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+
+SELECT * FROM citus_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
+
+SELECT * FROM rebalance_table_shards(shard_transfer_mode :='force_logical');
+
+CALL citus_cleanup_orphaned_shards();
+
+SELECT nodename, nodeport, COUNT(*)
+    FROM pg_dist_placement AS placement,
+         pg_dist_node AS node
+    WHERE placement.groupid = node.groupid
+    AND node.noderole = 'primary' GROUP BY nodename, nodeport ORDER BY 1,2;
+
+RESET search_path;
+
+SET client_min_messages TO WARNING;
+DROP SCHEMA citus_drain_node CASCADE;
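
For reference, a typical drain-and-remove sequence using the citus_drain_node UDF exercised by the new regression test looks like the following. This is a usage sketch outside the patch: the host name and port are placeholders, 'block_writes' is just one of the accepted shard_transfer_mode values, and citus_remove_node (master_remove_node on older releases) is the usual final step once the node is empty.

-- Hypothetical node; substitute the worker being drained.
SELECT citus_drain_node('worker-1.example.com', 5432, shard_transfer_mode := 'block_writes');
-- Shard moves leave orphaned placements behind; clean them up explicitly.
CALL citus_cleanup_orphaned_shards();
-- Check that no placements remain on the drained node before removing it.
SELECT COUNT(*)
    FROM pg_dist_placement AS placement,
         pg_dist_node AS node
    WHERE placement.groupid = node.groupid
      AND node.nodename = 'worker-1.example.com' AND node.nodeport = 5432;
SELECT citus_remove_node('worker-1.example.com', 5432);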