From a3811b1e5517b8c0c5ae3a25f8753c4d9768331b Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 21 Jul 2019 16:44:47 +0200 Subject: [PATCH] Avoid FindWorkerNode calls in adaptive executor --- .../distributed/executor/adaptive_executor.c | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 815de8cc6..dfbcae051 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -290,7 +290,8 @@ typedef struct WorkerPool DistributedExecution *distributedExecution; /* worker node on which we have a pool of sessions */ - WorkerNode *node; + char *nodeName; + int nodePort; /* all sessions on the worker that are part of the current execution */ List *sessionList; @@ -537,7 +538,7 @@ static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, - WorkerNode *workerNode); + char *nodeName, int nodePort); static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); @@ -1258,9 +1259,10 @@ AssignTasksToConnections(DistributedExecution *execution) MultiConnection *connection = NULL; int connectionFlags = 0; TaskPlacementExecution *placementExecution = NULL; - WorkerNode *node = FindWorkerNode(taskPlacement->nodeName, - taskPlacement->nodePort); - WorkerPool *workerPool = FindOrCreateWorkerPool(execution, node); + char *nodeName = taskPlacement->nodeName; + int nodePort = taskPlacement->nodePort; + WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName, + nodePort); /* * Execution of a command on a shard placement, which may not always @@ -1450,7 +1452,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) * FindOrCreateWorkerPool gets the pool of connections for a particular worker. */ static WorkerPool * -FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) +FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort) { WorkerPool *workerPool = NULL; ListCell *workerCell = NULL; @@ -1460,14 +1462,16 @@ FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) { workerPool = lfirst(workerCell); - if (WorkerNodeCompare(workerPool->node, workerNode, 0) == 0) + if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 && + nodePort == workerPool->nodePort) { return workerPool; } } workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); - workerPool->node = workerNode; + workerPool->nodeName = pstrdup(nodeName); + workerPool->nodePort = nodePort; workerPool->poolStartTime = 0; workerPool->distributedExecution = execution; @@ -1779,7 +1783,6 @@ static void ManageWorkerPool(WorkerPool *workerPool) { DistributedExecution *execution = workerPool->distributedExecution; - WorkerNode *workerNode = workerPool->node; int targetPoolSize = execution->targetPoolSize; int initiatedConnectionCount = list_length(workerPool->sessionList); int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = @@ -1874,7 +1877,7 @@ ManageWorkerPool(WorkerPool *workerPool) } elog(DEBUG4, "opening %d new connections to %s:%d", newConnectionCount, - workerNode->workerName, workerNode->workerPort); + workerPool->nodeName, workerPool->nodePort); for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) { @@ -1886,8 +1889,8 @@ ManageWorkerPool(WorkerPool *workerPool) /* open a new connection to the worker */ connection = StartNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, + workerPool->nodeName, + workerPool->nodePort, NULL, NULL); /* @@ -1981,8 +1984,8 @@ CheckConnectionTimeout(WorkerPool *workerPool) ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not establish any connections to the node " - "%s:%d after %u ms", workerPool->node->workerName, - workerPool->node->workerPort, + "%s:%d after %u ms", workerPool->nodeName, + workerPool->nodePort, NodeConnectionTimeout))); } else