From 88c473e007749205aee519663c635e95b11c8103 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Thu, 18 Jun 2020 09:43:37 +0200
Subject: [PATCH] Sort WorkerPool in executions

We sort the workerList because adaptive connection management
(e.g., OPTIONAL_CONNECTION) requires any concurrent executions
to wait for the connections in the same order to prevent any
starvation. If we don't sort, we might end up with:
     Execution 1: Get connection for worker 1, wait for worker 2
     Execution 2: Get connection for worker 2, wait for worker 1

and neither could proceed. Instead, we make every execution establish
the required connections to workers in the same order.
---
 .../distributed/executor/adaptive_executor.c  | 39 +++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 35f7178f7..cf1e568c3 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -644,6 +644,7 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events
 static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
 static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
 static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
+static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
 static void SetAttributeInputMetadata(DistributedExecution *execution,
 									  ShardCommandExecution *shardCommandExecution);
 
@@ -1906,6 +1907,19 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
 		}
 	}
 
+	/*
+	 * We sort the workerList because adaptive connection management
+	 * (e.g., OPTIONAL_CONNECTION) requires any concurrent executions
+	 * to wait for the connections in the same order to prevent any
+	 * starvation. If we don't sort, we might end up with:
+	 *      Execution 1: Get connection for worker 1, wait for worker 2
+	 *      Execution 2: Get connection for worker 2, wait for worker 1
+	 *
+	 * and neither could proceed. Instead, we make every execution establish
+	 * the required connections to workers in the same order.
+	 */
+	execution->workerList = SortList(execution->workerList, WorkerPoolCompare);
+
 	/*
 	 * The executor claims connections exclusively to make sure that calls to
 	 * StartNodeUserDatabaseConnection do not return the same connections.
@@ -1923,6 +1937,31 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
 }
 
 
+/*
+ * WorkerPoolCompare is based on the WorkerNodeCompare function. It orders
+ * two worker pools by node name (at most WORKER_LENGTH characters) and then
+ * by node port. The arguments point to WorkerPool pointers, and the result
+ * is negative, zero or positive as lhs sorts before, equal to or after rhs.
+ */
+static int
+WorkerPoolCompare(const void *lhsKey, const void *rhsKey)
+{
+	const WorkerPool *workerLhs = *(const WorkerPool *const *) lhsKey;
+	const WorkerPool *workerRhs = *(const WorkerPool *const *) rhsKey;
+
+	int nameCompare = strncmp(workerLhs->nodeName, workerRhs->nodeName,
+							  WORKER_LENGTH);
+	if (nameCompare != 0)
+	{
+		return nameCompare;
+	}
+
+	/* compare instead of subtracting, so the result can never wrap around */
+	return (workerLhs->nodePort > workerRhs->nodePort) -
+		   (workerLhs->nodePort < workerRhs->nodePort);
+}
+
+
 /*
  * SetAttributeInputMetadata sets attributeInputMetadata in
  * shardCommandExecution for all the queries that are part of its task.