From 64469708afc7fb7cab67f11bd7d0f53d5a66f59f Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 23 Jul 2020 13:47:35 +0300 Subject: [PATCH] separate the logic in ManageWorkerPool (#3298) --- .../distributed/executor/adaptive_executor.c | 164 +++++++++++++----- 1 file changed, 121 insertions(+), 43 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 9dcabfb86..4348b697e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -603,6 +603,10 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); static void ManageWorkerPool(WorkerPool *workerPool); +static bool ShouldWaitForSlowStart(WorkerPool *workerPool); +static int CalculateNewConnectionCount(WorkerPool *workerPool); +static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, + TransactionProperties *transactionProperties); static void CheckConnectionTimeout(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); @@ -615,6 +619,7 @@ static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementEx WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session); static void HandleMultiConnectionSuccess(WorkerSession *session); +static bool HasAnyConnectionFailure(WorkerPool *workerPool); static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session); static bool TransactionModifiedDistributedTable(DistributedExecution *execution); static void TransactionStateMachine(WorkerSession *session); @@ -2387,16 +2392,110 @@ static void ManageWorkerPool(WorkerPool *workerPool) { DistributedExecution *execution = workerPool->distributedExecution; + + /* we do not expand the pool further if there was any failure */ + if (HasAnyConnectionFailure(workerPool)) + { + return; + } + + /* we wait until a slow start interval has passed before expanding the pool */ + if (ShouldWaitForSlowStart(workerPool)) + { + return; + } + + int newConnectionCount = CalculateNewConnectionCount(workerPool); + + if (newConnectionCount <= 0) + { + return; + } + + OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties); + + INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); + execution->rebuildWaitEventSet = true; +} + + +/* + * HasAnyConnectionFailure returns true if worker pool has failed, + * or connection timed out or we have a failure in connections. + */ +static bool +HasAnyConnectionFailure(WorkerPool *workerPool) +{ + if (workerPool->failed) + { + /* connection pool failed */ + return true; + } + + /* we might fail the execution or warn the user about connection timeouts */ + if (workerPool->checkForPoolTimeout) + { + CheckConnectionTimeout(workerPool); + } + + int failedConnectionCount = workerPool->failedConnectionCount; + if (failedConnectionCount >= 1) + { + /* do not attempt to open more connections after one failed */ + return true; + } + return false; +} + + +/* + * ShouldWaitForSlowStart returns true if we should wait before + * opening a new connection because of slow start algorithm. + */ +static bool +ShouldWaitForSlowStart(WorkerPool *workerPool) +{ + /* if we can use a connection per placement, we don't need to wait for slowstart */ + if (UseConnectionPerPlacement()) + { + return false; + } + + /* if slow start is disabled, we can open new connections */ + if (ExecutorSlowStartInterval == SLOW_START_DISABLED) + { + return false; + } + + double milliSecondsPassedSince = MillisecondsPassedSince( + workerPool->lastConnectionOpenTime); + if (milliSecondsPassedSince < ExecutorSlowStartInterval) + { + return true; + } + return false; +} + + +/* + * CalculateNewConnectionCount returns the amount of connections + * that we can currently open. + */ +static int +CalculateNewConnectionCount(WorkerPool *workerPool) +{ + DistributedExecution *execution = workerPool->distributedExecution; + int targetPoolSize = execution->targetPoolSize; int initiatedConnectionCount = list_length(workerPool->sessionList); int activeConnectionCount PG_USED_FOR_ASSERTS_ONLY = workerPool->activeConnectionCount; int idleConnectionCount PG_USED_FOR_ASSERTS_ONLY = workerPool->idleConnectionCount; - int failedConnectionCount = workerPool->failedConnectionCount; int readyTaskCount = workerPool->readyTaskCount; int newConnectionCount = 0; + /* we should always have more (or equal) active connections than idle connections */ Assert(activeConnectionCount >= idleConnectionCount); @@ -2406,24 +2505,6 @@ ManageWorkerPool(WorkerPool *workerPool) /* we should never have less than 0 connections ever */ Assert(activeConnectionCount >= 0 && idleConnectionCount >= 0); - if (workerPool->failed) - { - /* connection pool failed */ - return; - } - - /* we might fail the execution or warn the user about connection timeouts */ - if (workerPool->checkForPoolTimeout) - { - CheckConnectionTimeout(workerPool); - } - - if (failedConnectionCount >= 1) - { - /* do not attempt to open more connections after one failed */ - return; - } - if (UseConnectionPerPlacement()) { int unusedConnectionCount = workerPool->unusedConnectionCount; @@ -2448,7 +2529,14 @@ ManageWorkerPool(WorkerPool *workerPool) * Number of additional connections we would need to run all ready tasks in * parallel. */ - int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount; + int newConnectionsForReadyTasks = Max(0, readyTaskCount - usableConnectionCount); + + /* If Slow start is enabled we need to update the maxNewConnection to the current cycle's maximum.*/ + if (ExecutorSlowStartInterval != SLOW_START_DISABLED) + { + maxNewConnectionCount = Min(workerPool->maxNewConnectionsPerCycle, + maxNewConnectionCount); + } /* * Open enough connections to handle all tasks that are ready, but no more @@ -2456,30 +2544,23 @@ ManageWorkerPool(WorkerPool *workerPool) */ newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount); - if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED) + if (newConnectionCount > 0) { - if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >= - ExecutorSlowStartInterval) - { - newConnectionCount = Min(newConnectionCount, - workerPool->maxNewConnectionsPerCycle); - - /* increase the open rate every cycle (like TCP slow start) */ - workerPool->maxNewConnectionsPerCycle += 1; - } - else - { - /* wait a bit until opening more connections */ - return; - } + /* increase the open rate every cycle (like TCP slow start) */ + workerPool->maxNewConnectionsPerCycle += 1; } } + return newConnectionCount; +} - if (newConnectionCount <= 0) - { - return; - } +/* + * OpenNewConnections opens the given amount of connections for the given workerPool. + */ +static void +OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, + TransactionProperties *transactionProperties) +{ ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount, workerPool->nodeName, workerPool->nodePort))); @@ -2488,7 +2569,7 @@ ManageWorkerPool(WorkerPool *workerPool) /* experimental: just to see the perf benefits of caching connections */ int connectionFlags = 0; - if (execution->transactionProperties->useRemoteTransactionBlocks == + if (transactionProperties->useRemoteTransactionBlocks == TRANSACTION_BLOCKS_DISALLOWED) { connectionFlags |= OUTSIDE_TRANSACTION; @@ -2544,9 +2625,6 @@ ManageWorkerPool(WorkerPool *workerPool) /* immediately run the state machine to handle potential failure */ ConnectionStateMachine(session); } - - INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); - execution->rebuildWaitEventSet = true; }