From a369f9001de3bae2060d6c7ada103644416d9ad8 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 7 Apr 2020 11:14:14 +0300 Subject: [PATCH] fix incorrect groupid or nodeid (#3710) For shardplacements, we were setting nodeid, nodename, nodeport and nodegroup manually. This is very error prone, and it seems that we had already forgotten to set some of them. This would mean that they would have their default values, e.g. the group id would be 0 even when the node's actual group id is not 0. So the implication is that we would have inconsistent worker metadata. A new method, SetPlacementNodeMetadata, is introduced, and we call the method to set those fields now, so that as long as we call this method, we won't be setting inconsistent metadata. It probably makes sense to have a struct for these fields. We already have NodeMetadata but it doesn't have nodename or nodeport. So that could be done in a separate refactor to make things simpler. --- src/backend/distributed/commands/utility_hook.c | 1 + .../distributed/executor/adaptive_executor.c | 1 + .../executor/distributed_intermediate_results.c | 5 ++--- .../distributed/metadata/metadata_cache.c | 5 ++--- .../planner/multi_physical_planner.c | 17 ++++++++++++++--- .../distributed/planner/multi_router_planner.c | 5 +---- .../distributed/multi_physical_planner.h | 2 ++ 7 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 083d42a2c..2dfd4386f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -51,6 +51,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" +#include "distributed/multi_physical_planner.h" #include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/version_compat.h" diff --git a/src/backend/distributed/executor/adaptive_executor.c 
b/src/backend/distributed/executor/adaptive_executor.c index a3b707230..fab5e355e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1938,6 +1938,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); workerPool->nodeName = pstrdup(nodeName); workerPool->nodePort = nodePort; + INSTR_TIME_SET_ZERO(workerPool->poolStartTime); workerPool->distributedExecution = execution; diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index b874f9018..44e4522a2 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -26,6 +26,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" #include "distributed/transaction_management.h" #include "distributed/tuplestore.h" #include "distributed/worker_protocol.h" @@ -546,9 +547,7 @@ FragmentTransferTaskList(List *fragmentListTransfers) WorkerNode *workerNode = ForceLookupNodeByNodeId(targetNodeId); ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement); - targetPlacement->nodeName = workerNode->workerName; - targetPlacement->nodePort = workerNode->workerPort; - targetPlacement->groupId = workerNode->groupId; + SetPlacementNodeMetadata(targetPlacement, workerNode); Task *task = CitusMakeNode(Task); task->taskType = SELECT_TASK; diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 74c666963..f5f3a111b 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -41,6 +41,7 @@ #include 
"distributed/metadata/pg_dist_object.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_node_metadata.h" #include "distributed/pg_dist_node.h" @@ -521,9 +522,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, *shardPlacementAsGroupPlacement = *groupShardPlacement; shardPlacement->type = header; - shardPlacement->nodeName = pstrdup(workerNode->workerName); - shardPlacement->nodePort = workerNode->workerPort; - shardPlacement->nodeId = workerNode->nodeId; + SetPlacementNodeMetadata(shardPlacement, workerNode); /* fill in remaining fields */ Assert(tableEntry->partitionMethod != 0); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index fccc8a521..a2f239d33 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -5579,9 +5579,7 @@ AssignDualHashTaskList(List *taskList) WorkerNode *workerNode = list_nth(workerNodeList, assignmentIndex); ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); - taskPlacement->nodeName = pstrdup(workerNode->workerName); - taskPlacement->nodePort = workerNode->workerPort; - taskPlacement->nodeId = workerNode->nodeId; + SetPlacementNodeMetadata(taskPlacement, workerNode); taskPlacementList = lappend(taskPlacementList, taskPlacement); } @@ -5602,6 +5600,19 @@ AssignDualHashTaskList(List *taskList) } +/* + * SetPlacementNodeMetadata sets nodeName (a pstrdup copy), nodePort, nodeId and groupId of the placement from workerNode. 
+ */ +void +SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode) +{ + placement->nodeName = pstrdup(workerNode->workerName); + placement->nodePort = workerNode->workerPort; + placement->nodeId = workerNode->nodeId; + placement->groupId = workerNode->groupId; +} + + /* Helper function to compare two tasks by their taskId. */ int CompareTasksByTaskId(const void *leftElement, const void *rightElement) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index e493c3008..10c877b1a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2197,10 +2197,7 @@ CreateDummyPlacement(void) int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount; WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); - dummyPlacement->nodeName = workerNode->workerName; - dummyPlacement->nodePort = workerNode->workerPort; - dummyPlacement->nodeId = workerNode->nodeId; - dummyPlacement->groupId = workerNode->groupId; + SetPlacementNodeMetadata(dummyPlacement, workerNode); zeroShardQueryRoundRobin++; } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 693f788b1..1da038cd7 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -25,6 +25,7 @@ #include "distributed/errormessage.h" #include "distributed/log_utils.h" #include "distributed/master_metadata_utility.h" +#include "distributed/worker_manager.h" #include "distributed/multi_logical_planner.h" #include "distributed/distributed_planner.h" #include "lib/stringinfo.h" @@ -517,6 +518,7 @@ extern List * AssignAnchorShardTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList); extern List * RoundRobinAssignTaskList(List *taskList); extern List * RoundRobinReorder(Task *task, List 
*placementList); +extern void SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode); extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement); /* function declaration for creating Task */