diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 6ac1684a5..2b05012c6 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -609,6 +609,7 @@ static int CalculateNewConnectionCount(WorkerPool *workerPool); static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, TransactionProperties *transactionProperties); static void CheckConnectionTimeout(WorkerPool *workerPool); +static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static WaitEventSet * BuildWaitEventSet(List *sessionList); @@ -1663,7 +1664,8 @@ CleanUpSessions(DistributedExecution *execution) if (connection->connectionState == MULTI_CONNECTION_CONNECTING || connection->connectionState == MULTI_CONNECTION_FAILED || - connection->connectionState == MULTI_CONNECTION_LOST) + connection->connectionState == MULTI_CONNECTION_LOST || + connection->connectionState == MULTI_CONNECTION_TIMED_OUT) { /* * We want the MultiConnection go away and not used in @@ -2704,6 +2706,16 @@ CheckConnectionTimeout(WorkerPool *workerPool) "%s:%d after %u ms", workerPool->nodeName, workerPool->nodePort, NodeConnectionTimeout))); + + /* + * We hit the connection timeout. In that case, we should not let the + * connection establishment to continue because the execution logic + * pretends that failed sessions are not going to be used anymore. + * + * That's why we mark the connection as timed out to trigger the state + * changes in the executor. + */ + MarkEstablishingSessionsTimedOut(workerPool); } else { @@ -2714,6 +2726,28 @@ CheckConnectionTimeout(WorkerPool *workerPool) } +/* + * MarkEstablishingSessionsTimedOut goes over the sessions in the given + * workerPool and marks them timed out. ConnectionStateMachine() + * later cleans up the sessions. + */ +static void +MarkEstablishingSessionsTimedOut(WorkerPool *workerPool) +{ + WorkerSession *session = NULL; + foreach_ptr(session, workerPool->sessionList) + { + MultiConnection *connection = session->connection; + + if (connection->connectionState == MULTI_CONNECTION_CONNECTING || + connection->connectionState == MULTI_CONNECTION_INITIAL) + { + connection->connectionState = MULTI_CONNECTION_TIMED_OUT; + } + } +} + + /* * UsableConnectionCount returns the number of connections in the worker pool * that are (soon to be) usable for sending commands, this includes both idle @@ -2845,6 +2879,20 @@ ConnectionStateMachine(WorkerSession *session) break; } + case MULTI_CONNECTION_TIMED_OUT: + { + /* + * When the connection timeout happens, the connection + * might still be able to successfuly established. However, + * the executor should not try to use this connection as + * the state machines might have already progressed and used + * new pools/sessions instead. That's why we terminate the + * connection, clear any state associated with it. + */ + connection->connectionState = MULTI_CONNECTION_FAILED; + break; + } + case MULTI_CONNECTION_CONNECTING: { ConnStatusType status = PQstatus(connection->pgConn); @@ -3988,8 +4036,14 @@ WorkerPoolFailed(WorkerPool *workerPool) bool succeeded = false; dlist_iter iter; - /* a pool cannot fail multiple times */ - Assert(!workerPool->failed); + /* + * A pool cannot fail multiple times, the necessary actions + * has already be taken, so bail out. + */ + if (workerPool->failed) + { + return; + } dlist_foreach(iter, &workerPool->pendingTaskQueue) { @@ -4206,8 +4260,18 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool int nextPlacementExecutionIndex = placementExecution->placementExecutionIndex + 1; - /* if all tasks failed then we should already have errored out */ - Assert(nextPlacementExecutionIndex < placementExecutionCount); + /* + * If all tasks failed then we should already have errored out. + * Still, be defensive and throw error instead of crashes. + */ + if (nextPlacementExecutionIndex >= placementExecutionCount) + { + WorkerPool *workerPool = placementExecution->workerPool; + ereport(ERROR, (errmsg("execution cannot recover from multiple " + "connection failures. Last node failed " + "%s:%d", workerPool->nodeName, + workerPool->nodePort))); + } /* get the next placement in the planning order */ nextPlacementExecution = diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 05e629fc6..189d695e6 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -82,7 +82,8 @@ typedef enum MultiConnectionState MULTI_CONNECTION_CONNECTING, MULTI_CONNECTION_CONNECTED, MULTI_CONNECTION_FAILED, - MULTI_CONNECTION_LOST + MULTI_CONNECTION_LOST, + MULTI_CONNECTION_TIMED_OUT } MultiConnectionState;