mirror of https://github.com/citusdata/citus.git
Do not raise errors in the real-time executor (#1903)
parent
6e34a8fbf4
commit
3fd65cb91b
|
@ -387,7 +387,7 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
|
|
||||||
case EXEC_TASK_FAILED:
|
case EXEC_TASK_FAILED:
|
||||||
{
|
{
|
||||||
bool raiseError = true;
|
bool raiseError = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* On task failure, we close the connection. We also reset our execution
|
* On task failure, we close the connection. We also reset our execution
|
||||||
|
@ -397,12 +397,15 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
*/
|
*/
|
||||||
int32 connectionId = connectionIdArray[currentIndex];
|
int32 connectionId = connectionIdArray[currentIndex];
|
||||||
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||||
|
bool isCritical = connection->remoteTransaction.transactionCritical;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If this connection was previously marked as critical (e.g. it was used
|
* Mark the connection as failed in case it was already used to perform
|
||||||
* to perform a DDL command), then throw an error. Otherwise, mark it
|
* writes. We do not error out here, because the abort handler currently
|
||||||
* as failed and continue executing the query.
|
* cannot handle connections with COPY (SELECT ...) TO STDOUT commands
|
||||||
|
* in progress.
|
||||||
*/
|
*/
|
||||||
|
raiseError = false;
|
||||||
MarkRemoteTransactionFailed(connection, raiseError);
|
MarkRemoteTransactionFailed(connection, raiseError);
|
||||||
|
|
||||||
MultiClientDisconnect(connectionId);
|
MultiClientDisconnect(connectionId);
|
||||||
|
@ -411,8 +414,16 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
|
|
||||||
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
|
taskStatusArray[currentIndex] = EXEC_TASK_CONNECT_START;
|
||||||
|
|
||||||
|
if (isCritical)
|
||||||
|
{
|
||||||
|
/* cannot recover when error occurs in a critical transaction */
|
||||||
|
taskExecution->criticalErrorOccurred = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
/* try next worker node */
|
/* try next worker node */
|
||||||
AdjustStateForFailure(taskExecution);
|
AdjustStateForFailure(taskExecution);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Add a delay, to avoid potentially exacerbating problems by
|
* Add a delay, to avoid potentially exacerbating problems by
|
||||||
|
|
|
@ -249,6 +249,11 @@ CleanupTaskExecution(TaskExecution *taskExecution)
|
||||||
bool
|
bool
|
||||||
TaskExecutionFailed(TaskExecution *taskExecution)
|
TaskExecutionFailed(TaskExecution *taskExecution)
|
||||||
{
|
{
|
||||||
|
if (taskExecution->criticalErrorOccurred)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
if (taskExecution->failureCount >= MAX_TASK_EXECUTION_FAILURES)
|
if (taskExecution->failureCount >= MAX_TASK_EXECUTION_FAILURES)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -145,6 +145,7 @@ struct TaskExecution
|
||||||
uint32 querySourceNodeIndex; /* only applies to map fetch tasks */
|
uint32 querySourceNodeIndex; /* only applies to map fetch tasks */
|
||||||
int32 dataFetchTaskIndex;
|
int32 dataFetchTaskIndex;
|
||||||
uint32 failureCount;
|
uint32 failureCount;
|
||||||
|
bool criticalErrorOccurred;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -435,6 +435,42 @@ FROM
|
||||||
) as bar
|
) as bar
|
||||||
WHERE foo.user_id = bar.user_id;
|
WHERE foo.user_id = bar.user_id;
|
||||||
ERROR: recursive CTEs are not supported in distributed queries
|
ERROR: recursive CTEs are not supported in distributed queries
|
||||||
|
-- We error-out when there's an error in execution of the query. By repeating it
|
||||||
|
-- multiple times, we increase the chance of this test failing before PR #1903.
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
errors_received INTEGER;
|
||||||
|
BEGIN
|
||||||
|
errors_received := 0;
|
||||||
|
FOR i IN 1..3 LOOP
|
||||||
|
BEGIN
|
||||||
|
WITH cte as (
|
||||||
|
SELECT
|
||||||
|
user_id, value_2
|
||||||
|
from
|
||||||
|
events_table
|
||||||
|
)
|
||||||
|
SELECT * FROM users_table where value_2 < (
|
||||||
|
SELECT
|
||||||
|
min(cte.value_2)
|
||||||
|
FROM
|
||||||
|
cte
|
||||||
|
WHERE
|
||||||
|
users_table.user_id=cte.user_id
|
||||||
|
GROUP BY
|
||||||
|
user_id, cte.value_2);
|
||||||
|
EXCEPTION WHEN OTHERS THEN
|
||||||
|
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||||
|
errors_received := errors_received + 1;
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
END LOOP;
|
||||||
|
RAISE '(%/3) failed to execute one of the tasks', errors_received;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
ERROR: (3/3) failed to execute one of the tasks
|
||||||
|
CONTEXT: PL/pgSQL function inline_code_block line 29 at RAISE
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
DROP SCHEMA subquery_and_ctes CASCADE;
|
DROP SCHEMA subquery_and_ctes CASCADE;
|
||||||
NOTICE: drop cascades to table users_table_local
|
NOTICE: drop cascades to table users_table_local
|
||||||
|
|
|
@ -319,6 +319,40 @@ FROM
|
||||||
) as bar
|
) as bar
|
||||||
WHERE foo.user_id = bar.user_id;
|
WHERE foo.user_id = bar.user_id;
|
||||||
|
|
||||||
|
-- We error-out when there's an error in execution of the query. By repeating it
|
||||||
|
-- multiple times, we increase the chance of this test failing before PR #1903.
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
errors_received INTEGER;
|
||||||
|
BEGIN
|
||||||
|
errors_received := 0;
|
||||||
|
FOR i IN 1..3 LOOP
|
||||||
|
BEGIN
|
||||||
|
WITH cte as (
|
||||||
|
SELECT
|
||||||
|
user_id, value_2
|
||||||
|
from
|
||||||
|
events_table
|
||||||
|
)
|
||||||
|
SELECT * FROM users_table where value_2 < (
|
||||||
|
SELECT
|
||||||
|
min(cte.value_2)
|
||||||
|
FROM
|
||||||
|
cte
|
||||||
|
WHERE
|
||||||
|
users_table.user_id=cte.user_id
|
||||||
|
GROUP BY
|
||||||
|
user_id, cte.value_2);
|
||||||
|
EXCEPTION WHEN OTHERS THEN
|
||||||
|
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||||
|
errors_received := errors_received + 1;
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
END LOOP;
|
||||||
|
RAISE '(%/3) failed to execute one of the tasks', errors_received;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue