diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c
index ca1a40c62..c13c41a78 100644
--- a/src/backend/distributed/executor/multi_real_time_executor.c
+++ b/src/backend/distributed/executor/multi_real_time_executor.c
@@ -387,7 +387,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 
 		case EXEC_TASK_FAILED:
 		{
-			bool raiseError = true;
+			bool raiseError = false;
 
 			/*
 			 * On task failure, we close the connection. We also reset our execution
@@ -397,12 +397,14 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 			 */
 			int32 connectionId = connectionIdArray[currentIndex];
 			MultiConnection *connection = MultiClientGetConnection(connectionId);
+			bool isCritical = connection->remoteTransaction.transactionCritical;
 
 			/*
-			 * If this connection was previously marked as critical (e.g. it was used
-			 * to perform a DDL command), then throw an error. Otherwise, mark it
-			 * as failed and continue executing the query.
+			 * Mark the connection as failed in case it was already used to perform
+			 * writes. We do not error out here, because the abort handler currently
+			 * cannot handle connections with COPY (SELECT ...) TO STDOUT commands
+			 * in progress.
 			 */
 			MarkRemoteTransactionFailed(connection, raiseError);
 
 			MultiClientDisconnect(connectionId);
@@ -411,8 +413,16 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
 
 			taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
 
-			/* try next worker node */
-			AdjustStateForFailure(taskExecution);
+			if (isCritical)
+			{
+				/* cannot recover when error occurs in a critical transaction */
+				taskExecution->criticalErrorOccurred = true;
+			}
+			else
+			{
+				/* try next worker node */
+				AdjustStateForFailure(taskExecution);
+			}
 
 			/*
 			 * Add a delay, to avoid potentially excerbating problems by
diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c
index b1196eff2..46d408fe8 100644
--- a/src/backend/distributed/executor/multi_server_executor.c
+++ b/src/backend/distributed/executor/multi_server_executor.c
@@ -249,6 +249,12 @@ CleanupTaskExecution(TaskExecution *taskExecution)
 bool
 TaskExecutionFailed(TaskExecution *taskExecution)
 {
+	/* an error in a critical transaction cannot be recovered from, so fail right away */
+	if (taskExecution->criticalErrorOccurred)
+	{
+		return true;
+	}
+
 	if (taskExecution->failureCount >= MAX_TASK_EXECUTION_FAILURES)
 	{
 		return true;
diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h
index 6f2ef0c28..66e5dbec6 100644
--- a/src/include/distributed/multi_server_executor.h
+++ b/src/include/distributed/multi_server_executor.h
@@ -145,6 +145,7 @@ struct TaskExecution
 	uint32 querySourceNodeIndex; /* only applies to map fetch tasks */
 	int32 dataFetchTaskIndex;
 	uint32 failureCount;
+	bool criticalErrorOccurred; /* set when a task fails in a critical transaction */
 };
 
 
diff --git a/src/test/regress/expected/subquery_and_cte.out b/src/test/regress/expected/subquery_and_cte.out
index 1a785f9ff..e2c63a7a2 100644
--- a/src/test/regress/expected/subquery_and_cte.out
+++ b/src/test/regress/expected/subquery_and_cte.out
@@ -435,6 +435,42 @@ FROM
 	) as bar
 WHERE foo.user_id = bar.user_id;
 ERROR:  recursive CTEs are not supported in distributed queries
+-- We must error out when a task of this query fails to execute. Repeating the query
+-- several times increases the chance that this test fails on code from before PR #1903.
+SET client_min_messages TO ERROR;
+DO $$
+DECLARE
+	errors_received INTEGER;
+BEGIN
+errors_received := 0;
+FOR i IN 1..3 LOOP -- run the same query three times
+	BEGIN
+		WITH cte as (
+			SELECT
+				user_id, value_2
+			from
+				events_table
+		)
+		SELECT * FROM users_table where value_2 < (
+			SELECT
+				min(cte.value_2)
+			FROM
+				cte
+			WHERE
+				users_table.user_id=cte.user_id
+			GROUP BY
+				user_id, cte.value_2);
+	EXCEPTION WHEN OTHERS THEN -- trap the error so the loop keeps going
+		IF SQLERRM LIKE 'failed to execute task%' THEN -- count only task execution failures
+			errors_received := errors_received + 1;
+		END IF;
+	END;
+END LOOP;
+RAISE '(%/3) failed to execute one of the tasks', errors_received; -- report how many of the 3 runs failed
+END;
+$$;
+ERROR:  (3/3) failed to execute one of the tasks
+CONTEXT:  PL/pgSQL function inline_code_block line 29 at RAISE
 SET client_min_messages TO DEFAULT;
 DROP SCHEMA subquery_and_ctes CASCADE;
 NOTICE:  drop cascades to table users_table_local
diff --git a/src/test/regress/sql/subquery_and_cte.sql b/src/test/regress/sql/subquery_and_cte.sql
index 7636a921b..3ec23cb39 100644
--- a/src/test/regress/sql/subquery_and_cte.sql
+++ b/src/test/regress/sql/subquery_and_cte.sql
@@ -319,6 +319,40 @@ FROM
 	) as bar
 WHERE foo.user_id = bar.user_id;
 
+-- We must error out when a task of this query fails to execute. Repeating the query
+-- several times increases the chance that this test fails on code from before PR #1903.
+SET client_min_messages TO ERROR;
+DO $$
+DECLARE
+	errors_received INTEGER;
+BEGIN
+errors_received := 0;
+FOR i IN 1..3 LOOP -- run the same query three times
+	BEGIN
+		WITH cte as (
+			SELECT
+				user_id, value_2
+			from
+				events_table
+		)
+		SELECT * FROM users_table where value_2 < (
+			SELECT
+				min(cte.value_2)
+			FROM
+				cte
+			WHERE
+				users_table.user_id=cte.user_id
+			GROUP BY
+				user_id, cte.value_2);
+	EXCEPTION WHEN OTHERS THEN -- trap the error so the loop keeps going
+		IF SQLERRM LIKE 'failed to execute task%' THEN -- count only task execution failures
+			errors_received := errors_received + 1;
+		END IF;
+	END;
+END LOOP;
+RAISE '(%/3) failed to execute one of the tasks', errors_received; -- report how many of the 3 runs failed
+END;
+$$;
 SET client_min_messages TO DEFAULT;