mirror of https://github.com/citusdata/citus.git
separate the logic in ManageWorkerPool (#3298)
parent
2c8066a313
commit
64469708af
|
@ -603,6 +603,10 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
|
||||||
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
|
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
|
||||||
MultiConnection *connection);
|
MultiConnection *connection);
|
||||||
static void ManageWorkerPool(WorkerPool *workerPool);
|
static void ManageWorkerPool(WorkerPool *workerPool);
|
||||||
|
static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
|
||||||
|
static int CalculateNewConnectionCount(WorkerPool *workerPool);
|
||||||
|
static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
|
||||||
|
TransactionProperties *transactionProperties);
|
||||||
static void CheckConnectionTimeout(WorkerPool *workerPool);
|
static void CheckConnectionTimeout(WorkerPool *workerPool);
|
||||||
static int UsableConnectionCount(WorkerPool *workerPool);
|
static int UsableConnectionCount(WorkerPool *workerPool);
|
||||||
static long NextEventTimeout(DistributedExecution *execution);
|
static long NextEventTimeout(DistributedExecution *execution);
|
||||||
|
@ -615,6 +619,7 @@ static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementEx
|
||||||
WorkerSession *session);
|
WorkerSession *session);
|
||||||
static void ConnectionStateMachine(WorkerSession *session);
|
static void ConnectionStateMachine(WorkerSession *session);
|
||||||
static void HandleMultiConnectionSuccess(WorkerSession *session);
|
static void HandleMultiConnectionSuccess(WorkerSession *session);
|
||||||
|
static bool HasAnyConnectionFailure(WorkerPool *workerPool);
|
||||||
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
|
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
|
||||||
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
|
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
|
||||||
static void TransactionStateMachine(WorkerSession *session);
|
static void TransactionStateMachine(WorkerSession *session);
|
||||||
|
@ -2387,16 +2392,110 @@ static void
|
||||||
ManageWorkerPool(WorkerPool *workerPool)
|
ManageWorkerPool(WorkerPool *workerPool)
|
||||||
{
|
{
|
||||||
DistributedExecution *execution = workerPool->distributedExecution;
|
DistributedExecution *execution = workerPool->distributedExecution;
|
||||||
|
|
||||||
|
/* we do not expand the pool further if there was any failure */
|
||||||
|
if (HasAnyConnectionFailure(workerPool))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we wait until a slow start interval has passed before expanding the pool */
|
||||||
|
if (ShouldWaitForSlowStart(workerPool))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int newConnectionCount = CalculateNewConnectionCount(workerPool);
|
||||||
|
|
||||||
|
if (newConnectionCount <= 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties);
|
||||||
|
|
||||||
|
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
|
||||||
|
execution->rebuildWaitEventSet = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasAnyConnectionFailure returns true if the worker pool has failed or
|
||||||
|
* any of its connections has failed, checking for connection timeouts along the way.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
HasAnyConnectionFailure(WorkerPool *workerPool)
|
||||||
|
{
|
||||||
|
if (workerPool->failed)
|
||||||
|
{
|
||||||
|
/* connection pool failed */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we might fail the execution or warn the user about connection timeouts */
|
||||||
|
if (workerPool->checkForPoolTimeout)
|
||||||
|
{
|
||||||
|
CheckConnectionTimeout(workerPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
int failedConnectionCount = workerPool->failedConnectionCount;
|
||||||
|
if (failedConnectionCount >= 1)
|
||||||
|
{
|
||||||
|
/* do not attempt to open more connections after one failed */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ShouldWaitForSlowStart returns true if we should wait before
|
||||||
|
* opening a new connection because of the slow start algorithm.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ShouldWaitForSlowStart(WorkerPool *workerPool)
|
||||||
|
{
|
||||||
|
/* if we can use a connection per placement, we don't need to wait for slow start */
|
||||||
|
if (UseConnectionPerPlacement())
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if slow start is disabled, we can open new connections */
|
||||||
|
if (ExecutorSlowStartInterval == SLOW_START_DISABLED)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
double milliSecondsPassedSince = MillisecondsPassedSince(
|
||||||
|
workerPool->lastConnectionOpenTime);
|
||||||
|
if (milliSecondsPassedSince < ExecutorSlowStartInterval)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CalculateNewConnectionCount returns the number of connections
|
||||||
|
* that we can currently open.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
CalculateNewConnectionCount(WorkerPool *workerPool)
|
||||||
|
{
|
||||||
|
DistributedExecution *execution = workerPool->distributedExecution;
|
||||||
|
|
||||||
int targetPoolSize = execution->targetPoolSize;
|
int targetPoolSize = execution->targetPoolSize;
|
||||||
int initiatedConnectionCount = list_length(workerPool->sessionList);
|
int initiatedConnectionCount = list_length(workerPool->sessionList);
|
||||||
int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
||||||
workerPool->activeConnectionCount;
|
workerPool->activeConnectionCount;
|
||||||
int idleConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
int idleConnectionCount PG_USED_FOR_ASSERTS_ONLY =
|
||||||
workerPool->idleConnectionCount;
|
workerPool->idleConnectionCount;
|
||||||
int failedConnectionCount = workerPool->failedConnectionCount;
|
|
||||||
int readyTaskCount = workerPool->readyTaskCount;
|
int readyTaskCount = workerPool->readyTaskCount;
|
||||||
int newConnectionCount = 0;
|
int newConnectionCount = 0;
|
||||||
|
|
||||||
|
|
||||||
/* we should always have more (or equal) active connections than idle connections */
|
/* we should always have more (or equal) active connections than idle connections */
|
||||||
Assert(activeConnectionCount >= idleConnectionCount);
|
Assert(activeConnectionCount >= idleConnectionCount);
|
||||||
|
|
||||||
|
@ -2406,24 +2505,6 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
/* we should never have less than 0 connections ever */
|
/* we should never have less than 0 connections ever */
|
||||||
Assert(activeConnectionCount >= 0 && idleConnectionCount >= 0);
|
Assert(activeConnectionCount >= 0 && idleConnectionCount >= 0);
|
||||||
|
|
||||||
if (workerPool->failed)
|
|
||||||
{
|
|
||||||
/* connection pool failed */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* we might fail the execution or warn the user about connection timeouts */
|
|
||||||
if (workerPool->checkForPoolTimeout)
|
|
||||||
{
|
|
||||||
CheckConnectionTimeout(workerPool);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (failedConnectionCount >= 1)
|
|
||||||
{
|
|
||||||
/* do not attempt to open more connections after one failed */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (UseConnectionPerPlacement())
|
if (UseConnectionPerPlacement())
|
||||||
{
|
{
|
||||||
int unusedConnectionCount = workerPool->unusedConnectionCount;
|
int unusedConnectionCount = workerPool->unusedConnectionCount;
|
||||||
|
@ -2448,7 +2529,14 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
* Number of additional connections we would need to run all ready tasks in
|
* Number of additional connections we would need to run all ready tasks in
|
||||||
* parallel.
|
* parallel.
|
||||||
*/
|
*/
|
||||||
int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount;
|
int newConnectionsForReadyTasks = Max(0, readyTaskCount - usableConnectionCount);
|
||||||
|
|
||||||
|
/* If slow start is enabled, cap maxNewConnectionCount at the current cycle's maximum. */
|
||||||
|
if (ExecutorSlowStartInterval != SLOW_START_DISABLED)
|
||||||
|
{
|
||||||
|
maxNewConnectionCount = Min(workerPool->maxNewConnectionsPerCycle,
|
||||||
|
maxNewConnectionCount);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Open enough connections to handle all tasks that are ready, but no more
|
* Open enough connections to handle all tasks that are ready, but no more
|
||||||
|
@ -2456,30 +2544,23 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
*/
|
*/
|
||||||
newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
|
newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
|
||||||
|
|
||||||
if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED)
|
if (newConnectionCount > 0)
|
||||||
{
|
{
|
||||||
if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >=
|
|
||||||
ExecutorSlowStartInterval)
|
|
||||||
{
|
|
||||||
newConnectionCount = Min(newConnectionCount,
|
|
||||||
workerPool->maxNewConnectionsPerCycle);
|
|
||||||
|
|
||||||
/* increase the open rate every cycle (like TCP slow start) */
|
/* increase the open rate every cycle (like TCP slow start) */
|
||||||
workerPool->maxNewConnectionsPerCycle += 1;
|
workerPool->maxNewConnectionsPerCycle += 1;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
/* wait a bit until opening more connections */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return newConnectionCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (newConnectionCount <= 0)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* OpenNewConnections opens the given number of connections for the given workerPool.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
|
||||||
|
TransactionProperties *transactionProperties)
|
||||||
|
{
|
||||||
ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount,
|
ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount,
|
||||||
workerPool->nodeName, workerPool->nodePort)));
|
workerPool->nodeName, workerPool->nodePort)));
|
||||||
|
|
||||||
|
@ -2488,7 +2569,7 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
/* experimental: just to see the perf benefits of caching connections */
|
/* experimental: just to see the perf benefits of caching connections */
|
||||||
int connectionFlags = 0;
|
int connectionFlags = 0;
|
||||||
|
|
||||||
if (execution->transactionProperties->useRemoteTransactionBlocks ==
|
if (transactionProperties->useRemoteTransactionBlocks ==
|
||||||
TRANSACTION_BLOCKS_DISALLOWED)
|
TRANSACTION_BLOCKS_DISALLOWED)
|
||||||
{
|
{
|
||||||
connectionFlags |= OUTSIDE_TRANSACTION;
|
connectionFlags |= OUTSIDE_TRANSACTION;
|
||||||
|
@ -2544,9 +2625,6 @@ ManageWorkerPool(WorkerPool *workerPool)
|
||||||
/* immediately run the state machine to handle potential failure */
|
/* immediately run the state machine to handle potential failure */
|
||||||
ConnectionStateMachine(session);
|
ConnectionStateMachine(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
|
|
||||||
execution->rebuildWaitEventSet = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue