diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4c8beb97c..6b38b4ebd 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -590,6 +590,11 @@ static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); static bool ShouldWaitForConnection(WorkerPool *workerPool); +static bool CanOpenNewConnection(WorkerPool *workerPool); +static bool IsTooEarlyToOpenConnections(WorkerPool *workerPool); +static int CalculateNewConnectionCount(WorkerPool *workerPool); +static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, + TransactionProperties *transactionProperties); static void CheckConnectionTimeout(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); @@ -2300,29 +2305,39 @@ static void ManageWorkerPool(WorkerPool *workerPool) { DistributedExecution *execution = workerPool->distributedExecution; - int targetPoolSize = execution->targetPoolSize; - int initiatedConnectionCount = list_length(workerPool->sessionList); - int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = - workerPool->activeConnectionCount; - int idleConnectionCount PG_USED_FOR_ASSERTS_ONLY = - workerPool->idleConnectionCount; + + if (!CanOpenNewConnection(workerPool)) + { + return; + } + + int newConnectionCount = CalculateNewConnectionCount(workerPool); + + if (newConnectionCount <= 0) + { + return; + } + + OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties); + + INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); + execution->connectionSetChanged = true; +} + + +/* + * CanOpenNewConnection returns true if we can open new connections + * for the given workerPool. + */ +static bool +CanOpenNewConnection(WorkerPool *workerPool) +{ int failedConnectionCount = workerPool->failedConnectionCount; - int readyTaskCount = workerPool->readyTaskCount; - int newConnectionCount = 0; - - /* we should always have more (or equal) active connections than idle connections */ - Assert(activeConnectionCount >= idleConnectionCount); - - /* we should always have more (or equal) initiated connections than active connections */ - Assert(initiatedConnectionCount >= activeConnectionCount); - - /* we should never have less than 0 connections ever */ - Assert(activeConnectionCount >= 0 && idleConnectionCount >= 0); if (workerPool->failed) { /* connection pool failed */ - return; + return false; } /* we might fail the execution or warn the user about connection timeouts */ @@ -2334,9 +2349,60 @@ ManageWorkerPool(WorkerPool *workerPool) if (failedConnectionCount >= 1) { /* do not attempt to open more connections after one failed */ - return; + return false; } + if (IsTooEarlyToOpenConnections(workerPool)) + { + return false; + } + + return true; +} + + +/* + * IsTooEarlyToOpenConnections returns true if we should not open + * new connections. + */ +static bool +IsTooEarlyToOpenConnections(WorkerPool *workerPool) +{ + return !UseConnectionPerPlacement() && ExecutorSlowStartInterval != + SLOW_START_DISABLED && + (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) < + ExecutorSlowStartInterval); +} + + +/* + * CalculateNewConnectionCount returns the amount of connections + * that we can currently open. + */ +static int +CalculateNewConnectionCount(WorkerPool *workerPool) +{ + DistributedExecution *execution = workerPool->distributedExecution; + + int targetPoolSize = execution->targetPoolSize; + int initiatedConnectionCount = list_length(workerPool->sessionList); + int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = + workerPool->activeConnectionCount; + int idleConnectionCount PG_USED_FOR_ASSERTS_ONLY = + workerPool->idleConnectionCount; + int readyTaskCount = workerPool->readyTaskCount; + int newConnectionCount = 0; + + + /* we should always have more (or equal) active connections than idle connections */ + Assert(activeConnectionCount >= idleConnectionCount); + + /* we should always have more (or equal) initiated connections than active connections */ + Assert(initiatedConnectionCount >= activeConnectionCount); + + /* we should never have less than 0 connections ever */ + Assert(activeConnectionCount >= 0 && idleConnectionCount >= 0); + if (UseConnectionPerPlacement()) { int unusedConnectionCount = workerPool->unusedConnectionCount; @@ -2361,7 +2427,14 @@ ManageWorkerPool(WorkerPool *workerPool) * Number of additional connections we would need to run all ready tasks in * parallel. */ - int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount; + int newConnectionsForReadyTasks = Max(0, readyTaskCount - usableConnectionCount); + + /* If Slow start is enabled we need to update the maxNewConnection to the current cycle's maximum.*/ + if (ExecutorSlowStartInterval > 0) + { + maxNewConnectionCount = Min(workerPool->maxNewConnectionsPerCycle, + maxNewConnectionCount); + } /* * Open enough connections to handle all tasks that are ready, but no more @@ -2369,30 +2442,23 @@ ManageWorkerPool(WorkerPool *workerPool) */ newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount); - if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED) + if (newConnectionCount > 0) { - if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >= - ExecutorSlowStartInterval) - { - newConnectionCount = Min(newConnectionCount, - workerPool->maxNewConnectionsPerCycle); - - /* increase the open rate every cycle (like TCP slow start) */ - workerPool->maxNewConnectionsPerCycle += 1; - } - else - { - /* wait a bit until opening more connections */ - return; - } + /* increase the open rate every cycle (like TCP slow start) */ + workerPool->maxNewConnectionsPerCycle += 1; } } + return newConnectionCount; +} - if (newConnectionCount <= 0) - { - return; - } +/* + * OpenNewConnections opens the given amount of connections for the given workerPool. + */ +static void +OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, + TransactionProperties *transactionProperties) +{ ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount, workerPool->nodeName, workerPool->nodePort))); @@ -2401,7 +2467,7 @@ ManageWorkerPool(WorkerPool *workerPool) /* experimental: just to see the perf benefits of caching connections */ int connectionFlags = 0; - if (execution->transactionProperties->useRemoteTransactionBlocks == + if (transactionProperties->useRemoteTransactionBlocks == TRANSACTION_BLOCKS_DISALLOWED) { connectionFlags |= OUTSIDE_TRANSACTION; @@ -2480,9 +2546,6 @@ ManageWorkerPool(WorkerPool *workerPool) /* immediately run the state machine to handle potential failure */ ConnectionStateMachine(session); } - - INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); - execution->connectionSetChanged = true; }