From 1ae03192d2dba9882e5df81adf89db11f0cf3b4e Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 10 Sep 2019 15:03:08 +0200 Subject: [PATCH] Make sure that lost connections are handled properly Before this patch, when a connection was lost, we'd have the following situation: - Pop a task execution from readyQueue - The connection is lost - Fail the session/pool. -> This step did not work properly because we had popped the task but had not yet assigned it to session->currentTask. After the patch: - Pop a task execution from readyQueue - Immediately assign it to session->currentTask - The connection is lost - Fail the session/pool. -> At this step, failing the session triggers query failures (or failovers) properly. (cherry picked from commit 485189c0b6d201a501997212f811d5e093bffee2) --- .../distributed/executor/adaptive_executor.c | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index dc16db3e2..0ebd4025a 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2405,6 +2405,7 @@ TransactionStateMachine(WorkerSession *session) else { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2418,7 +2419,17 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == + MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; } @@ -2487,6 +2498,7 @@ TransactionStateMachine(WorkerSession *session) case 
REMOTE_TRANS_STARTED: { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2496,7 +2508,16 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; break; } @@ -2532,7 +2553,6 @@ TransactionStateMachine(WorkerSession *session) } } } - /* iterate in case we can perform multiple transitions at once */ while (transaction->transactionState != currentState); } @@ -2741,6 +2761,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, /* connection is going to be in use */ workerPool->idleConnectionCount--; + session->currentTask = placementExecution; + placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; if (paramListInfo != NULL) { @@ -2774,9 +2796,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, return false; } - session->currentTask = placementExecution; - placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; - return true; }