mirror of https://github.com/citusdata/citus.git
Don't try to check unopened connection in EXEC_TASK_FAILED state
parent
416738374a
commit
7c3c29e505
|
@ -294,6 +294,14 @@ MultiClientConnectPoll(int32 connectionId)
|
||||||
MultiConnection *
|
MultiConnection *
|
||||||
MultiClientGetConnection(int32 connectionId)
|
MultiClientGetConnection(int32 connectionId)
|
||||||
{
|
{
|
||||||
|
if (connectionId == INVALID_CONNECTION_ID)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert(connectionId >= 0);
|
||||||
|
Assert(connectionId < MAX_CONNECTION_COUNT);
|
||||||
|
|
||||||
return ClientConnectionArray[connectionId];
|
return ClientConnectionArray[connectionId];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -425,6 +425,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
case EXEC_TASK_FAILED:
|
case EXEC_TASK_FAILED:
|
||||||
{
|
{
|
||||||
bool raiseError = false;
|
bool raiseError = false;
|
||||||
|
bool isCritical = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* On task failure, we close the connection. We also reset our execution
|
* On task failure, we close the connection. We also reset our execution
|
||||||
|
@ -434,7 +435,35 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
*/
|
*/
|
||||||
int32 connectionId = connectionIdArray[currentIndex];
|
int32 connectionId = connectionIdArray[currentIndex];
|
||||||
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||||
bool isCritical = connection->remoteTransaction.transactionCritical;
|
|
||||||
|
/* next time we try this worker node, start from the beginning */
|
||||||
|
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
|
||||||
|
|
||||||
|
/* try next worker node */
|
||||||
|
AdjustStateForFailure(taskExecution);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Add a delay in MultiClientWait, to avoid potentially excerbating problems
|
||||||
|
* by looping quickly
|
||||||
|
*/
|
||||||
|
*executionStatus = TASK_STATUS_ERROR;
|
||||||
|
|
||||||
|
if (connection == NULL)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The task failed before we even managed to connect. This happens when
|
||||||
|
* the metadata is out of sync due to a rebalance. It may be that only
|
||||||
|
* one placement was moved, in that case the other one might still work.
|
||||||
|
*/
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
isCritical = connection->remoteTransaction.transactionCritical;
|
||||||
|
if (isCritical)
|
||||||
|
{
|
||||||
|
/* cannot recover when error occurs in a critical transaction */
|
||||||
|
taskExecution->criticalErrorOccurred = true;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Mark the connection as failed in case it was already used to perform
|
* Mark the connection as failed in case it was already used to perform
|
||||||
|
@ -447,27 +476,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
|
|
||||||
MultiClientDisconnect(connectionId);
|
MultiClientDisconnect(connectionId);
|
||||||
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
|
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
|
||||||
|
|
||||||
connectAction = CONNECT_ACTION_CLOSED;
|
connectAction = CONNECT_ACTION_CLOSED;
|
||||||
|
|
||||||
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
|
|
||||||
|
|
||||||
if (isCritical)
|
|
||||||
{
|
|
||||||
/* cannot recover when error occurs in a critical transaction */
|
|
||||||
taskExecution->criticalErrorOccurred = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* try next worker node */
|
|
||||||
AdjustStateForFailure(taskExecution);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Add a delay, to avoid potentially excerbating problems by
|
|
||||||
* looping quickly
|
|
||||||
*/
|
|
||||||
*executionStatus = TASK_STATUS_ERROR;
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue