diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 083d42a2c..2dfd4386f 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -51,6 +51,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" +#include "distributed/multi_physical_planner.h" #include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/version_compat.h" diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index a3b707230..fab5e355e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1938,6 +1938,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); workerPool->nodeName = pstrdup(nodeName); workerPool->nodePort = nodePort; + INSTR_TIME_SET_ZERO(workerPool->poolStartTime); workerPool->distributedExecution = execution; diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index b874f9018..44e4522a2 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -26,6 +26,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" #include "distributed/transaction_management.h" #include "distributed/tuplestore.h" #include "distributed/worker_protocol.h" @@ -546,9 +547,7 @@ FragmentTransferTaskList(List *fragmentListTransfers) WorkerNode *workerNode = ForceLookupNodeByNodeId(targetNodeId); ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement); - targetPlacement->nodeName = workerNode->workerName; - targetPlacement->nodePort = workerNode->workerPort; - targetPlacement->groupId = workerNode->groupId; + SetPlacementNodeMetadata(targetPlacement, workerNode); Task *task = CitusMakeNode(Task); task->taskType = SELECT_TASK; diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 74c666963..f5f3a111b 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -41,6 +41,7 @@ #include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_node_metadata.h" #include "distributed/pg_dist_node.h" @@ -521,9 +522,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, *shardPlacementAsGroupPlacement = *groupShardPlacement; shardPlacement->type = header; - shardPlacement->nodeName = pstrdup(workerNode->workerName); - shardPlacement->nodePort = workerNode->workerPort; - shardPlacement->nodeId = workerNode->nodeId; + SetPlacementNodeMetadata(shardPlacement, workerNode); /* fill in remaining fields */ Assert(tableEntry->partitionMethod != 0); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index fccc8a521..a2f239d33 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -5579,9 +5579,7 @@ AssignDualHashTaskList(List *taskList) WorkerNode *workerNode = list_nth(workerNodeList, assignmentIndex); ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); - taskPlacement->nodeName = pstrdup(workerNode->workerName); - taskPlacement->nodePort = workerNode->workerPort; - taskPlacement->nodeId = workerNode->nodeId; + SetPlacementNodeMetadata(taskPlacement, workerNode); taskPlacementList = lappend(taskPlacementList, taskPlacement); } @@ -5602,6 +5600,19 @@ AssignDualHashTaskList(List *taskList) } +/* + * SetPlacementNodeMetadata sets nodename, nodeport, nodeid and groupid for the placement. + */ +void +SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode) +{ + placement->nodeName = pstrdup(workerNode->workerName); + placement->nodePort = workerNode->workerPort; + placement->nodeId = workerNode->nodeId; + placement->groupId = workerNode->groupId; +} + + /* Helper function to compare two tasks by their taskId. */ int CompareTasksByTaskId(const void *leftElement, const void *rightElement) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index e493c3008..10c877b1a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2197,10 +2197,7 @@ CreateDummyPlacement(void) int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount; WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); - dummyPlacement->nodeName = workerNode->workerName; - dummyPlacement->nodePort = workerNode->workerPort; - dummyPlacement->nodeId = workerNode->nodeId; - dummyPlacement->groupId = workerNode->groupId; + SetPlacementNodeMetadata(dummyPlacement, workerNode); zeroShardQueryRoundRobin++; } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 693f788b1..1da038cd7 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -25,6 +25,7 @@ #include "distributed/errormessage.h" #include "distributed/log_utils.h" #include "distributed/master_metadata_utility.h" +#include "distributed/worker_manager.h" #include "distributed/multi_logical_planner.h" #include "distributed/distributed_planner.h" #include "lib/stringinfo.h" @@ -517,6 +518,7 @@ extern List * AssignAnchorShardTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList); extern List * RoundRobinAssignTaskList(List *taskList); extern List * RoundRobinReorder(Task *task, List *placementList); +extern void SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode); extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement); /* function declaration for creating Task */