Avoid FindWorkerNode calls in adaptive executor

pull/2848/head
Marco Slot 2019-07-21 16:44:47 +02:00
parent 4444d92dbc
commit a3811b1e55
1 changed files with 17 additions and 14 deletions

View File

@ -290,7 +290,8 @@ typedef struct WorkerPool
DistributedExecution *distributedExecution; DistributedExecution *distributedExecution;
/* worker node on which we have a pool of sessions */ /* worker node on which we have a pool of sessions */
WorkerNode *node; char *nodeName;
int nodePort;
/* all sessions on the worker that are part of the current execution */ /* all sessions on the worker that are part of the current execution */
List *sessionList; List *sessionList;
@ -537,7 +538,7 @@ static void UnclaimAllSessionConnections(List *sessionList);
static bool UseConnectionPerPlacement(void); static bool UseConnectionPerPlacement(void);
static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
WorkerNode *workerNode); char *nodeName, int nodePort);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection); MultiConnection *connection);
static void ManageWorkerPool(WorkerPool *workerPool); static void ManageWorkerPool(WorkerPool *workerPool);
@ -1258,9 +1259,10 @@ AssignTasksToConnections(DistributedExecution *execution)
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
int connectionFlags = 0; int connectionFlags = 0;
TaskPlacementExecution *placementExecution = NULL; TaskPlacementExecution *placementExecution = NULL;
WorkerNode *node = FindWorkerNode(taskPlacement->nodeName, char *nodeName = taskPlacement->nodeName;
taskPlacement->nodePort); int nodePort = taskPlacement->nodePort;
WorkerPool *workerPool = FindOrCreateWorkerPool(execution, node); WorkerPool *workerPool = FindOrCreateWorkerPool(execution, nodeName,
nodePort);
/* /*
* Execution of a command on a shard placement, which may not always * Execution of a command on a shard placement, which may not always
@ -1450,7 +1452,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
* FindOrCreateWorkerPool gets the pool of connections for a particular worker. * FindOrCreateWorkerPool gets the pool of connections for a particular worker.
*/ */
static WorkerPool * static WorkerPool *
FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode) FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort)
{ {
WorkerPool *workerPool = NULL; WorkerPool *workerPool = NULL;
ListCell *workerCell = NULL; ListCell *workerCell = NULL;
@ -1460,14 +1462,16 @@ FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode)
{ {
workerPool = lfirst(workerCell); workerPool = lfirst(workerCell);
if (WorkerNodeCompare(workerPool->node, workerNode, 0) == 0) if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 &&
nodePort == workerPool->nodePort)
{ {
return workerPool; return workerPool;
} }
} }
workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool));
workerPool->node = workerNode; workerPool->nodeName = pstrdup(nodeName);
workerPool->nodePort = nodePort;
workerPool->poolStartTime = 0; workerPool->poolStartTime = 0;
workerPool->distributedExecution = execution; workerPool->distributedExecution = execution;
@ -1779,7 +1783,6 @@ static void
ManageWorkerPool(WorkerPool *workerPool) ManageWorkerPool(WorkerPool *workerPool)
{ {
DistributedExecution *execution = workerPool->distributedExecution; DistributedExecution *execution = workerPool->distributedExecution;
WorkerNode *workerNode = workerPool->node;
int targetPoolSize = execution->targetPoolSize; int targetPoolSize = execution->targetPoolSize;
int initiatedConnectionCount = list_length(workerPool->sessionList); int initiatedConnectionCount = list_length(workerPool->sessionList);
int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY =
@ -1874,7 +1877,7 @@ ManageWorkerPool(WorkerPool *workerPool)
} }
elog(DEBUG4, "opening %d new connections to %s:%d", newConnectionCount, elog(DEBUG4, "opening %d new connections to %s:%d", newConnectionCount,
workerNode->workerName, workerNode->workerPort); workerPool->nodeName, workerPool->nodePort);
for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) for (connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++)
{ {
@ -1886,8 +1889,8 @@ ManageWorkerPool(WorkerPool *workerPool)
/* open a new connection to the worker */ /* open a new connection to the worker */
connection = StartNodeUserDatabaseConnection(connectionFlags, connection = StartNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName, workerPool->nodeName,
workerNode->workerPort, workerPool->nodePort,
NULL, NULL); NULL, NULL);
/* /*
@ -1981,8 +1984,8 @@ CheckConnectionTimeout(WorkerPool *workerPool)
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not establish any connections to the node " errmsg("could not establish any connections to the node "
"%s:%d after %u ms", workerPool->node->workerName, "%s:%d after %u ms", workerPool->nodeName,
workerPool->node->workerPort, workerPool->nodePort,
NodeConnectionTimeout))); NodeConnectionTimeout)));
} }
else else