diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index dc16db3e2..0ebd4025a 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2405,6 +2405,7 @@ TransactionStateMachine(WorkerSession *session) else { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2418,7 +2419,17 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == + MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; } @@ -2487,6 +2498,7 @@ TransactionStateMachine(WorkerSession *session) case REMOTE_TRANS_STARTED: { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2496,7 +2508,16 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; break; } @@ -2532,7 +2553,6 @@ TransactionStateMachine(WorkerSession *session) } } } - /* iterate in case we can perform multiple transitions at once */ while (transaction->transactionState != currentState); } @@ -2741,6 +2761,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, /* connection is going to be in use */ workerPool->idleConnectionCount--; + session->currentTask = placementExecution; + placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; if (paramListInfo != NULL) { @@ -2774,9 +2796,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, return false; } - session->currentTask = placementExecution; - placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; - return true; }