Merge pull request #5470 from citusdata/redefine_active_placements

Active placements can only be on active nodes
pull/5496/head
Önder Kalacı 2021-11-26 09:23:28 +01:00 committed by GitHub
commit 0beb1aba62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 39 deletions

View File

@ -954,7 +954,7 @@ ShardListInsertCommand(List *shardIntervalList)
"shardlength, groupid, placementid) AS (VALUES ");
ShardInterval *shardInterval = NULL;
bool isFirstValue = true;
bool firstPlacementProcessed = false;
foreach_ptr(shardInterval, shardIntervalList)
{
uint64 shardId = shardInterval->shardId;
@ -963,7 +963,7 @@ ShardListInsertCommand(List *shardIntervalList)
ShardPlacement *placement = NULL;
foreach_ptr(placement, shardPlacementList)
{
if (!isFirstValue)
if (firstPlacementProcessed)
{
/*
* As long as this is not the first placement of the first shard,
@ -971,7 +971,7 @@ ShardListInsertCommand(List *shardIntervalList)
*/
appendStringInfo(insertPlacementCommand, ", ");
}
isFirstValue = false;
firstPlacementProcessed = true;
appendStringInfo(insertPlacementCommand,
"(%ld, %d, %ld, %d, %ld)",
@ -1046,9 +1046,25 @@ ShardListInsertCommand(List *shardIntervalList)
"storagetype, shardminvalue, shardmaxvalue) "
"FROM shard_data;");
/* first insert shards, than the placements */
commandList = lappend(commandList, insertShardCommand->data);
commandList = lappend(commandList, insertPlacementCommand->data);
/*
* There are no active placements for the table, so do not create the
* command as it'd lead to syntax error.
*
* This is normally not an expected situation, however the current
* implementation of citus_disable_node allows to disable nodes with
* the only active placements. So, for example a single shard/placement
* distributed table on a disabled node might trigger zero placement
* case.
*
* TODO: remove this check once citus_disable_node errors out for
* the above scenario.
*/
if (firstPlacementProcessed)
{
/* first insert shards, than the placements */
commandList = lappend(commandList, insertShardCommand->data);
commandList = lappend(commandList, insertPlacementCommand->data);
}
return commandList;
}

View File

@ -1353,7 +1353,17 @@ ActiveShardPlacementList(uint64 shardId)
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
WorkerNode *workerNode =
FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort);
/*
* We have already resolved the placement to node, so would have
* errored out earlier.
*/
Assert(workerNode != NULL);
if (shardPlacement->shardState == SHARD_STATE_ACTIVE &&
workerNode->isActive)
{
activePlacementList = lappend(activePlacementList, shardPlacement);
}

View File

@ -211,7 +211,6 @@ static List * ReorderAndAssignTaskList(List *taskList,
ReorderFunction reorderFunction);
static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
static List * ActiveShardPlacementLists(List *taskList);
static List * ActivePlacementList(List *placementList);
static List * LeftRotateList(List *list, uint32 rotateCount);
static List * FindDependentMergeTaskList(Task *sqlTask);
static List * AssignDualHashTaskList(List *taskList);
@ -5381,10 +5380,8 @@ ActiveShardPlacementLists(List *taskList)
{
Task *task = (Task *) lfirst(taskCell);
uint64 anchorShardId = task->anchorShardId;
List *shardPlacementList = ActiveShardPlacementList(anchorShardId);
List *activeShardPlacementList = ActiveShardPlacementList(anchorShardId);
/* filter out shard placements that reside in inactive nodes */
List *activeShardPlacementList = ActivePlacementList(shardPlacementList);
if (activeShardPlacementList == NIL)
{
ereport(ERROR,
@ -5430,34 +5427,6 @@ CompareShardPlacements(const void *leftElement, const void *rightElement)
}
/*
* ActivePlacementList walks over shard placements in the given list, and finds
* the corresponding worker node for each placement. The function then checks if
* that worker node is active, and if it is, appends the placement to a new list.
* The function last returns the new placement list.
*/
static List *
ActivePlacementList(List *placementList)
{
List *activePlacementList = NIL;
ListCell *placementCell = NULL;
foreach(placementCell, placementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
/* check if the worker node for this shard placement is active */
WorkerNode *workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
if (workerNode != NULL && workerNode->isActive)
{
activePlacementList = lappend(activePlacementList, placement);
}
}
return activePlacementList;
}
/*
* LeftRotateList returns a copy of the given list that has been cyclically
* shifted to the left by the given rotation count. For this, the function