Merge pull request #3923 from citusdata/assert_order

Sort WorkerPool in executions
pull/3935/head
Önder Kalacı 2020-06-22 18:27:54 +02:00 committed by GitHub
commit f41e1b1a60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 39 additions and 0 deletions

View File

@ -644,6 +644,7 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
static void SetAttributeInputMetadata(DistributedExecution *execution,
ShardCommandExecution *shardCommandExecution);
@ -1906,6 +1907,19 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
}
}
/*
* We sort the workerList because adaptive connection management
* (e.g., OPTIONAL_CONNECTION) requires any concurrent executions
* to wait for the connections in the same order to prevent any
* starvation. If we don't sort, we might end up with:
* Execution 1: Get connection for worker 1, wait for worker 2
* Execution 2: Get connection for worker 2, wait for worker 1
*
* and, none could proceed. Instead, we enforce every execution establish
* the required connections to workers in the same order.
*/
execution->workerList = SortList(execution->workerList, WorkerPoolCompare);
/*
* The executor claims connections exclusively to make sure that calls to
* StartNodeUserDatabaseConnection do not return the same connections.
@ -1923,6 +1937,31 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
}
/*
* WorkerPoolCompare is based on WorkerNodeCompare function.
*
* The function compares two worker nodes by their host name and port
* number.
*/
static int
WorkerPoolCompare(const void *lhsKey, const void *rhsKey)
{
const WorkerPool *workerLhs = *(const WorkerPool **) lhsKey;
const WorkerPool *workerRhs = *(const WorkerPool **) rhsKey;
int nameCompare = strncmp(workerLhs->nodeName, workerRhs->nodeName,
WORKER_LENGTH);
if (nameCompare != 0)
{
return nameCompare;
}
int portCompare = workerLhs->nodePort - workerRhs->nodePort;
return portCompare;
}
/*
* SetAttributeInputMetadata sets attributeInputMetadata in
* shardCommandExecution for all the queries that are part of its task.