mirror of https://github.com/citusdata/citus.git
Sort WorkerPool in executions
We sort the workerList because adaptive connection management (e.g., OPTIONAL_CONNECTION) requires any concurrent executions to wait for the connections in the same order to prevent any starvation. If we don't sort, we might end up with: Execution 1: Get connection for worker 1, wait for worker 2 Execution 2: Get connection for worker 2, wait for worker 1 and, none could proceed. Instead, we enforce every execution establish the required connections to workers in the same order.pull/3923/head
parent
fb46ef1d17
commit
88c473e007
|
@ -644,6 +644,7 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events
|
||||||
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
|
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
|
||||||
static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
|
static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
|
||||||
static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
|
static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
|
||||||
|
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
|
||||||
static void SetAttributeInputMetadata(DistributedExecution *execution,
|
static void SetAttributeInputMetadata(DistributedExecution *execution,
|
||||||
ShardCommandExecution *shardCommandExecution);
|
ShardCommandExecution *shardCommandExecution);
|
||||||
|
|
||||||
|
@ -1906,6 +1907,19 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We sort the workerList because adaptive connection management
|
||||||
|
* (e.g., OPTIONAL_CONNECTION) requires any concurrent executions
|
||||||
|
* to wait for the connections in the same order to prevent any
|
||||||
|
* starvation. If we don't sort, we might end up with:
|
||||||
|
* Execution 1: Get connection for worker 1, wait for worker 2
|
||||||
|
* Execution 2: Get connection for worker 2, wait for worker 1
|
||||||
|
*
|
||||||
|
* and, none could proceed. Instead, we enforce every execution establish
|
||||||
|
* the required connections to workers in the same order.
|
||||||
|
*/
|
||||||
|
execution->workerList = SortList(execution->workerList, WorkerPoolCompare);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The executor claims connections exclusively to make sure that calls to
|
* The executor claims connections exclusively to make sure that calls to
|
||||||
* StartNodeUserDatabaseConnection do not return the same connections.
|
* StartNodeUserDatabaseConnection do not return the same connections.
|
||||||
|
@ -1923,6 +1937,31 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WorkerPoolCompare is based on WorkerNodeCompare function.
|
||||||
|
*
|
||||||
|
* The function compares two worker nodes by their host name and port
|
||||||
|
* number.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
WorkerPoolCompare(const void *lhsKey, const void *rhsKey)
|
||||||
|
{
|
||||||
|
const WorkerPool *workerLhs = *(const WorkerPool **) lhsKey;
|
||||||
|
const WorkerPool *workerRhs = *(const WorkerPool **) rhsKey;
|
||||||
|
|
||||||
|
int nameCompare = strncmp(workerLhs->nodeName, workerRhs->nodeName,
|
||||||
|
WORKER_LENGTH);
|
||||||
|
|
||||||
|
if (nameCompare != 0)
|
||||||
|
{
|
||||||
|
return nameCompare;
|
||||||
|
}
|
||||||
|
|
||||||
|
int portCompare = workerLhs->nodePort - workerRhs->nodePort;
|
||||||
|
return portCompare;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SetAttributeInputMetadata sets attributeInputMetadata in
|
* SetAttributeInputMetadata sets attributeInputMetadata in
|
||||||
* shardCommandExecution for all the queries that are part of its task.
|
* shardCommandExecution for all the queries that are part of its task.
|
||||||
|
|
Loading…
Reference in New Issue