mirror of https://github.com/citusdata/citus.git
Active placements can only be on active nodes
We re-define the meaning of active shard placement. It used to only be defined via shardstate == SHARD_STATE_ACTIVE. Now, we also add one more check. The worker node that the placement is on should be active as well. This is a preparation for supporting citus_disable_node() for MX with multiple failures at the same time. With this change, the maintenance daemon only needs to sync the "node metadata" (e.g., pg_dist_node), not the shard metadata. pull/5470/head
parent
d6cbfd0886
commit
121f5c4271
|
@ -954,7 +954,7 @@ ShardListInsertCommand(List *shardIntervalList)
|
||||||
"shardlength, groupid, placementid) AS (VALUES ");
|
"shardlength, groupid, placementid) AS (VALUES ");
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
bool isFirstValue = true;
|
bool firstPlacementProcessed = false;
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
foreach_ptr(shardInterval, shardIntervalList)
|
||||||
{
|
{
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
@ -963,7 +963,7 @@ ShardListInsertCommand(List *shardIntervalList)
|
||||||
ShardPlacement *placement = NULL;
|
ShardPlacement *placement = NULL;
|
||||||
foreach_ptr(placement, shardPlacementList)
|
foreach_ptr(placement, shardPlacementList)
|
||||||
{
|
{
|
||||||
if (!isFirstValue)
|
if (firstPlacementProcessed)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* As long as this is not the first placement of the first shard,
|
* As long as this is not the first placement of the first shard,
|
||||||
|
@ -971,7 +971,7 @@ ShardListInsertCommand(List *shardIntervalList)
|
||||||
*/
|
*/
|
||||||
appendStringInfo(insertPlacementCommand, ", ");
|
appendStringInfo(insertPlacementCommand, ", ");
|
||||||
}
|
}
|
||||||
isFirstValue = false;
|
firstPlacementProcessed = true;
|
||||||
|
|
||||||
appendStringInfo(insertPlacementCommand,
|
appendStringInfo(insertPlacementCommand,
|
||||||
"(%ld, %d, %ld, %d, %ld)",
|
"(%ld, %d, %ld, %d, %ld)",
|
||||||
|
@ -1046,9 +1046,25 @@ ShardListInsertCommand(List *shardIntervalList)
|
||||||
"storagetype, shardminvalue, shardmaxvalue) "
|
"storagetype, shardminvalue, shardmaxvalue) "
|
||||||
"FROM shard_data;");
|
"FROM shard_data;");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* There are no active placements for the table, so do not create the
|
||||||
|
* command as it'd lead to syntax error.
|
||||||
|
*
|
||||||
|
* This is normally not an expected situation, however the current
|
||||||
|
* implementation of citus_disable_node allows to disable nodes with
|
||||||
|
* the only active placements. So, for example a single shard/placement
|
||||||
|
* distributed table on a disabled node might trigger zero placement
|
||||||
|
* case.
|
||||||
|
*
|
||||||
|
* TODO: remove this check once citus_disable_node errors out for
|
||||||
|
* the above scenario.
|
||||||
|
*/
|
||||||
|
if (firstPlacementProcessed)
|
||||||
|
{
|
||||||
/* first insert shards, than the placements */
|
/* first insert shards, than the placements */
|
||||||
commandList = lappend(commandList, insertShardCommand->data);
|
commandList = lappend(commandList, insertShardCommand->data);
|
||||||
commandList = lappend(commandList, insertPlacementCommand->data);
|
commandList = lappend(commandList, insertPlacementCommand->data);
|
||||||
|
}
|
||||||
|
|
||||||
return commandList;
|
return commandList;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1353,7 +1353,17 @@ ActiveShardPlacementList(uint64 shardId)
|
||||||
ShardPlacement *shardPlacement = NULL;
|
ShardPlacement *shardPlacement = NULL;
|
||||||
foreach_ptr(shardPlacement, shardPlacementList)
|
foreach_ptr(shardPlacement, shardPlacementList)
|
||||||
{
|
{
|
||||||
if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
|
WorkerNode *workerNode =
|
||||||
|
FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We have already resolved the placement to node, so would have
|
||||||
|
* errored out earlier.
|
||||||
|
*/
|
||||||
|
Assert(workerNode != NULL);
|
||||||
|
|
||||||
|
if (shardPlacement->shardState == SHARD_STATE_ACTIVE &&
|
||||||
|
workerNode->isActive)
|
||||||
{
|
{
|
||||||
activePlacementList = lappend(activePlacementList, shardPlacement);
|
activePlacementList = lappend(activePlacementList, shardPlacement);
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,7 +211,6 @@ static List * ReorderAndAssignTaskList(List *taskList,
|
||||||
ReorderFunction reorderFunction);
|
ReorderFunction reorderFunction);
|
||||||
static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
|
static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
|
||||||
static List * ActiveShardPlacementLists(List *taskList);
|
static List * ActiveShardPlacementLists(List *taskList);
|
||||||
static List * ActivePlacementList(List *placementList);
|
|
||||||
static List * LeftRotateList(List *list, uint32 rotateCount);
|
static List * LeftRotateList(List *list, uint32 rotateCount);
|
||||||
static List * FindDependentMergeTaskList(Task *sqlTask);
|
static List * FindDependentMergeTaskList(Task *sqlTask);
|
||||||
static List * AssignDualHashTaskList(List *taskList);
|
static List * AssignDualHashTaskList(List *taskList);
|
||||||
|
@ -5381,10 +5380,8 @@ ActiveShardPlacementLists(List *taskList)
|
||||||
{
|
{
|
||||||
Task *task = (Task *) lfirst(taskCell);
|
Task *task = (Task *) lfirst(taskCell);
|
||||||
uint64 anchorShardId = task->anchorShardId;
|
uint64 anchorShardId = task->anchorShardId;
|
||||||
List *shardPlacementList = ActiveShardPlacementList(anchorShardId);
|
List *activeShardPlacementList = ActiveShardPlacementList(anchorShardId);
|
||||||
|
|
||||||
/* filter out shard placements that reside in inactive nodes */
|
|
||||||
List *activeShardPlacementList = ActivePlacementList(shardPlacementList);
|
|
||||||
if (activeShardPlacementList == NIL)
|
if (activeShardPlacementList == NIL)
|
||||||
{
|
{
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
|
@ -5430,34 +5427,6 @@ CompareShardPlacements(const void *leftElement, const void *rightElement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ActivePlacementList walks over shard placements in the given list, and finds
|
|
||||||
* the corresponding worker node for each placement. The function then checks if
|
|
||||||
* that worker node is active, and if it is, appends the placement to a new list.
|
|
||||||
* The function last returns the new placement list.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
ActivePlacementList(List *placementList)
|
|
||||||
{
|
|
||||||
List *activePlacementList = NIL;
|
|
||||||
ListCell *placementCell = NULL;
|
|
||||||
|
|
||||||
foreach(placementCell, placementList)
|
|
||||||
{
|
|
||||||
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
|
|
||||||
|
|
||||||
/* check if the worker node for this shard placement is active */
|
|
||||||
WorkerNode *workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
|
|
||||||
if (workerNode != NULL && workerNode->isActive)
|
|
||||||
{
|
|
||||||
activePlacementList = lappend(activePlacementList, placement);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return activePlacementList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LeftRotateList returns a copy of the given list that has been cyclically
|
* LeftRotateList returns a copy of the given list that has been cyclically
|
||||||
* shifted to the left by the given rotation count. For this, the function
|
* shifted to the left by the given rotation count. For this, the function
|
||||||
|
|
Loading…
Reference in New Issue