Handle rebalance / replication when a node is disabled (Fix #5664) (#5729)

The issue is caused when rebalance / replication calls `FullShardPlacementList`, which returns all shard placements, including those on nodes disabled with `citus_disable_node`. Eventually, `FindFillStateForPlacement` looks for a fill state only across the active workers, fails to find one for the placements sitting on the disabled workers, and a segfault follows shortly after.
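
To make the failure mode concrete, here is a minimal, self-contained C sketch of the pattern (hypothetical names and structs, not the actual Citus code): fill states are built for active workers only, so a placement that still points at a disabled worker has no matching state, and dereferencing that missing result is what produced the segfault.

```c
#include <stdio.h>
#include <string.h>

typedef struct FillState { const char *nodeName; int shardCount; } FillState;
typedef struct Placement { const char *nodeName; long shardId; } Placement;

/* hypothetical stand-in for FindFillStateForPlacement: the lookup can
 * come back empty because states exist for active workers only */
static FillState *
FindFillState(FillState *states, int stateCount, const Placement *placement)
{
	for (int i = 0; i < stateCount; i++)
	{
		if (strcmp(states[i].nodeName, placement->nodeName) == 0)
		{
			return &states[i];
		}
	}
	return NULL;
}

int
main(void)
{
	/* a fill state exists only for the active worker ... */
	FillState states[] = { { "worker-1", 0 } };

	/* ... but the full placement list still contains a placement that
	 * lives on the disabled worker-2 */
	Placement placement = { "worker-2", 433500 };

	FillState *state = FindFillState(states, 1, &placement);
	if (state == NULL)
	{
		/* the pre-fix code used this result without a NULL check,
		 * which is the segfault described above */
		printf("no fill state for %s -> would have crashed\n",
		       placement.nodeName);
	}
	return 0;
}
```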

Approach:
* `ActivePlacementsHash` was not using the status of the shard placement's node to determine whether the placement is active. Initially, I just fixed that.
* Additionally, I refactored the code that handles active shard placements in replication / rebalance to:
	* use a single function to determine whether a shard placement is active, and
	* do the active-shard filtering before calling `RebalancePlacementUpdates` and `ReplicationPlacementUpdates`, so that test functions like `shard_placement_rebalance_array` and `shard_placement_replication_array`, which have different requirements for which placements count as active, can do their own filtering while reusing the same rebalance / replication logic as `rebalance_table_shards` and `replicate_table_shards` (a minimal sketch of this filter pattern follows right after this list).
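
The sketch below shows the filter pattern in plain C with a fixed array instead of a PostgreSQL `List *`; the names are illustrative, and the real implementations are `IsActiveShardPlacement` and `FilterShardPlacementList` in the diff further down.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct Placement
{
	long shardId;
	bool placementActive;   /* stands in for shardState == SHARD_STATE_ACTIVE */
	bool workerActive;      /* stands in for workerNode->isActive */
} Placement;

/* production-style predicate: the placement and its worker must be active */
static bool
IsActivePlacement(const Placement *p)
{
	return p->placementActive && p->workerActive;
}

/* test-style predicate: dummy test workers are never registered, so only
 * the placement state is checked */
static bool
IsActiveTestPlacement(const Placement *p)
{
	return p->placementActive;
}

/* generic filter: keep only the placements the predicate accepts */
static size_t
FilterPlacements(const Placement *in, size_t count,
                 bool (*keep)(const Placement *), const Placement **out)
{
	size_t kept = 0;
	for (size_t i = 0; i < count; i++)
	{
		if (keep(&in[i]))
		{
			out[kept++] = &in[i];
		}
	}
	return kept;
}

int
main(void)
{
	Placement placements[] = {
		{ 433500, true, true },    /* active placement on an active worker  */
		{ 433501, true, false },   /* active placement on a disabled worker */
	};
	const Placement *kept[2];

	size_t n = FilterPlacements(placements, 2, IsActivePlacement, kept);
	printf("production filter keeps %zu placement(s)\n", n);    /* prints 1 */

	n = FilterPlacements(placements, 2, IsActiveTestPlacement, kept);
	printf("test filter keeps %zu placement(s)\n", n);          /* prints 2 */

	return 0;
}
```

This mirrors the split the diff applies: the production paths filter with `IsActiveShardPlacement` before calling `RebalancePlacementUpdates` / `ReplicationPlacementUpdates`, while the test UDFs filter with their own `IsActiveTestShardPlacement`.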

Fix #5664
Gledis Zeneli 2022-02-25 19:54:30 +03:00 committed by GitHub
parent 6c25eea62f
commit b825232ecb
7 changed files with 210 additions and 61 deletions

@@ -1291,6 +1291,52 @@ NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements)
}
/*
* IsActiveShardPlacement checks if the shard placement is labelled as
* active, and that it is placed in an active worker.
* Expects shard worker to not be NULL.
*/
bool
IsActiveShardPlacement(ShardPlacement *shardPlacement)
{
WorkerNode *workerNode =
FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort);
if (!workerNode)
{
ereport(ERROR, (errmsg("There is a shard placement on node %s:%d but "
"could not find the node.", shardPlacement->nodeName,
shardPlacement->nodePort)));
}
return shardPlacement->shardState == SHARD_STATE_ACTIVE &&
workerNode->isActive;
}
/*
* FilterShardPlacementList filters a list of shard placements based on a filter.
* Keep only the shard placements for which the filter function returns true.
*/
List *
FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement *))
{
List *filteredShardPlacementList = NIL;
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
if (filter(shardPlacement))
{
filteredShardPlacementList = lappend(filteredShardPlacementList,
shardPlacement);
}
}
return filteredShardPlacementList;
}
/*
* ActiveShardPlacementListOnGroup returns a list of active shard placements
* that are sitting on group with groupId for given shardId.
@@ -1323,53 +1369,39 @@ ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId)
List *
ActiveShardPlacementList(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
WorkerNode *workerNode =
FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort);
/*
* We have already resolved the placement to node, so would have
* errored out earlier.
*/
Assert(workerNode != NULL);
if (shardPlacement->shardState == SHARD_STATE_ACTIVE &&
workerNode->isActive)
{
activePlacementList = lappend(activePlacementList, shardPlacement);
}
}
List *activePlacementList = FilterShardPlacementList(shardPlacementList,
IsActiveShardPlacement);
return SortList(activePlacementList, CompareShardPlacementsByWorker);
}
/*
* IsShardPlacementNotOrphaned returns true if a shard placement is not orphaned.
* Orphaned shards are shards marked to be deleted at a later point (shardstate = 4).
*/
static inline bool
IsShardPlacementNotOrphaned(ShardPlacement *shardPlacement)
{
return shardPlacement->shardState != SHARD_STATE_TO_DELETE;
}
/*
* ShardPlacementListWithoutOrphanedPlacements returns shard placements excluding
* the ones that are orphaned, because they are marked to be deleted at a later
* point (shardstate = 4).
* the ones that are orphaned.
*/
List *
ShardPlacementListWithoutOrphanedPlacements(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
if (shardPlacement->shardState != SHARD_STATE_TO_DELETE)
{
activePlacementList = lappend(activePlacementList, shardPlacement);
}
}
List *activePlacementList = FilterShardPlacementList(shardPlacementList,
IsShardPlacementNotOrphaned);
return SortList(activePlacementList, CompareShardPlacementsByWorker);
}

@@ -34,6 +34,7 @@
#include "distributed/lock_graph.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_progress.h"
#include "distributed/multi_server_executor.h"
@@ -190,7 +191,7 @@ static void UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
List *responsiveNodeList, Oid shardReplicationModeOid);
/* static declarations for main logic's utility functions */
static HTAB * ActivePlacementsHash(List *shardPlacementList);
static HTAB * ShardPlacementsListToHash(List *shardPlacementList);
static bool PlacementsHashFind(HTAB *placementsHash, uint64 shardId,
WorkerNode *workerNode);
static void PlacementsHashEnter(HTAB *placementsHash, uint64 shardId,
@@ -396,6 +397,7 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
placement->shardId = groupPlacement->shardId;
placement->shardLength = groupPlacement->shardLength;
placement->shardState = groupPlacement->shardState;
placement->nodeId = worker->nodeId;
placement->nodeName = pstrdup(worker->workerName);
placement->nodePort = worker->workerPort;
placement->placementId = groupPlacement->placementId;
@@ -446,14 +448,17 @@ GetRebalanceSteps(RebalanceOptions *options)
/* sort the lists to make the function more deterministic */
List *activeWorkerList = SortedActiveWorkers();
List *shardPlacementListList = NIL;
List *activeShardPlacementListList = NIL;
Oid relationId = InvalidOid;
foreach_oid(relationId, options->relationIdList)
{
List *shardPlacementList = FullShardPlacementList(relationId,
options->excludedShardArray);
shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
List *activeShardPlacementListForRelation =
FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement);
activeShardPlacementListList =
lappend(activeShardPlacementListList, activeShardPlacementListForRelation);
}
if (options->threshold < options->rebalanceStrategy->minimumThreshold)
@@ -471,7 +476,7 @@ GetRebalanceSteps(RebalanceOptions *options)
}
return RebalancePlacementUpdates(activeWorkerList,
shardPlacementListList,
activeShardPlacementListList,
options->threshold,
options->maxShardMoves,
options->drainOnly,
@@ -795,7 +800,6 @@ rebalance_table_shards(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
ErrorIfMoveUnsupportedTableType(relationId);
relationIdList = list_make1_oid(relationId);
}
else
@@ -951,9 +955,11 @@ replicate_table_shards(PG_FUNCTION_ARGS)
List *activeWorkerList = SortedActiveWorkers();
List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
IsActiveShardPlacement);
List *placementUpdateList = ReplicationPlacementUpdates(activeWorkerList,
shardPlacementList,
activeShardPlacementList,
shardReplicationFactor);
placementUpdateList = list_truncate(placementUpdateList, maxShardCopies);
@@ -1737,13 +1743,13 @@ ExecuteRebalancerCommandInSeparateTransaction(char *command)
* which is placed in the source node but not in the target node as the shard to
* move.
*
* The shardPlacementListList argument contains a list of lists of shard
* The activeShardPlacementListList argument contains a list of lists of active shard
* placements. Each of these lists are balanced independently. This is used to
* make sure different colocation groups are balanced separately, so each list
* contains the placements of a colocation group.
*/
List *
RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
RebalancePlacementUpdates(List *workerNodeList, List *activeShardPlacementListList,
double threshold,
int32 maxShardMoves,
bool drainOnly,
@@ -1755,7 +1761,7 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
List *shardPlacementList = NIL;
List *placementUpdateList = NIL;
foreach_ptr(shardPlacementList, shardPlacementListList)
foreach_ptr(shardPlacementList, activeShardPlacementListList)
{
state = InitRebalanceState(workerNodeList, shardPlacementList,
functions);
@@ -1861,7 +1867,7 @@ InitRebalanceState(List *workerNodeList, List *shardPlacementList,
RebalanceState *state = palloc0(sizeof(RebalanceState));
state->functions = functions;
state->placementsHash = ActivePlacementsHash(shardPlacementList);
state->placementsHash = ShardPlacementsListToHash(shardPlacementList);
/* create empty fill state for all of the worker nodes */
foreach_ptr(workerNode, workerNodeList)
@@ -2413,29 +2419,25 @@ FindAndMoveShardCost(float4 utilizationLowerBound,
/*
* ReplicationPlacementUpdates returns a list of placement updates which
* replicates shard placements that need re-replication. To do this, the
* function loops over the shard placements, and for each shard placement
* function loops over the active shard placements, and for each shard placement
* which needs to be re-replicated, it chooses an active worker node with
* smallest number of shards as the target node.
*/
List *
ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList,
int shardReplicationFactor)
{
List *placementUpdateList = NIL;
ListCell *shardPlacementCell = NULL;
uint32 workerNodeIndex = 0;
HTAB *placementsHash = ActivePlacementsHash(shardPlacementList);
HTAB *placementsHash = ShardPlacementsListToHash(activeShardPlacementList);
uint32 workerNodeCount = list_length(workerNodeList);
/* get number of shards per node */
uint32 *shardCountArray = palloc0(workerNodeCount * sizeof(uint32));
foreach(shardPlacementCell, shardPlacementList)
foreach(shardPlacementCell, activeShardPlacementList)
{
ShardPlacement *placement = lfirst(shardPlacementCell);
if (placement->shardState != SHARD_STATE_ACTIVE)
{
continue;
}
for (workerNodeIndex = 0; workerNodeIndex < workerNodeCount; workerNodeIndex++)
{
@@ -2449,7 +2451,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
}
}
foreach(shardPlacementCell, shardPlacementList)
foreach(shardPlacementCell, activeShardPlacementList)
{
WorkerNode *sourceNode = NULL;
WorkerNode *targetNode = NULL;
@@ -2586,11 +2588,11 @@ ShardActivePlacementCount(HTAB *activePlacementsHash, uint64 shardId,
/*
* ActivePlacementsHash creates and returns a hash set for the placements in
* the given list of shard placements which are in active state.
* ShardPlacementsListToHash creates and returns a hash set from a shard
* placement list.
*/
static HTAB *
ActivePlacementsHash(List *shardPlacementList)
ShardPlacementsListToHash(List *shardPlacementList)
{
ListCell *shardPlacementCell = NULL;
HASHCTL info;
@@ -2609,11 +2611,8 @@ ActivePlacementsHash(List *shardPlacementList)
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
{
void *hashKey = (void *) shardPlacement;
hash_search(shardPlacementsHash, hashKey, HASH_ENTER, NULL);
}
void *hashKey = (void *) shardPlacement;
hash_search(shardPlacementsHash, hashKey, HASH_ENTER, NULL);
}
return shardPlacementsHash;

@@ -20,9 +20,11 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/relay_utility.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/builtins.h"
@@ -85,6 +87,18 @@ run_try_drop_marked_shards(PG_FUNCTION_ARGS)
}
/*
* IsActiveTestShardPlacement checks if the dummy shard placements created in tests
* are labelled as active. Note that this function does not check if the worker is also
* active, because the dummy test workers are not registered as actual workers.
*/
static inline bool
IsActiveTestShardPlacement(ShardPlacement *shardPlacement)
{
return shardPlacement->shardState == SHARD_STATE_ACTIVE;
}
/*
* shard_placement_rebalance_array returns a list of operations which can make a
* cluster consisting of given shard placements and worker nodes balanced with
@@ -138,7 +152,9 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS)
if (shardPlacementTestInfo->nextColocationGroup)
{
shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
shardPlacementListList = lappend(shardPlacementListList,
FilterShardPlacementList(shardPlacementList,
IsActiveTestShardPlacement));
shardPlacementList = NIL;
}
shardPlacementList = lappend(shardPlacementList,
@@ -290,12 +306,15 @@ shard_placement_replication_array(PG_FUNCTION_ARGS)
shardPlacementTestInfo->placement);
}
List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
IsActiveTestShardPlacement);
/* sort the lists to make the function more deterministic */
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements);
List *placementUpdateList = ReplicationPlacementUpdates(workerNodeList,
shardPlacementList,
activeShardPlacementList,
shardReplicationFactor);
ArrayType *placementUpdateJsonArray = PlacementUpdateListToJsonArray(
placementUpdateList);
@@ -426,6 +445,9 @@ JsonArrayToWorkerTestInfoList(ArrayType *workerNodeJsonArrayObject)
workerTestInfo->capacity = JsonFieldValueUInt64Default(workerNodeJson,
"capacity", 1);
workerNode->isActive = JsonFieldValueBoolDefault(workerNodeJson,
"isActive", true);
workerTestInfoList = lappend(workerTestInfoList, workerTestInfo);
char *disallowedShardsString = JsonFieldValueString(
workerNodeJson, "disallowed_shards");

@@ -209,6 +209,9 @@ extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId,
bool onlyConsiderActivePlacements);
extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);
extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
ShardPlacement *));
extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
extern List * ActiveShardPlacementList(uint64 shardId);
extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);

@@ -182,7 +182,8 @@ extern Datum init_rebalance_monitor(PG_FUNCTION_ARGS);
extern Datum finalize_rebalance_monitor(PG_FUNCTION_ARGS);
extern Datum get_rebalance_progress(PG_FUNCTION_ARGS);
extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementList,
extern List * RebalancePlacementUpdates(List *workerNodeList,
List *shardPlacementListList,
double threshold,
int32 maxShardMoves,
bool drainOnly,

@@ -2380,3 +2380,64 @@ SELECT rebalance_table_shards();
CALL citus_cleanup_orphaned_shards();
DROP TABLE test_rebalance_with_index CASCADE;
-- Test rebalancer with disabled worker
SET citus.next_shard_id TO 433500;
SET citus.shard_replication_factor TO 2;
DROP TABLE IF EXISTS test_rebalance_with_disabled_worker;
CREATE TABLE test_rebalance_with_disabled_worker (a int);
SELECT create_distributed_table('test_rebalance_with_disabled_worker', 'a', colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT citus_disable_node('localhost', :worker_2_port);
citus_disable_node
---------------------------------------------------------------------
(1 row)
SELECT public.wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT rebalance_table_shards('test_rebalance_with_disabled_worker');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_port);
citus_activate_node
---------------------------------------------------------------------
35
(1 row)
DROP TABLE test_rebalance_with_disabled_worker;
-- Test rebalance with all shards excluded
DROP TABLE IF EXISTS test_with_all_shards_excluded;
CREATE TABLE test_with_all_shards_excluded(a int PRIMARY KEY);
SELECT create_distributed_table('test_with_all_shards_excluded', 'a', colocate_with:='none', shard_count:=4);
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT shardid FROM pg_dist_shard;
shardid
---------------------------------------------------------------------
433504
433505
433506
433507
(4 rows)
SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_list:='{102073, 102074, 102075, 102076}');
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
DROP TABLE test_with_all_shards_excluded;

@@ -1419,3 +1419,34 @@ UPDATE pg_dist_node SET shouldhaveshards=true WHERE nodeport = :worker_2_port;
SELECT rebalance_table_shards();
CALL citus_cleanup_orphaned_shards();
DROP TABLE test_rebalance_with_index CASCADE;
-- Test rebalancer with disabled worker
SET citus.next_shard_id TO 433500;
SET citus.shard_replication_factor TO 2;
DROP TABLE IF EXISTS test_rebalance_with_disabled_worker;
CREATE TABLE test_rebalance_with_disabled_worker (a int);
SELECT create_distributed_table('test_rebalance_with_disabled_worker', 'a', colocate_with:='none');
SELECT citus_disable_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(30000);
SELECT rebalance_table_shards('test_rebalance_with_disabled_worker');
SELECT citus_activate_node('localhost', :worker_2_port);
DROP TABLE test_rebalance_with_disabled_worker;
-- Test rebalance with all shards excluded
DROP TABLE IF EXISTS test_with_all_shards_excluded;
CREATE TABLE test_with_all_shards_excluded(a int PRIMARY KEY);
SELECT create_distributed_table('test_with_all_shards_excluded', 'a', colocate_with:='none', shard_count:=4);
SELECT shardid FROM pg_dist_shard;
SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_list:='{102073, 102074, 102075, 102076}');
DROP TABLE test_with_all_shards_excluded;