Do not error when an intermediate file does not exit (#3707)

When the file does not exist, it could mean two different things.
First -- and a lot more common -- case is that a failure happened
in a concurrent backend on the same distributed transaction. And,
one of the backends in that transaction has already been roll
backed, which has already removed the file. If we throw an error
here, the user might see this error instead of the actual error
message. Instead, we prefer to WARN the user and pretend that the
file has no data in it. In the end, the user would see the actual
error message for the failure.

Second, in case of any bugs in intermediate result broadcasts,
we could try to read a non-existing file. That is most likely
to happen during development. Thus, when asserts enabled, we throw
an error instead of WARNING so that the developers cannot miss.
pull/3703/head
Önder Kalacı 2020-04-07 16:31:10 +02:00 committed by Onder Kalaci
parent a695b44ce9
commit 70012dfd33
3 changed files with 94 additions and 63 deletions

View File

@ -795,11 +795,32 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
int statOK = stat(resultFileName, &fileStat);
if (statOK != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("result \"%s\" does not exist", resultId)));
/*
* When the file does not exist, it could mean two different things.
* First -- and a lot more common -- case is that a failure happened
* in a concurrent backend on the same distributed transaction. And,
* one of the backends in that transaction has already been roll
* backed, which has already removed the file. If we throw an error
* here, the user might see this error instead of the actual error
* message. Instead, we prefer to WARN the user and pretend that the
* file has no data in it. In the end, the user would see the actual
* error message for the failure.
*
* Second, in case of any bugs in intermediate result broadcasts,
* we could try to read a non-existing file. That is most likely
* to happen during development.
*/
ereport(WARNING, (errcode_for_file_access(),
errmsg("Query could not find the intermediate result file "
"\"%s\", it was mostly likely deleted due to an "
"error in a parallel process within the same "
"distributed transaction", resultId)));
}
else
{
ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor,
tupleStore);
}
ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore);
}
tuplestore_donestoring(tupleStore);

View File

@ -33,7 +33,11 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series
(1 row)
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
ERROR: result "squares" does not exist
WARNING: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
BEGIN;
CREATE TABLE interesting_squares (user_id text, interested_in text);
SELECT create_distributed_table('interesting_squares', 'user_id');
@ -83,30 +87,20 @@ ORDER BY x;
(3 rows)
END;
CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE '%does not exist%' THEN
RAISE 'Task failed to execute';
ELSIF SQLERRM LIKE '%could not receive query results%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$LANGUAGE plpgsql;
-- don't print the worker port
\set VERBOSITY terse
SET client_min_messages TO ERROR;
-- files should now be cleaned up
SELECT raise_failed_execution_int_result($$
SELECT x, x2
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
WHERE user_id = 'jon'
ORDER BY x;
$$);
ERROR: Task failed to execute
\set VERBOSITY DEFAULT
SET client_min_messages TO DEFAULT;
SET client_min_messages TO DEBUG;
SELECT x, x2
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
WHERE user_id = 'jon' OR true
ORDER BY x;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
DETAIL: WARNING from localhost:xxxxx
x | x2
---------------------------------------------------------------------
(0 rows)
RESET client_min_messages;
-- try to read the file as text, will fail because of binary encoding
BEGIN;
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
@ -314,7 +308,11 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri
(1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
-- error behaviour, and also check that results are deleted on rollback
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s');
@ -325,10 +323,24 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri
SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist
WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
1 | 1
2 | 4
3 | 9
(3 rows)
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist
WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
1 | 1
2 | 4
3 | 9
(3 rows)
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int);
ERROR: null array element not allowed in this context
@ -348,7 +360,11 @@ SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res
END;
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
-- Test non-binary format: read_intermediate_results(..., 'text')
BEGIN;
-- ROW(...) types switch the output format to text
@ -481,7 +497,12 @@ SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
SAVEPOINT s1;
-- results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 2 should fail
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port);
@ -490,7 +511,12 @@ CONTEXT: while executing command on localhost:xxxxx
ROLLBACK TO SAVEPOINT s1;
-- still, results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 1 should succeed
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
@ -538,11 +564,15 @@ ERROR: worker array object cannot contain null values
END;
-- results should have been deleted after transaction commit
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
DROP SCHEMA intermediate_results CASCADE;
NOTICE: drop cascades to 5 other objects
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table interesting_squares
drop cascades to function raise_failed_execution_int_result(text)
drop cascades to type square_type
drop cascades to table stored_squares
drop cascades to table squares

View File

@ -44,34 +44,14 @@ JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int,
ORDER BY x;
END;
CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE '%does not exist%' THEN
RAISE 'Task failed to execute';
ELSIF SQLERRM LIKE '%could not receive query results%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$LANGUAGE plpgsql;
-- don't print the worker port
\set VERBOSITY terse
SET client_min_messages TO ERROR;
-- files should now be cleaned up
SELECT raise_failed_execution_int_result($$
SELECT x, x2
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
WHERE user_id = 'jon'
ORDER BY x;
$$);
\set VERBOSITY DEFAULT
SET client_min_messages TO DEFAULT;
SET client_min_messages TO DEBUG;
SELECT x, x2
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
WHERE user_id = 'jon' OR true
ORDER BY x;
RESET client_min_messages;
-- try to read the file as text, will fail because of binary encoding
BEGIN;
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');