From 485189c0b6d201a501997212f811d5e093bffee2 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 10 Sep 2019 15:03:08 +0200 Subject: [PATCH] Make sure that lost connections are handled properly Before this patch, when a connection is lost, we'd have the following situation: - Pop a task execution from readyQueue - Lost connection - Fail the session/pool. -> This step was not acting properly because we've popped the task, but not set to session->currentTask yet After the patch: - Pop a task execution from readyQueue - Immediately set it to session->currentTask - Lost connection - Fail the session/pool. -> At this step, failing the session would trigger query failures (or failovers) properly. --- .../distributed/executor/adaptive_executor.c | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ae3f23ae0..78568bb39 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2412,6 +2412,7 @@ TransactionStateMachine(WorkerSession *session) else { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2425,7 +2426,17 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == + MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; } @@ -2494,6 +2505,7 @@ TransactionStateMachine(WorkerSession *session) case REMOTE_TRANS_STARTED: { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2503,7 +2515,16 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; break; } @@ -2539,7 +2560,6 @@ TransactionStateMachine(WorkerSession *session) } } } - /* iterate in case we can perform multiple transitions at once */ while (transaction->transactionState != currentState); } @@ -2748,6 +2768,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, /* connection is going to be in use */ workerPool->idleConnectionCount--; + session->currentTask = placementExecution; + placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; if (paramListInfo != NULL) { @@ -2781,9 +2803,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, return false; } - session->currentTask = placementExecution; - placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; - return true; }