Make sure that lost connections are handled properly

Before this patch, when a connection is lost, we'd have the following
situation:

    - Pop a task execution from readyQueue
    - Lost connection
    - Fail the session/pool. -> This step was not acting properly
      because we've popped the task, but not set to session->currentTask
      yet

After the patch:

    - Pop a task execution from readyQueue
    - Immediately set it to session->currentTask
    - Lost connection
    - Fail the session/pool. -> At this step, failing the
      session would trigger query failures (or failovers)
      properly.

(cherry picked from commit 485189c0b6)
release-8.3
Onder Kalaci 2019-09-10 15:03:08 +02:00
parent 28a8f3fb95
commit 1ae03192d2
1 changed files with 25 additions and 6 deletions

View File

@ -2405,6 +2405,7 @@ TransactionStateMachine(WorkerSession *session)
else else
{ {
TaskPlacementExecution *placementExecution = NULL; TaskPlacementExecution *placementExecution = NULL;
bool placementExecutionStarted = false;
placementExecution = PopPlacementExecution(session); placementExecution = PopPlacementExecution(session);
if (placementExecution == NULL) if (placementExecution == NULL)
@ -2418,7 +2419,17 @@ TransactionStateMachine(WorkerSession *session)
break; break;
} }
StartPlacementExecutionOnSession(placementExecution, session); placementExecutionStarted =
StartPlacementExecutionOnSession(placementExecution, session);
if (!placementExecutionStarted)
{
/* no need to continue, connection is lost */
Assert(session->connection->connectionState ==
MULTI_CONNECTION_LOST);
return;
}
transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; transaction->transactionState = REMOTE_TRANS_SENT_COMMAND;
} }
@ -2487,6 +2498,7 @@ TransactionStateMachine(WorkerSession *session)
case REMOTE_TRANS_STARTED: case REMOTE_TRANS_STARTED:
{ {
TaskPlacementExecution *placementExecution = NULL; TaskPlacementExecution *placementExecution = NULL;
bool placementExecutionStarted = false;
placementExecution = PopPlacementExecution(session); placementExecution = PopPlacementExecution(session);
if (placementExecution == NULL) if (placementExecution == NULL)
@ -2496,7 +2508,16 @@ TransactionStateMachine(WorkerSession *session)
break; break;
} }
StartPlacementExecutionOnSession(placementExecution, session); placementExecutionStarted =
StartPlacementExecutionOnSession(placementExecution, session);
if (!placementExecutionStarted)
{
/* no need to continue, connection is lost */
Assert(session->connection->connectionState == MULTI_CONNECTION_LOST);
return;
}
transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; transaction->transactionState = REMOTE_TRANS_SENT_COMMAND;
break; break;
} }
@ -2532,7 +2553,6 @@ TransactionStateMachine(WorkerSession *session)
} }
} }
} }
/* iterate in case we can perform multiple transitions at once */ /* iterate in case we can perform multiple transitions at once */
while (transaction->transactionState != currentState); while (transaction->transactionState != currentState);
} }
@ -2741,6 +2761,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
/* connection is going to be in use */ /* connection is going to be in use */
workerPool->idleConnectionCount--; workerPool->idleConnectionCount--;
session->currentTask = placementExecution;
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
if (paramListInfo != NULL) if (paramListInfo != NULL)
{ {
@ -2774,9 +2796,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
return false; return false;
} }
session->currentTask = placementExecution;
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
return true; return true;
} }