diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ae3f23ae0..78568bb39 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2412,6 +2412,7 @@ TransactionStateMachine(WorkerSession *session) else { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2425,7 +2426,17 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == + MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; } @@ -2494,6 +2505,7 @@ TransactionStateMachine(WorkerSession *session) case REMOTE_TRANS_STARTED: { TaskPlacementExecution *placementExecution = NULL; + bool placementExecutionStarted = false; placementExecution = PopPlacementExecution(session); if (placementExecution == NULL) @@ -2503,7 +2515,16 @@ TransactionStateMachine(WorkerSession *session) break; } - StartPlacementExecutionOnSession(placementExecution, session); + placementExecutionStarted = + StartPlacementExecutionOnSession(placementExecution, session); + if (!placementExecutionStarted) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == MULTI_CONNECTION_LOST); + + return; + } + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; break; } @@ -2539,7 +2560,6 @@ TransactionStateMachine(WorkerSession *session) } } } - /* iterate in case we can perform multiple transitions at once */ while (transaction->transactionState != currentState); } @@ -2748,6 +2768,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, /* connection is going to be in use */ workerPool->idleConnectionCount--; + session->currentTask = placementExecution; + placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; if (paramListInfo != NULL) { @@ -2781,9 +2803,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, return false; } - session->currentTask = placementExecution; - placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; - return true; }