diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 2c5a8b0fe..4ad3f9e48 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -1291,6 +1291,52 @@ NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements)
 }
 
 
+/*
+ * IsActiveShardPlacement returns true if the shard placement is labelled as
+ * active and is placed on an active worker node. It errors out if the worker
+ * node that holds the placement cannot be found.
+ */
+bool
+IsActiveShardPlacement(ShardPlacement *shardPlacement)
+{
+	WorkerNode *workerNode =
+		FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort);
+
+	if (!workerNode)
+	{
+		ereport(ERROR, (errmsg("There is a shard placement on node %s:%d but "
+							   "could not find the node.", shardPlacement->nodeName,
+							   shardPlacement->nodePort)));
+	}
+
+	return shardPlacement->shardState == SHARD_STATE_ACTIVE &&
+		   workerNode->isActive;
+}
+
+
+/*
+ * FilterShardPlacementList filters a list of shard placements using the given
+ * filter function, keeping only the placements for which the filter returns true.
+ */
+List *
+FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement *))
+{
+	List *filteredShardPlacementList = NIL;
+	ShardPlacement *shardPlacement = NULL;
+
+	foreach_ptr(shardPlacement, shardPlacementList)
+	{
+		if (filter(shardPlacement))
+		{
+			filteredShardPlacementList = lappend(filteredShardPlacementList,
+												 shardPlacement);
+		}
+	}
+
+	return filteredShardPlacementList;
+}
+
+
 /*
  * ActiveShardPlacementListOnGroup returns a list of active shard placements
  * that are sitting on group with groupId for given shardId.
@@ -1323,53 +1369,39 @@ ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId)
 List *
 ActiveShardPlacementList(uint64 shardId)
 {
-	List *activePlacementList = NIL;
 	List *shardPlacementList =
 		ShardPlacementListIncludingOrphanedPlacements(shardId);
 
-	ShardPlacement *shardPlacement = NULL;
-	foreach_ptr(shardPlacement, shardPlacementList)
-	{
-		WorkerNode *workerNode =
-			FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort);
-
-		/*
-		 * We have already resolved the placement to node, so would have
-		 * errored out earlier.
-		 */
-		Assert(workerNode != NULL);
-
-		if (shardPlacement->shardState == SHARD_STATE_ACTIVE &&
-			workerNode->isActive)
-		{
-			activePlacementList = lappend(activePlacementList, shardPlacement);
-		}
-	}
+	List *activePlacementList = FilterShardPlacementList(shardPlacementList,
+														 IsActiveShardPlacement);
 
 	return SortList(activePlacementList, CompareShardPlacementsByWorker);
 }
 
 
+/*
+ * IsShardPlacementNotOrphaned returns true if a shard placement is not orphaned.
+ * Orphaned shards are shards marked to be deleted at a later point (shardstate = 4).
+ */
+static inline bool
+IsShardPlacementNotOrphaned(ShardPlacement *shardPlacement)
+{
+	return shardPlacement->shardState != SHARD_STATE_TO_DELETE;
+}
+
+
 /*
  * ShardPlacementListWithoutOrphanedPlacements returns shard placements exluding
- * the ones that are orphaned, because they are marked to be deleted at a later
- * point (shardstate = 4).
+ * the ones that are orphaned.
  */
 List *
 ShardPlacementListWithoutOrphanedPlacements(uint64 shardId)
 {
-	List *activePlacementList = NIL;
 	List *shardPlacementList =
 		ShardPlacementListIncludingOrphanedPlacements(shardId);
 
-	ShardPlacement *shardPlacement = NULL;
-	foreach_ptr(shardPlacement, shardPlacementList)
-	{
-		if (shardPlacement->shardState != SHARD_STATE_TO_DELETE)
-		{
-			activePlacementList = lappend(activePlacementList, shardPlacement);
-		}
-	}
+	List *activePlacementList = FilterShardPlacementList(shardPlacementList,
+														 IsShardPlacementNotOrphaned);
 
 	return SortList(activePlacementList, CompareShardPlacementsByWorker);
 }
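The metadata_utility.c hunk above introduces a small callback-filter contract: a predicate over a single ShardPlacement (IsActiveShardPlacement, IsShardPlacementNotOrphaned) plus a generic FilterShardPlacementList that keeps the entries the predicate accepts. Below is a minimal standalone sketch of that pattern, assuming a plain C array and a hypothetical DemoPlacement struct in place of PostgreSQL's List and Citus's ShardPlacement; none of the names in it are Citus API.

#include <stdbool.h>
#include <stdio.h>

/* hypothetical stand-in for ShardPlacement, for illustration only */
typedef struct DemoPlacement
{
	long shardId;
	bool placementActive;	/* stands in for shardState == SHARD_STATE_ACTIVE */
	bool workerActive;		/* stands in for workerNode->isActive */
} DemoPlacement;

typedef bool (*PlacementFilter)(const DemoPlacement *placement);

/* mirrors the idea of IsActiveShardPlacement: placement and its worker must both be active */
static bool
IsUsablePlacement(const DemoPlacement *placement)
{
	return placement->placementActive && placement->workerActive;
}

/* keeps the placements accepted by the filter; returns how many were kept */
static int
FilterPlacements(const DemoPlacement *placements, int placementCount,
				 PlacementFilter filter, const DemoPlacement **out)
{
	int keptCount = 0;

	for (int i = 0; i < placementCount; i++)
	{
		if (filter(&placements[i]))
		{
			out[keptCount++] = &placements[i];
		}
	}

	return keptCount;
}

int
main(void)
{
	DemoPlacement placements[] = {
		{ 433500, true, true },		/* kept */
		{ 433501, false, true },	/* dropped: placement itself is not active */
		{ 433502, true, false },	/* dropped: its worker is disabled */
	};
	const DemoPlacement *activePlacements[3];

	int activeCount = FilterPlacements(placements, 3, IsUsablePlacement,
									   activePlacements);
	for (int i = 0; i < activeCount; i++)
	{
		printf("active placement for shard %ld\n", activePlacements[i]->shardId);
	}

	return 0;
}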
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index d35427e6b..43dd167b0 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -34,6 +34,7 @@
 #include "distributed/lock_graph.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_utility.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_progress.h"
 #include "distributed/multi_server_executor.h"
@@ -190,7 +191,7 @@ static void UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
 								 List *responsiveNodeList, Oid shardReplicationModeOid);
 
 /* static declarations for main logic's utility functions */
-static HTAB * ActivePlacementsHash(List *shardPlacementList);
+static HTAB * ShardPlacementsListToHash(List *shardPlacementList);
 static bool PlacementsHashFind(HTAB *placementsHash, uint64 shardId,
 							   WorkerNode *workerNode);
 static void PlacementsHashEnter(HTAB *placementsHash, uint64 shardId,
@@ -396,6 +397,7 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
 		placement->shardId = groupPlacement->shardId;
 		placement->shardLength = groupPlacement->shardLength;
 		placement->shardState = groupPlacement->shardState;
+		placement->nodeId = worker->nodeId;
 		placement->nodeName = pstrdup(worker->workerName);
 		placement->nodePort = worker->workerPort;
 		placement->placementId = groupPlacement->placementId;
@@ -446,14 +448,17 @@ GetRebalanceSteps(RebalanceOptions *options)
 
 	/* sort the lists to make the function more deterministic */
 	List *activeWorkerList = SortedActiveWorkers();
-	List *shardPlacementListList = NIL;
+	List *activeShardPlacementListList = NIL;
 
 	Oid relationId = InvalidOid;
 	foreach_oid(relationId, options->relationIdList)
 	{
 		List *shardPlacementList = FullShardPlacementList(relationId,
 														  options->excludedShardArray);
-		shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
+		List *activeShardPlacementListForRelation =
+			FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement);
+		activeShardPlacementListList =
+			lappend(activeShardPlacementListList, activeShardPlacementListForRelation);
 	}
 
 	if (options->threshold < options->rebalanceStrategy->minimumThreshold)
@@ -471,7 +476,7 @@ GetRebalanceSteps(RebalanceOptions *options)
 	}
 
 	return RebalancePlacementUpdates(activeWorkerList,
-									 shardPlacementListList,
+									 activeShardPlacementListList,
 									 options->threshold,
 									 options->maxShardMoves,
 									 options->drainOnly,
@@ -795,7 +800,6 @@ rebalance_table_shards(PG_FUNCTION_ARGS)
 	{
 		Oid relationId = PG_GETARG_OID(0);
 		ErrorIfMoveUnsupportedTableType(relationId);
-
 		relationIdList = list_make1_oid(relationId);
 	}
 	else
@@ -951,9 +955,11 @@ replicate_table_shards(PG_FUNCTION_ARGS)
 
 	List *activeWorkerList = SortedActiveWorkers();
 	List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
+	List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
+															   IsActiveShardPlacement);
 
 	List *placementUpdateList = ReplicationPlacementUpdates(activeWorkerList,
-															shardPlacementList,
+															activeShardPlacementList,
 															shardReplicationFactor);
 	placementUpdateList = list_truncate(placementUpdateList, maxShardCopies);
 
@@ -1737,13 +1743,13 @@ ExecuteRebalancerCommandInSeparateTransaction(char *command)
  * which is placed in the source node but not in the target node as the shard to
  * move.
  *
- * The shardPlacementListList argument contains a list of lists of shard
+ * The activeShardPlacementListList argument contains a list of lists of active shard
  * placements. Each of these lists are balanced independently. This is used to
  * make sure different colocation groups are balanced separately, so each list
  * contains the placements of a colocation group.
  */
 List *
-RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
+RebalancePlacementUpdates(List *workerNodeList, List *activeShardPlacementListList,
 						  double threshold,
 						  int32 maxShardMoves,
 						  bool drainOnly,
@@ -1755,7 +1761,7 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
 	List *shardPlacementList = NIL;
 	List *placementUpdateList = NIL;
 
-	foreach_ptr(shardPlacementList, shardPlacementListList)
+	foreach_ptr(shardPlacementList, activeShardPlacementListList)
 	{
 		state = InitRebalanceState(workerNodeList, shardPlacementList,
 								   functions);
@@ -1861,7 +1867,7 @@ InitRebalanceState(List *workerNodeList, List *shardPlacementList,
 
 	RebalanceState *state = palloc0(sizeof(RebalanceState));
 	state->functions = functions;
-	state->placementsHash = ActivePlacementsHash(shardPlacementList);
+	state->placementsHash = ShardPlacementsListToHash(shardPlacementList);
 
 	/* create empty fill state for all of the worker nodes */
 	foreach_ptr(workerNode, workerNodeList)
@@ -2413,29 +2419,25 @@ FindAndMoveShardCost(float4 utilizationLowerBound,
 /*
  * ReplicationPlacementUpdates returns a list of placement updates which
  * replicates shard placements that need re-replication. To do this, the
- * function loops over the shard placements, and for each shard placement
+ * function loops over the active shard placements, and for each shard placement
  * which needs to be re-replicated, it chooses an active worker node with
  * smallest number of shards as the target node.
  */
 List *
-ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
+ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList,
 							int shardReplicationFactor)
 {
 	List *placementUpdateList = NIL;
 	ListCell *shardPlacementCell = NULL;
 	uint32 workerNodeIndex = 0;
-	HTAB *placementsHash = ActivePlacementsHash(shardPlacementList);
+	HTAB *placementsHash = ShardPlacementsListToHash(activeShardPlacementList);
 	uint32 workerNodeCount = list_length(workerNodeList);
 
 	/* get number of shards per node */
 	uint32 *shardCountArray = palloc0(workerNodeCount * sizeof(uint32));
-	foreach(shardPlacementCell, shardPlacementList)
+	foreach(shardPlacementCell, activeShardPlacementList)
 	{
 		ShardPlacement *placement = lfirst(shardPlacementCell);
-		if (placement->shardState != SHARD_STATE_ACTIVE)
-		{
-			continue;
-		}
 
 		for (workerNodeIndex = 0; workerNodeIndex < workerNodeCount; workerNodeIndex++)
 		{
@@ -2449,7 +2451,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
 		}
 	}
 
-	foreach(shardPlacementCell, shardPlacementList)
+	foreach(shardPlacementCell, activeShardPlacementList)
 	{
 		WorkerNode *sourceNode = NULL;
 		WorkerNode *targetNode = NULL;
@@ -2586,11 +2588,11 @@ ShardActivePlacementCount(HTAB *activePlacementsHash, uint64 shardId,
 
 
 /*
- * ActivePlacementsHash creates and returns a hash set for the placements in
- * the given list of shard placements which are in active state.
+ * ShardPlacementsListToHash creates and returns a hash set from a shard
+ * placement list.
 */
 static HTAB *
-ActivePlacementsHash(List *shardPlacementList)
+ShardPlacementsListToHash(List *shardPlacementList)
 {
 	ListCell *shardPlacementCell = NULL;
 	HASHCTL info;
@@ -2609,11 +2611,8 @@ ActivePlacementsHash(List *shardPlacementList)
 	foreach(shardPlacementCell, shardPlacementList)
 	{
 		ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
-		if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
-		{
-			void *hashKey = (void *) shardPlacement;
-			hash_search(shardPlacementsHash, hashKey, HASH_ENTER, NULL);
-		}
+		void *hashKey = (void *) shardPlacement;
+		hash_search(shardPlacementsHash, hashKey, HASH_ENTER, NULL);
 	}
 
 	return shardPlacementsHash;
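The comment on ReplicationPlacementUpdates above describes its target selection: for a placement that needs re-replication, choose an active worker that does not already hold the shard and has the fewest shards. A minimal standalone sketch of that selection rule follows, using a hypothetical DemoWorker struct rather than Citus's WorkerNode and placements hash; it illustrates the rule only and is not the rebalancer's actual code.

#include <stdbool.h>
#include <stdio.h>

/* hypothetical per-worker bookkeeping for one shard, for illustration only */
typedef struct DemoWorker
{
	const char *name;
	bool isActive;
	int shardCount;		/* how many shards this worker currently holds */
	bool holdsShard;	/* does it already have a placement of the shard in question? */
} DemoWorker;

/* returns the index of the chosen target worker, or -1 if no worker qualifies */
static int
ChooseReplicationTarget(const DemoWorker *workers, int workerCount)
{
	int target = -1;

	for (int i = 0; i < workerCount; i++)
	{
		if (!workers[i].isActive || workers[i].holdsShard)
		{
			continue;
		}

		if (target == -1 || workers[i].shardCount < workers[target].shardCount)
		{
			target = i;
		}
	}

	return target;
}

int
main(void)
{
	DemoWorker workers[] = {
		{ "w1", true, 5, true },	/* already holds the shard */
		{ "w2", false, 0, false },	/* disabled, so never chosen */
		{ "w3", true, 3, false },
		{ "w4", true, 2, false },	/* fewest shards among eligible workers */
	};

	int target = ChooseReplicationTarget(workers, 4);
	printf("replicate to %s\n", target >= 0 ? workers[target].name : "<none>");
	return 0;
}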
diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c
index 4cccd851d..ea770cb6e 100644
--- a/src/backend/distributed/test/shard_rebalancer.c
+++ b/src/backend/distributed/test/shard_rebalancer.c
@@ -20,9 +20,11 @@
 #include "distributed/citus_ruleutils.h"
 #include "distributed/connection_management.h"
 #include "distributed/listutils.h"
+#include "distributed/metadata_utility.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/shard_cleaner.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/relay_utility.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "utils/builtins.h"
@@ -85,6 +87,18 @@ run_try_drop_marked_shards(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * IsActiveTestShardPlacement checks if a dummy shard placement created in the tests
+ * is labelled as active. Note that it does not also check whether the worker is
+ * active, because the dummy test workers are not registered as actual workers.
+ */
+static inline bool
+IsActiveTestShardPlacement(ShardPlacement *shardPlacement)
+{
+	return shardPlacement->shardState == SHARD_STATE_ACTIVE;
+}
+
+
 /*
  * shard_placement_rebalance_array returns a list of operations which can make a
  * cluster consisting of given shard placements and worker nodes balanced with
@@ -138,7 +152,9 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS)
 		if (shardPlacementTestInfo->nextColocationGroup)
 		{
 			shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
-			shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
+			shardPlacementListList = lappend(shardPlacementListList,
+											 FilterShardPlacementList(shardPlacementList,
+																	  IsActiveTestShardPlacement));
 			shardPlacementList = NIL;
 		}
 		shardPlacementList = lappend(shardPlacementList,
@@ -290,12 +306,15 @@ shard_placement_replication_array(PG_FUNCTION_ARGS)
 											 shardPlacementTestInfo->placement);
 	}
 
+	List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
+															   IsActiveTestShardPlacement);
+
 	/* sort the lists to make the function more deterministic */
 	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
-	shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
+	activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements);
 
 	List *placementUpdateList = ReplicationPlacementUpdates(workerNodeList,
-															shardPlacementList,
+															activeShardPlacementList,
 															shardReplicationFactor);
 	ArrayType *placementUpdateJsonArray = PlacementUpdateListToJsonArray(
 		placementUpdateList);
@@ -426,6 +445,9 @@ JsonArrayToWorkerTestInfoList(ArrayType *workerNodeJsonArrayObject)
 		workerTestInfo->capacity = JsonFieldValueUInt64Default(workerNodeJson,
 															   "capacity", 1);
 
+		workerNode->isActive = JsonFieldValueBoolDefault(workerNodeJson,
+														 "isActive", true);
+
 		workerTestInfoList = lappend(workerTestInfoList, workerTestInfo);
 		char *disallowedShardsString = JsonFieldValueString(
 			workerNodeJson, "disallowed_shards");
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 3e7a3b6f3..75e76ec8d 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -209,6 +209,9 @@ extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
 extern uint64 ShardLength(uint64 shardId);
 extern bool NodeGroupHasShardPlacements(int32 groupId,
 										bool onlyConsiderActivePlacements);
+extern bool IsActiveShardPlacement(ShardPlacement *shardPlacement);
+extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
+										   ShardPlacement *));
 extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
 extern List * ActiveShardPlacementList(uint64 shardId);
 extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);
diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h
index 3e6d7a8b7..11730492a 100644
--- a/src/include/distributed/shard_rebalancer.h
+++ b/src/include/distributed/shard_rebalancer.h
@@ -182,7 +182,8 @@ extern Datum init_rebalance_monitor(PG_FUNCTION_ARGS);
 extern Datum finalize_rebalance_monitor(PG_FUNCTION_ARGS);
 extern Datum get_rebalance_progress(PG_FUNCTION_ARGS);
 
-extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementList,
+extern List * RebalancePlacementUpdates(List *workerNodeList,
+										List *shardPlacementListList,
 										double threshold,
 										int32 maxShardMoves,
 										bool drainOnly,
diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out
index c63b50842..b443d99e9 100644
--- a/src/test/regress/expected/shard_rebalancer.out
+++ b/src/test/regress/expected/shard_rebalancer.out
@@ -2380,3 +2380,64 @@ SELECT rebalance_table_shards();
 
 CALL citus_cleanup_orphaned_shards();
 DROP TABLE test_rebalance_with_index CASCADE;
+-- Test rebalancer with disabled worker
+SET citus.next_shard_id TO 433500;
+SET citus.shard_replication_factor TO 2;
+DROP TABLE IF EXISTS test_rebalance_with_disabled_worker;
+CREATE TABLE test_rebalance_with_disabled_worker (a int);
+SELECT create_distributed_table('test_rebalance_with_disabled_worker', 'a', colocate_with:='none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_disable_node('localhost', :worker_2_port);
+ citus_disable_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT public.wait_until_metadata_sync(30000);
+ wait_until_metadata_sync
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT rebalance_table_shards('test_rebalance_with_disabled_worker');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_activate_node('localhost', :worker_2_port);
+ citus_activate_node
+---------------------------------------------------------------------
+                  35
+(1 row)
+
+DROP TABLE test_rebalance_with_disabled_worker;
+-- Test rebalance with all shards excluded
+DROP TABLE IF EXISTS test_with_all_shards_excluded;
+CREATE TABLE test_with_all_shards_excluded(a int PRIMARY KEY);
+SELECT create_distributed_table('test_with_all_shards_excluded', 'a', colocate_with:='none', shard_count:=4);
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT shardid FROM pg_dist_shard;
+ shardid
+---------------------------------------------------------------------
+  433504
+  433505
+  433506
+  433507
+(4 rows)
+
+SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_list:='{102073, 102074, 102075, 102076}');
+ rebalance_table_shards
+---------------------------------------------------------------------
+
+(1 row)
+
+DROP TABLE test_with_all_shards_excluded;
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index 8a9ae16d9..bad28f1e9 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -1419,3 +1419,34 @@ UPDATE pg_dist_node SET shouldhaveshards=true WHERE nodeport = :worker_2_port;
 SELECT rebalance_table_shards();
 CALL citus_cleanup_orphaned_shards();
 DROP TABLE test_rebalance_with_index CASCADE;
+
+
+-- Test rebalancer with disabled worker
+
+SET citus.next_shard_id TO 433500;
+SET citus.shard_replication_factor TO 2;
+
+DROP TABLE IF EXISTS test_rebalance_with_disabled_worker;
+CREATE TABLE test_rebalance_with_disabled_worker (a int);
+SELECT create_distributed_table('test_rebalance_with_disabled_worker', 'a', colocate_with:='none');
+
+SELECT citus_disable_node('localhost', :worker_2_port);
+SELECT public.wait_until_metadata_sync(30000);
+
+SELECT rebalance_table_shards('test_rebalance_with_disabled_worker');
+
+SELECT citus_activate_node('localhost', :worker_2_port);
+
+DROP TABLE test_rebalance_with_disabled_worker;
+
+-- Test rebalance with all shards excluded
+
+DROP TABLE IF EXISTS test_with_all_shards_excluded;
+CREATE TABLE test_with_all_shards_excluded(a int PRIMARY KEY);
+SELECT create_distributed_table('test_with_all_shards_excluded', 'a', colocate_with:='none', shard_count:=4);
+
+SELECT shardid FROM pg_dist_shard;
+
+SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_list:='{102073, 102074, 102075, 102076}');
+
+DROP TABLE test_with_all_shards_excluded;