mirror of https://github.com/citusdata/citus.git
separate the logic in ManageWorkerPool
parent
dcdd510d75
commit
9f66a78695
|
@ -590,6 +590,11 @@ static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
|
|||
MultiConnection *connection);
|
||||
static void ManageWorkerPool(WorkerPool *workerPool);
|
||||
static bool ShouldWaitForConnection(WorkerPool *workerPool);
|
||||
static bool CanOpenNewConnection(WorkerPool *workerPool);
|
||||
static bool IsTooEarlyToOpenConnections(WorkerPool *workerPool);
|
||||
static int CalculateNewConnectionCount(WorkerPool *workerPool);
|
||||
static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
|
||||
TransactionProperties *transactionProperties);
|
||||
static void CheckConnectionTimeout(WorkerPool *workerPool);
|
||||
static int UsableConnectionCount(WorkerPool *workerPool);
|
||||
static long NextEventTimeout(DistributedExecution *execution);
|
||||
|
@ -2300,29 +2305,39 @@ static void
|
|||
ManageWorkerPool(WorkerPool *workerPool)
|
||||
{
|
||||
DistributedExecution *execution = workerPool->distributedExecution;
|
||||
int targetPoolSize = execution->targetPoolSize;
|
||||
int initiatedConnectionCount = list_length(workerPool->sessionList);
|
||||
int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
||||
workerPool->activeConnectionCount;
|
||||
int idleConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
||||
workerPool->idleConnectionCount;
|
||||
|
||||
if (!CanOpenNewConnection(workerPool))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
int newConnectionCount = CalculateNewConnectionCount(workerPool);
|
||||
|
||||
if (newConnectionCount <= 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties);
|
||||
|
||||
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
|
||||
execution->connectionSetChanged = true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CanOpenNewConnection returns true if we can open new connections
|
||||
* for the given workerPool.
|
||||
*/
|
||||
static bool
|
||||
CanOpenNewConnection(WorkerPool *workerPool)
|
||||
{
|
||||
int failedConnectionCount = workerPool->failedConnectionCount;
|
||||
int readyTaskCount = workerPool->readyTaskCount;
|
||||
int newConnectionCount = 0;
|
||||
|
||||
/* we should always have more (or equal) active connections than idle connections */
|
||||
Assert(activeConnectionCount >= idleConnectionCount);
|
||||
|
||||
/* we should always have more (or equal) initiated connections than active connections */
|
||||
Assert(initiatedConnectionCount >= activeConnectionCount);
|
||||
|
||||
/* we should never have less than 0 connections ever */
|
||||
Assert(activeConnectionCount >= 0 && idleConnectionCount >= 0);
|
||||
|
||||
if (workerPool->failed)
|
||||
{
|
||||
/* connection pool failed */
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
/* we might fail the execution or warn the user about connection timeouts */
|
||||
|
@ -2334,9 +2349,60 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
if (failedConnectionCount >= 1)
|
||||
{
|
||||
/* do not attempt to open more connections after one failed */
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (IsTooEarlyToOpenConnections(workerPool))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsTooEarlyToOpenConnections returns true if we should not open
|
||||
* new connections.
|
||||
*/
|
||||
static bool
|
||||
IsTooEarlyToOpenConnections(WorkerPool *workerPool)
|
||||
{
|
||||
return !UseConnectionPerPlacement() && ExecutorSlowStartInterval !=
|
||||
SLOW_START_DISABLED &&
|
||||
(MillisecondsPassedSince(workerPool->lastConnectionOpenTime) <
|
||||
ExecutorSlowStartInterval);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CalculateNewConnectionCount returns the amount of connections
|
||||
* that we can currently open.
|
||||
*/
|
||||
static int
|
||||
CalculateNewConnectionCount(WorkerPool *workerPool)
|
||||
{
|
||||
DistributedExecution *execution = workerPool->distributedExecution;
|
||||
|
||||
int targetPoolSize = execution->targetPoolSize;
|
||||
int initiatedConnectionCount = list_length(workerPool->sessionList);
|
||||
int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
||||
workerPool->activeConnectionCount;
|
||||
int idleConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
||||
workerPool->idleConnectionCount;
|
||||
int readyTaskCount = workerPool->readyTaskCount;
|
||||
int newConnectionCount = 0;
|
||||
|
||||
|
||||
/* we should always have more (or equal) active connections than idle connections */
|
||||
Assert(activeConnectionCount >= idleConnectionCount);
|
||||
|
||||
/* we should always have more (or equal) initiated connections than active connections */
|
||||
Assert(initiatedConnectionCount >= activeConnectionCount);
|
||||
|
||||
/* we should never have less than 0 connections ever */
|
||||
Assert(activeConnectionCount >= 0 && idleConnectionCount >= 0);
|
||||
|
||||
if (UseConnectionPerPlacement())
|
||||
{
|
||||
int unusedConnectionCount = workerPool->unusedConnectionCount;
|
||||
|
@ -2361,7 +2427,14 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
* Number of additional connections we would need to run all ready tasks in
|
||||
* parallel.
|
||||
*/
|
||||
int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount;
|
||||
int newConnectionsForReadyTasks = Max(0, readyTaskCount - usableConnectionCount);
|
||||
|
||||
/* If Slow start is enabled we need to update the maxNewConnection to the current cycle's maximum.*/
|
||||
if (ExecutorSlowStartInterval > 0)
|
||||
{
|
||||
maxNewConnectionCount = Min(workerPool->maxNewConnectionsPerCycle,
|
||||
maxNewConnectionCount);
|
||||
}
|
||||
|
||||
/*
|
||||
* Open enough connections to handle all tasks that are ready, but no more
|
||||
|
@ -2369,30 +2442,23 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
*/
|
||||
newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
|
||||
|
||||
if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED)
|
||||
if (newConnectionCount > 0)
|
||||
{
|
||||
if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >=
|
||||
ExecutorSlowStartInterval)
|
||||
{
|
||||
newConnectionCount = Min(newConnectionCount,
|
||||
workerPool->maxNewConnectionsPerCycle);
|
||||
|
||||
/* increase the open rate every cycle (like TCP slow start) */
|
||||
workerPool->maxNewConnectionsPerCycle += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* wait a bit until opening more connections */
|
||||
return;
|
||||
}
|
||||
/* increase the open rate every cycle (like TCP slow start) */
|
||||
workerPool->maxNewConnectionsPerCycle += 1;
|
||||
}
|
||||
}
|
||||
return newConnectionCount;
|
||||
}
|
||||
|
||||
if (newConnectionCount <= 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* OpenNewConnections opens the given amount of connections for the given workerPool.
|
||||
*/
|
||||
static void
|
||||
OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
|
||||
TransactionProperties *transactionProperties)
|
||||
{
|
||||
ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount,
|
||||
workerPool->nodeName, workerPool->nodePort)));
|
||||
|
||||
|
@ -2401,7 +2467,7 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
/* experimental: just to see the perf benefits of caching connections */
|
||||
int connectionFlags = 0;
|
||||
|
||||
if (execution->transactionProperties->useRemoteTransactionBlocks ==
|
||||
if (transactionProperties->useRemoteTransactionBlocks ==
|
||||
TRANSACTION_BLOCKS_DISALLOWED)
|
||||
{
|
||||
connectionFlags |= OUTSIDE_TRANSACTION;
|
||||
|
@ -2480,9 +2546,6 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
/* immediately run the state machine to handle potential failure */
|
||||
ConnectionStateMachine(session);
|
||||
}
|
||||
|
||||
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
|
||||
execution->connectionSetChanged = true;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue