mirror of https://github.com/citusdata/citus.git
Don't try to check unopened connection in EXEC_TASK_FAILED state
parent
e494f172a1
commit
adabcd087a
|
@ -283,6 +283,14 @@ MultiClientConnectPoll(int32 connectionId)
|
|||
MultiConnection *
|
||||
MultiClientGetConnection(int32 connectionId)
|
||||
{
|
||||
if (connectionId == INVALID_CONNECTION_ID)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Assert(connectionId >= 0);
|
||||
Assert(connectionId < MAX_CONNECTION_COUNT);
|
||||
|
||||
return ClientConnectionArray[connectionId];
|
||||
}
|
||||
|
||||
|
|
|
@ -409,6 +409,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
case EXEC_TASK_FAILED:
|
||||
{
|
||||
bool raiseError = false;
|
||||
bool isCritical = false;
|
||||
|
||||
/*
|
||||
* On task failure, we close the connection. We also reset our execution
|
||||
|
@ -418,7 +419,35 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
*/
|
||||
int32 connectionId = connectionIdArray[currentIndex];
|
||||
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||
bool isCritical = connection->remoteTransaction.transactionCritical;
|
||||
|
||||
/* next time we try this worker node, start from the beginning */
|
||||
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
|
||||
|
||||
/* try next worker node */
|
||||
AdjustStateForFailure(taskExecution);
|
||||
|
||||
/*
|
||||
* Add a delay in MultiClientWait, to avoid potentially excerbating problems
|
||||
* by looping quickly
|
||||
*/
|
||||
*executionStatus = TASK_STATUS_ERROR;
|
||||
|
||||
if (connection == NULL)
|
||||
{
|
||||
/*
|
||||
* The task failed before we even managed to connect. This happens when
|
||||
* the metadata is out of sync due to a rebalance. It may be that only
|
||||
* one placement was moved, in that case the other one might still work.
|
||||
*/
|
||||
break;
|
||||
}
|
||||
|
||||
isCritical = connection->remoteTransaction.transactionCritical;
|
||||
if (isCritical)
|
||||
{
|
||||
/* cannot recover when error occurs in a critical transaction */
|
||||
taskExecution->criticalErrorOccurred = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Mark the connection as failed in case it was already used to perform
|
||||
|
@ -431,27 +460,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
|
||||
MultiClientDisconnect(connectionId);
|
||||
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
|
||||
|
||||
connectAction = CONNECT_ACTION_CLOSED;
|
||||
|
||||
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
|
||||
|
||||
if (isCritical)
|
||||
{
|
||||
/* cannot recover when error occurs in a critical transaction */
|
||||
taskExecution->criticalErrorOccurred = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* try next worker node */
|
||||
AdjustStateForFailure(taskExecution);
|
||||
}
|
||||
|
||||
/*
|
||||
* Add a delay, to avoid potentially excerbating problems by
|
||||
* looping quickly
|
||||
*/
|
||||
*executionStatus = TASK_STATUS_ERROR;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue