Merge pull request #2942 from citusdata/fix_adaptive_bug

Make sure that lost connections are handled properly in adaptive executor
pull/2941/head^2
Önder Kalacı 2019-09-10 18:01:17 +02:00 committed by GitHub
commit dd4e767702
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 25 additions and 6 deletions

View File

@ -2412,6 +2412,7 @@ TransactionStateMachine(WorkerSession *session)
else
{
TaskPlacementExecution *placementExecution = NULL;
bool placementExecutionStarted = false;
placementExecution = PopPlacementExecution(session);
if (placementExecution == NULL)
@ -2425,7 +2426,17 @@ TransactionStateMachine(WorkerSession *session)
break;
}
StartPlacementExecutionOnSession(placementExecution, session);
placementExecutionStarted =
StartPlacementExecutionOnSession(placementExecution, session);
if (!placementExecutionStarted)
{
/* no need to continue, connection is lost */
Assert(session->connection->connectionState ==
MULTI_CONNECTION_LOST);
return;
}
transaction->transactionState = REMOTE_TRANS_SENT_COMMAND;
}
@ -2494,6 +2505,7 @@ TransactionStateMachine(WorkerSession *session)
case REMOTE_TRANS_STARTED:
{
TaskPlacementExecution *placementExecution = NULL;
bool placementExecutionStarted = false;
placementExecution = PopPlacementExecution(session);
if (placementExecution == NULL)
@ -2503,7 +2515,16 @@ TransactionStateMachine(WorkerSession *session)
break;
}
StartPlacementExecutionOnSession(placementExecution, session);
placementExecutionStarted =
StartPlacementExecutionOnSession(placementExecution, session);
if (!placementExecutionStarted)
{
/* no need to continue, connection is lost */
Assert(session->connection->connectionState == MULTI_CONNECTION_LOST);
return;
}
transaction->transactionState = REMOTE_TRANS_SENT_COMMAND;
break;
}
@ -2539,7 +2560,6 @@ TransactionStateMachine(WorkerSession *session)
}
}
}
/* iterate in case we can perform multiple transitions at once */
while (transaction->transactionState != currentState);
}
@ -2748,6 +2768,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
/* connection is going to be in use */
workerPool->idleConnectionCount--;
session->currentTask = placementExecution;
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
if (paramListInfo != NULL)
{
@ -2781,9 +2803,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
return false;
}
session->currentTask = placementExecution;
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
return true;
}