mirror of https://github.com/citusdata/citus.git
fix incorrect groupid or nodeid (#3710)
For shard placements, we were setting nodeid, nodename, nodeport and groupid manually. This is error prone, and it seems we had already forgotten to set some of them in places, which means those fields kept their default values; e.g., the group id would be 0 even when the node's group id is not 0. The implication is inconsistent worker metadata. This commit introduces a new method and calls it to set those fields, so that as long as we go through this method we won't write inconsistent metadata. It probably makes sense to have a struct for these fields; we already have NodeMetadata, but it doesn't have nodename or nodeport, so that could be done in another refactor to make things simpler.

pull/3711/head
parent
ec734a643b
commit
a369f9001d
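
In essence, the fix replaces several hand-written field copies with one helper that always sets the complete set of node-identifying fields. Below is a minimal, self-contained sketch of that pattern; the structs are simplified stand-ins for Citus's WorkerNode and ShardPlacement (the real ones carry many more fields), and plain strdup stands in for PostgreSQL's pstrdup:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Simplified stand-ins for the Citus structs; only the four
 * node-identifying fields this commit is about are modeled. */
typedef struct WorkerNode
{
	uint32_t nodeId;
	char workerName[256];
	uint32_t workerPort;
	int32_t groupId;
} WorkerNode;

typedef struct ShardPlacement
{
	uint32_t nodeId;
	char *nodeName;
	uint32_t nodePort;
	int32_t groupId;
} ShardPlacement;

/*
 * SetPlacementNodeMetadata copies every node-identifying field in one
 * place, so a call site can no longer forget one of them (the bug this
 * commit fixes: a skipped groupId or nodeId silently kept its zeroed
 * default, producing inconsistent worker metadata).
 */
static void
SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode)
{
	placement->nodeName = strdup(workerNode->workerName);
	placement->nodePort = workerNode->workerPort;
	placement->nodeId = workerNode->nodeId;
	placement->groupId = workerNode->groupId;
}

int
main(void)
{
	WorkerNode worker = { .nodeId = 3, .workerName = "worker-1",
	                      .workerPort = 5432, .groupId = 2 };
	ShardPlacement placement = { 0 };

	/* Before the fix: four manual assignments at every call site, easy
	 * to miss one. After: a single call that sets the full set. */
	SetPlacementNodeMetadata(&placement, &worker);

	printf("node %u (%s:%u) group %d\n", placement.nodeId,
	       placement.nodeName, placement.nodePort, placement.groupId);
	free(placement.nodeName);
	return 0;
}

The design point is that the helper makes "set all four or none" the only option: before the change, a caller that skipped groupId or nodeId left it at its zeroed default, which is exactly the inconsistent-metadata bug described above.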
@@ -51,6 +51,7 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_explain.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/resource_lock.h"
 #include "distributed/transmit.h"
 #include "distributed/version_compat.h"

@@ -1938,6 +1938,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
 	workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool));
 	workerPool->nodeName = pstrdup(nodeName);
 	workerPool->nodePort = nodePort;

 	INSTR_TIME_SET_ZERO(workerPool->poolStartTime);
 	workerPool->distributedExecution = execution;

@@ -26,6 +26,7 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/transaction_management.h"
 #include "distributed/tuplestore.h"
 #include "distributed/worker_protocol.h"

@@ -546,9 +547,7 @@ FragmentTransferTaskList(List *fragmentListTransfers)
 		WorkerNode *workerNode = ForceLookupNodeByNodeId(targetNodeId);

 		ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
-		targetPlacement->nodeName = workerNode->workerName;
-		targetPlacement->nodePort = workerNode->workerPort;
-		targetPlacement->groupId = workerNode->groupId;
+		SetPlacementNodeMetadata(targetPlacement, workerNode);

 		Task *task = CitusMakeNode(Task);
 		task->taskType = SELECT_TASK;

@@ -41,6 +41,7 @@
 #include "distributed/metadata/pg_dist_object.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/pg_dist_local_group.h"
 #include "distributed/pg_dist_node_metadata.h"
 #include "distributed/pg_dist_node.h"

@@ -521,9 +522,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
 	*shardPlacementAsGroupPlacement = *groupShardPlacement;
 	shardPlacement->type = header;

-	shardPlacement->nodeName = pstrdup(workerNode->workerName);
-	shardPlacement->nodePort = workerNode->workerPort;
-	shardPlacement->nodeId = workerNode->nodeId;
+	SetPlacementNodeMetadata(shardPlacement, workerNode);

 	/* fill in remaining fields */
 	Assert(tableEntry->partitionMethod != 0);

@@ -5579,9 +5579,7 @@ AssignDualHashTaskList(List *taskList)
 		WorkerNode *workerNode = list_nth(workerNodeList, assignmentIndex);

 		ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
-		taskPlacement->nodeName = pstrdup(workerNode->workerName);
-		taskPlacement->nodePort = workerNode->workerPort;
-		taskPlacement->nodeId = workerNode->nodeId;
+		SetPlacementNodeMetadata(taskPlacement, workerNode);

 		taskPlacementList = lappend(taskPlacementList, taskPlacement);
 	}

@@ -5602,6 +5600,19 @@ AssignDualHashTaskList(List *taskList)
 	}


+/*
+ * SetPlacementNodeMetadata sets nodename, nodeport, nodeid and groupid for the placement.
+ */
+void
+SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode)
+{
+	placement->nodeName = pstrdup(workerNode->workerName);
+	placement->nodePort = workerNode->workerPort;
+	placement->nodeId = workerNode->nodeId;
+	placement->groupId = workerNode->groupId;
+}
+
+
 /* Helper function to compare two tasks by their taskId. */
 int
 CompareTasksByTaskId(const void *leftElement, const void *rightElement)

@@ -2197,10 +2197,7 @@ CreateDummyPlacement(void)
 		int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount;
 		WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList,
 		                                                 workerNodeIndex);
-		dummyPlacement->nodeName = workerNode->workerName;
-		dummyPlacement->nodePort = workerNode->workerPort;
-		dummyPlacement->nodeId = workerNode->nodeId;
-		dummyPlacement->groupId = workerNode->groupId;
+		SetPlacementNodeMetadata(dummyPlacement, workerNode);

 		zeroShardQueryRoundRobin++;
 	}

@@ -25,6 +25,7 @@
 #include "distributed/errormessage.h"
 #include "distributed/log_utils.h"
 #include "distributed/master_metadata_utility.h"
+#include "distributed/worker_manager.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/distributed_planner.h"
 #include "lib/stringinfo.h"

@@ -517,6 +518,7 @@ extern List * AssignAnchorShardTaskList(List *taskList);
 extern List * FirstReplicaAssignTaskList(List *taskList);
 extern List * RoundRobinAssignTaskList(List *taskList);
 extern List * RoundRobinReorder(Task *task, List *placementList);
+extern void SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode);
 extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);

 /* function declaration for creating Task */