Merge pull request #2299 from citusdata/fix_real_time

Don't try to check unopened connection in EXEC_TASK_FAILED state

cr: @jasonmp85
pull/2296/head
Jason Petersen 2018-07-23 12:12:49 -06:00 committed by GitHub
commit 21114620a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 20 deletions

View File

@ -294,6 +294,14 @@ MultiClientConnectPoll(int32 connectionId)
MultiConnection *
MultiClientGetConnection(int32 connectionId)
{
if (connectionId == INVALID_CONNECTION_ID)
{
return NULL;
}
Assert(connectionId >= 0);
Assert(connectionId < MAX_CONNECTION_COUNT);
return ClientConnectionArray[connectionId];
}

View File

@ -425,6 +425,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
case EXEC_TASK_FAILED:
{
bool raiseError = false;
bool isCritical = false;
/*
* On task failure, we close the connection. We also reset our execution
@ -434,7 +435,35 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
*/
int32 connectionId = connectionIdArray[currentIndex];
MultiConnection *connection = MultiClientGetConnection(connectionId);
bool isCritical = connection->remoteTransaction.transactionCritical;
/* next time we try this worker node, start from the beginning */
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
/* try next worker node */
AdjustStateForFailure(taskExecution);
/*
* Add a delay in MultiClientWait, to avoid potentially excerbating problems
* by looping quickly
*/
*executionStatus = TASK_STATUS_ERROR;
if (connection == NULL)
{
/*
* The task failed before we even managed to connect. This happens when
* the metadata is out of sync due to a rebalance. It may be that only
* one placement was moved, in that case the other one might still work.
*/
break;
}
isCritical = connection->remoteTransaction.transactionCritical;
if (isCritical)
{
/* cannot recover when error occurs in a critical transaction */
taskExecution->criticalErrorOccurred = true;
}
/*
* Mark the connection as failed in case it was already used to perform
@ -447,27 +476,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
MultiClientDisconnect(connectionId);
connectionIdArray[currentIndex] = INVALID_CONNECTION_ID;
connectAction = CONNECT_ACTION_CLOSED;
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
if (isCritical)
{
/* cannot recover when error occurs in a critical transaction */
taskExecution->criticalErrorOccurred = true;
}
else
{
/* try next worker node */
AdjustStateForFailure(taskExecution);
}
/*
* Add a delay, to avoid potentially excerbating problems by
* looping quickly
*/
*executionStatus = TASK_STATUS_ERROR;
break;
}