From 7c3c29e5057a8d041ea1239fad79662e9d557c5c Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 19 Jul 2018 13:51:14 +0200 Subject: [PATCH] Don't try to check unopened connection in EXEC_TASK_FAILED state --- .../executor/multi_client_executor.c | 8 +++ .../executor/multi_real_time_executor.c | 51 +++++++++++-------- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 118104299..8651ad741 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -294,6 +294,14 @@ MultiClientConnectPoll(int32 connectionId) MultiConnection * MultiClientGetConnection(int32 connectionId) { + if (connectionId == INVALID_CONNECTION_ID) + { + return NULL; + } + + Assert(connectionId >= 0); + Assert(connectionId < MAX_CONNECTION_COUNT); + return ClientConnectionArray[connectionId]; } diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 9ce102ad0..21f9fd276 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -425,6 +425,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, case EXEC_TASK_FAILED: { bool raiseError = false; + bool isCritical = false; /* * On task failure, we close the connection. We also reset our execution @@ -434,7 +435,35 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, */ int32 connectionId = connectionIdArray[currentIndex]; MultiConnection *connection = MultiClientGetConnection(connectionId); - bool isCritical = connection->remoteTransaction.transactionCritical; + + /* next time we try this worker node, start from the beginning */ + taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START; + + /* try next worker node */ + AdjustStateForFailure(taskExecution); + + /* + * Add a delay in MultiClientWait, to avoid potentially excerbating problems + * by looping quickly + */ + *executionStatus = TASK_STATUS_ERROR; + + if (connection == NULL) + { + /* + * The task failed before we even managed to connect. This happens when + * the metadata is out of sync due to a rebalance. It may be that only + * one placement was moved, in that case the other one might still work. + */ + break; + } + + isCritical = connection->remoteTransaction.transactionCritical; + if (isCritical) + { + /* cannot recover when error occurs in a critical transaction */ + taskExecution->criticalErrorOccurred = true; + } /* * Mark the connection as failed in case it was already used to perform @@ -447,27 +476,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, MultiClientDisconnect(connectionId); connectionIdArray[currentIndex] = INVALID_CONNECTION_ID; + connectAction = CONNECT_ACTION_CLOSED; - taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START; - - if (isCritical) - { - /* cannot recover when error occurs in a critical transaction */ - taskExecution->criticalErrorOccurred = true; - } - else - { - /* try next worker node */ - AdjustStateForFailure(taskExecution); - } - - /* - * Add a delay, to avoid potentially excerbating problems by - * looping quickly - */ - *executionStatus = TASK_STATUS_ERROR; - break; }