Merge pull request #4199 from citusdata/terminate_connection

Forcefully terminate connections after citus.node_connection_timeout
pull/4213/head
Önder Kalacı 2020-10-01 08:56:39 +02:00 committed by GitHub
commit f3962fc7f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 6 deletions

View File

@ -609,6 +609,7 @@ static int CalculateNewConnectionCount(WorkerPool *workerPool);
static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
TransactionProperties *transactionProperties);
static void CheckConnectionTimeout(WorkerPool *workerPool);
static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList);
@ -1663,7 +1664,8 @@ CleanUpSessions(DistributedExecution *execution)
if (connection->connectionState == MULTI_CONNECTION_CONNECTING ||
connection->connectionState == MULTI_CONNECTION_FAILED ||
connection->connectionState == MULTI_CONNECTION_LOST)
connection->connectionState == MULTI_CONNECTION_LOST ||
connection->connectionState == MULTI_CONNECTION_TIMED_OUT)
{
/*
* We want the MultiConnection go away and not used in
@ -2704,6 +2706,16 @@ CheckConnectionTimeout(WorkerPool *workerPool)
"%s:%d after %u ms", workerPool->nodeName,
workerPool->nodePort,
NodeConnectionTimeout)));
/*
* We hit the connection timeout. In that case, we should not let the
* connection establishment to continue because the execution logic
* pretends that failed sessions are not going to be used anymore.
*
* That's why we mark the connection as timed out to trigger the state
* changes in the executor.
*/
MarkEstablishingSessionsTimedOut(workerPool);
}
else
{
@ -2714,6 +2726,28 @@ CheckConnectionTimeout(WorkerPool *workerPool)
}
/*
* MarkEstablishingSessionsTimedOut goes over the sessions in the given
* workerPool and marks them timed out. ConnectionStateMachine()
* later cleans up the sessions.
*/
static void
MarkEstablishingSessionsTimedOut(WorkerPool *workerPool)
{
WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{
MultiConnection *connection = session->connection;
if (connection->connectionState == MULTI_CONNECTION_CONNECTING ||
connection->connectionState == MULTI_CONNECTION_INITIAL)
{
connection->connectionState = MULTI_CONNECTION_TIMED_OUT;
}
}
}
/*
* UsableConnectionCount returns the number of connections in the worker pool
* that are (soon to be) usable for sending commands, this includes both idle
@ -2845,6 +2879,20 @@ ConnectionStateMachine(WorkerSession *session)
break;
}
case MULTI_CONNECTION_TIMED_OUT:
{
/*
* When the connection timeout happens, the connection
* might still be able to successfuly established. However,
* the executor should not try to use this connection as
* the state machines might have already progressed and used
* new pools/sessions instead. That's why we terminate the
* connection, clear any state associated with it.
*/
connection->connectionState = MULTI_CONNECTION_FAILED;
break;
}
case MULTI_CONNECTION_CONNECTING:
{
ConnStatusType status = PQstatus(connection->pgConn);
@ -3988,8 +4036,14 @@ WorkerPoolFailed(WorkerPool *workerPool)
bool succeeded = false;
dlist_iter iter;
/* a pool cannot fail multiple times */
Assert(!workerPool->failed);
/*
* A pool cannot fail multiple times, the necessary actions
* has already be taken, so bail out.
*/
if (workerPool->failed)
{
return;
}
dlist_foreach(iter, &workerPool->pendingTaskQueue)
{
@ -4206,8 +4260,18 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool
int nextPlacementExecutionIndex =
placementExecution->placementExecutionIndex + 1;
/* if all tasks failed then we should already have errored out */
Assert(nextPlacementExecutionIndex < placementExecutionCount);
/*
* If all tasks failed then we should already have errored out.
* Still, be defensive and throw error instead of crashes.
*/
if (nextPlacementExecutionIndex >= placementExecutionCount)
{
WorkerPool *workerPool = placementExecution->workerPool;
ereport(ERROR, (errmsg("execution cannot recover from multiple "
"connection failures. Last node failed "
"%s:%d", workerPool->nodeName,
workerPool->nodePort)));
}
/* get the next placement in the planning order */
nextPlacementExecution =

View File

@ -82,7 +82,8 @@ typedef enum MultiConnectionState
MULTI_CONNECTION_CONNECTING,
MULTI_CONNECTION_CONNECTED,
MULTI_CONNECTION_FAILED,
MULTI_CONNECTION_LOST
MULTI_CONNECTION_LOST,
MULTI_CONNECTION_TIMED_OUT
} MultiConnectionState;