Forcefully terminate connections after citus.node_connection_timeout

After the connection timeout, we fail the session/pool. However, the
underlying connection can still be trying to connect. That is dangerous
because the new placement executions have already been in place. The
executor cannot handle the situation where multiple of
EXECUTION_ORDER_ANY task executions succeeds.

Adding a regression test doesn't seem easily doable. To reproduce the issue
- Add 2 worker nodes
- create a reference table
- set citus.node_connection_timeout to 1ms (requires code change)
- Continiously execute `SELECT count(*) FROM ref_table`
- Sometime later, you hit an out-of-array access in
  `ScheduleNextPlacementExecution()` hence crashing.
- The reason for that is sometimes the first connection
  successfully established while the executor is already
  trying to execute the query on the second node.
pull/4199/head
Onder Kalaci 2020-09-28 16:20:16 +02:00
parent 2894002211
commit 56ca256374
2 changed files with 71 additions and 6 deletions

View File

@ -609,6 +609,7 @@ static int CalculateNewConnectionCount(WorkerPool *workerPool);
static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
TransactionProperties *transactionProperties);
static void CheckConnectionTimeout(WorkerPool *workerPool);
static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList);
@ -1663,7 +1664,8 @@ CleanUpSessions(DistributedExecution *execution)
if (connection->connectionState == MULTI_CONNECTION_CONNECTING ||
connection->connectionState == MULTI_CONNECTION_FAILED ||
connection->connectionState == MULTI_CONNECTION_LOST)
connection->connectionState == MULTI_CONNECTION_LOST ||
connection->connectionState == MULTI_CONNECTION_TIMED_OUT)
{
/*
* We want the MultiConnection go away and not used in
@ -2704,6 +2706,16 @@ CheckConnectionTimeout(WorkerPool *workerPool)
"%s:%d after %u ms", workerPool->nodeName,
workerPool->nodePort,
NodeConnectionTimeout)));
/*
* We hit the connection timeout. In that case, we should not let the
* connection establishment to continue because the execution logic
* pretends that failed sessions are not going to be used anymore.
*
* That's why we mark the connection as timed out to trigger the state
* changes in the executor.
*/
MarkEstablishingSessionsTimedOut(workerPool);
}
else
{
@ -2714,6 +2726,28 @@ CheckConnectionTimeout(WorkerPool *workerPool)
}
/*
* MarkEstablishingSessionsTimedOut goes over the sessions in the given
* workerPool and marks them timed out. ConnectionStateMachine()
* later cleans up the sessions.
*/
static void
MarkEstablishingSessionsTimedOut(WorkerPool *workerPool)
{
WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{
MultiConnection *connection = session->connection;
if (connection->connectionState == MULTI_CONNECTION_CONNECTING ||
connection->connectionState == MULTI_CONNECTION_INITIAL)
{
connection->connectionState = MULTI_CONNECTION_TIMED_OUT;
}
}
}
/*
* UsableConnectionCount returns the number of connections in the worker pool
* that are (soon to be) usable for sending commands, this includes both idle
@ -2845,6 +2879,20 @@ ConnectionStateMachine(WorkerSession *session)
break;
}
case MULTI_CONNECTION_TIMED_OUT:
{
/*
* When the connection timeout happens, the connection
* might still be able to successfuly established. However,
* the executor should not try to use this connection as
* the state machines might have already progressed and used
* new pools/sessions instead. That's why we terminate the
* connection, clear any state associated with it.
*/
connection->connectionState = MULTI_CONNECTION_FAILED;
break;
}
case MULTI_CONNECTION_CONNECTING:
{
ConnStatusType status = PQstatus(connection->pgConn);
@ -3988,8 +4036,14 @@ WorkerPoolFailed(WorkerPool *workerPool)
bool succeeded = false;
dlist_iter iter;
/* a pool cannot fail multiple times */
Assert(!workerPool->failed);
/*
* A pool cannot fail multiple times, the necessary actions
* has already be taken, so bail out.
*/
if (workerPool->failed)
{
return;
}
dlist_foreach(iter, &workerPool->pendingTaskQueue)
{
@ -4206,8 +4260,18 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool
int nextPlacementExecutionIndex =
placementExecution->placementExecutionIndex + 1;
/* if all tasks failed then we should already have errored out */
Assert(nextPlacementExecutionIndex < placementExecutionCount);
/*
* If all tasks failed then we should already have errored out.
* Still, be defensive and throw error instead of crashes.
*/
if (nextPlacementExecutionIndex >= placementExecutionCount)
{
WorkerPool *workerPool = placementExecution->workerPool;
ereport(ERROR, (errmsg("execution cannot recover from multiple "
"connection failures. Last node failed "
"%s:%d", workerPool->nodeName,
workerPool->nodePort)));
}
/* get the next placement in the planning order */
nextPlacementExecution =

View File

@ -82,7 +82,8 @@ typedef enum MultiConnectionState
MULTI_CONNECTION_CONNECTING,
MULTI_CONNECTION_CONNECTED,
MULTI_CONNECTION_FAILED,
MULTI_CONNECTION_LOST
MULTI_CONNECTION_LOST,
MULTI_CONNECTION_TIMED_OUT
} MultiConnectionState;