From 121f5c427115b750e00e3c02c32abb1c857c799a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 17 Nov 2021 11:04:39 +0100 Subject: [PATCH] Active placements can only be on active nodes We re-define the meaning of active shard placement. It used to only be defined via shardstate == SHARD_STATE_ACTIVE. Now, we also add one more check. The worker node that the placement is on should be active as well. This is a preparation for supporting citus_disable_node() for MX with multiple failures at the same time. With this change, the maintanince daemon only needs to sync the "node metadata" (e.g., pg_dist_node), not the shard metadata. --- .../distributed/metadata/metadata_sync.c | 28 ++++++++++++---- .../distributed/metadata/metadata_utility.c | 12 ++++++- .../planner/multi_physical_planner.c | 33 +------------------ 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 56360b3a7..efac2d341 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -954,7 +954,7 @@ ShardListInsertCommand(List *shardIntervalList) "shardlength, groupid, placementid) AS (VALUES "); ShardInterval *shardInterval = NULL; - bool isFirstValue = true; + bool firstPlacementProcessed = false; foreach_ptr(shardInterval, shardIntervalList) { uint64 shardId = shardInterval->shardId; @@ -963,7 +963,7 @@ ShardListInsertCommand(List *shardIntervalList) ShardPlacement *placement = NULL; foreach_ptr(placement, shardPlacementList) { - if (!isFirstValue) + if (firstPlacementProcessed) { /* * As long as this is not the first placement of the first shard, @@ -971,7 +971,7 @@ ShardListInsertCommand(List *shardIntervalList) */ appendStringInfo(insertPlacementCommand, ", "); } - isFirstValue = false; + firstPlacementProcessed = true; appendStringInfo(insertPlacementCommand, "(%ld, %d, %ld, %d, %ld)", @@ -1046,9 +1046,25 @@ ShardListInsertCommand(List *shardIntervalList) "storagetype, shardminvalue, shardmaxvalue) " "FROM shard_data;"); - /* first insert shards, than the placements */ - commandList = lappend(commandList, insertShardCommand->data); - commandList = lappend(commandList, insertPlacementCommand->data); + /* + * There are no active placements for the table, so do not create the + * command as it'd lead to syntax error. + * + * This is normally not an expected situation, however the current + * implementation of citus_disable_node allows to disable nodes with + * the only active placements. So, for example a single shard/placement + * distributed table on a disabled node might trigger zero placement + * case. + * + * TODO: remove this check once citus_disable_node errors out for + * the above scenario. + */ + if (firstPlacementProcessed) + { + /* first insert shards, than the placements */ + commandList = lappend(commandList, insertShardCommand->data); + commandList = lappend(commandList, insertPlacementCommand->data); + } return commandList; } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index b610cc3b4..36152f69e 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1353,7 +1353,17 @@ ActiveShardPlacementList(uint64 shardId) ShardPlacement *shardPlacement = NULL; foreach_ptr(shardPlacement, shardPlacementList) { - if (shardPlacement->shardState == SHARD_STATE_ACTIVE) + WorkerNode *workerNode = + FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort); + + /* + * We have already resolved the placement to node, so would have + * errored out earlier. + */ + Assert(workerNode != NULL); + + if (shardPlacement->shardState == SHARD_STATE_ACTIVE && + workerNode->isActive) { activePlacementList = lappend(activePlacementList, shardPlacement); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a8f4f50c9..28b750e8f 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -211,7 +211,6 @@ static List * ReorderAndAssignTaskList(List *taskList, ReorderFunction reorderFunction); static int CompareTasksByShardId(const void *leftElement, const void *rightElement); static List * ActiveShardPlacementLists(List *taskList); -static List * ActivePlacementList(List *placementList); static List * LeftRotateList(List *list, uint32 rotateCount); static List * FindDependentMergeTaskList(Task *sqlTask); static List * AssignDualHashTaskList(List *taskList); @@ -5381,10 +5380,8 @@ ActiveShardPlacementLists(List *taskList) { Task *task = (Task *) lfirst(taskCell); uint64 anchorShardId = task->anchorShardId; - List *shardPlacementList = ActiveShardPlacementList(anchorShardId); + List *activeShardPlacementList = ActiveShardPlacementList(anchorShardId); - /* filter out shard placements that reside in inactive nodes */ - List *activeShardPlacementList = ActivePlacementList(shardPlacementList); if (activeShardPlacementList == NIL) { ereport(ERROR, @@ -5430,34 +5427,6 @@ CompareShardPlacements(const void *leftElement, const void *rightElement) } -/* - * ActivePlacementList walks over shard placements in the given list, and finds - * the corresponding worker node for each placement. The function then checks if - * that worker node is active, and if it is, appends the placement to a new list. - * The function last returns the new placement list. - */ -static List * -ActivePlacementList(List *placementList) -{ - List *activePlacementList = NIL; - ListCell *placementCell = NULL; - - foreach(placementCell, placementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); - - /* check if the worker node for this shard placement is active */ - WorkerNode *workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); - if (workerNode != NULL && workerNode->isActive) - { - activePlacementList = lappend(activePlacementList, placement); - } - } - - return activePlacementList; -} - - /* * LeftRotateList returns a copy of the given list that has been cyclically * shifted to the left by the given rotation count. For this, the function