diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index a28d1008c..a525be8e5 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -283,6 +283,14 @@ MultiClientConnectPoll(int32 connectionId) MultiConnection * MultiClientGetConnection(int32 connectionId) { + if (connectionId == INVALID_CONNECTION_ID) + { + return NULL; + } + + Assert(connectionId >= 0); + Assert(connectionId < MAX_CONNECTION_COUNT); + return ClientConnectionArray[connectionId]; } diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 4d4f94993..84686d1f2 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -409,6 +409,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, case EXEC_TASK_FAILED: { bool raiseError = false; + bool isCritical = false; /* * On task failure, we close the connection. We also reset our execution @@ -418,7 +419,35 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, */ int32 connectionId = connectionIdArray[currentIndex]; MultiConnection *connection = MultiClientGetConnection(connectionId); - bool isCritical = connection->remoteTransaction.transactionCritical; + + /* next time we try this worker node, start from the beginning */ + taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START; + + /* try next worker node */ + AdjustStateForFailure(taskExecution); + + /* + * Add a delay in MultiClientWait, to avoid potentially exacerbating problems + * by looping quickly + */ + *executionStatus = TASK_STATUS_ERROR; + + if (connection == NULL) + { + /* + * The task failed before we even managed to connect. This happens when + * the metadata is out of sync due to a rebalance. 
It may be that only + * one placement was moved, in that case the other one might still work. + */ + break; + } + + isCritical = connection->remoteTransaction.transactionCritical; + if (isCritical) + { + /* cannot recover when error occurs in a critical transaction */ + taskExecution->criticalErrorOccurred = true; + } /* * Mark the connection as failed in case it was already used to perform @@ -431,27 +460,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, MultiClientDisconnect(connectionId); connectionIdArray[currentIndex] = INVALID_CONNECTION_ID; + connectAction = CONNECT_ACTION_CLOSED; - taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START; - - if (isCritical) - { - /* cannot recover when error occurs in a critical transaction */ - taskExecution->criticalErrorOccurred = true; - } - else - { - /* try next worker node */ - AdjustStateForFailure(taskExecution); - } - - /* - * Add a delay, to avoid potentially excerbating problems by - * looping quickly - */ - *executionStatus = TASK_STATUS_ERROR; - break; }