diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index ca7b20f08..d5c39e4ff 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -795,11 +795,32 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, int statOK = stat(resultFileName, &fileStat); if (statOK != 0) { - ereport(ERROR, (errcode_for_file_access(), - errmsg("result \"%s\" does not exist", resultId))); + /* + * When the file does not exist, it could mean two different things. + * First -- and a lot more common -- case is that a failure happened + * in a concurrent backend on the same distributed transaction. And, + * one of the backends in that transaction has already been rolled + * back, which has already removed the file. If we throw an error + * here, the user might see this error instead of the actual error + * message. Instead, we prefer to WARN the user and pretend that the + * file has no data in it. In the end, the user would see the actual + * error message for the failure. + * + * Second, in case of any bugs in intermediate result broadcasts, + * we could try to read a non-existing file. That is most likely + * to happen during development. 
+ */ + ereport(WARNING, (errcode_for_file_access(), + errmsg("Query could not find the intermediate result file " + "\"%s\", it was mostly likely deleted due to an " + "error in a parallel process within the same " + "distributed transaction", resultId))); + } + else + { + ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, + tupleStore); } - - ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore); } tuplestore_donestoring(tupleStore); diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index cfdf3382a..fc46ab14a 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -33,7 +33,11 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series (1 row) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); -ERROR: result "squares" does not exist +WARNING: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + BEGIN; CREATE TABLE interesting_squares (user_id text, interested_in text); SELECT create_distributed_table('interesting_squares', 'user_id'); @@ -83,30 +87,20 @@ ORDER BY x; (3 rows) END; -CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE '%does not exist%' THEN - RAISE 'Task failed to execute'; - ELSIF SQLERRM LIKE '%could not receive query results%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; --- don't print the worker port -\set VERBOSITY terse -SET client_min_messages TO ERROR; -- files should now be cleaned up -SELECT raise_failed_execution_int_result($$ - SELECT x, x2 - FROM interesting_squares JOIN 
(SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) - WHERE user_id = 'jon' - ORDER BY x; -$$); -ERROR: Task failed to execute -\set VERBOSITY DEFAULT -SET client_min_messages TO DEFAULT; +SET client_min_messages TO DEBUG; +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' OR true +ORDER BY x; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +DETAIL: WARNING from localhost:xxxxx + x | x2 +--------------------------------------------------------------------- +(0 rows) + +RESET client_min_messages; -- try to read the file as text, will fail because of binary encoding BEGIN; SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); @@ -314,7 +308,11 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri (1 row) SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + -- error behaviour, and also check that results are deleted on rollback BEGIN; SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'); @@ -325,10 +323,24 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri SAVEPOINT s1; SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int); -ERROR: 
result "notexistingfile" does not exist +WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- + 1 | 1 + 2 | 4 + 3 | 9 +(3 rows) + ROLLBACK TO SAVEPOINT s1; SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int); -ERROR: result "notexistingfile" does not exist +WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- + 1 | 1 + 2 | 4 + 3 | 9 +(3 rows) + ROLLBACK TO SAVEPOINT s1; SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int); ERROR: null array element not allowed in this context @@ -348,7 +360,11 @@ SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res END; SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + -- Test non-binary format: read_intermediate_results(..., 'text') BEGIN; -- ROW(...) 
types switch the output format to text @@ -481,7 +497,12 @@ SELECT store_intermediate_result_on_node('localhost', :worker_1_port, SAVEPOINT s1; -- results aren't available on coordinator yet SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + ROLLBACK TO SAVEPOINT s1; -- fetch from worker 2 should fail SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port); @@ -490,7 +511,12 @@ CONTEXT: while executing command on localhost:xxxxx ROLLBACK TO SAVEPOINT s1; -- still, results aren't available on coordinator yet SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + ROLLBACK TO SAVEPOINT s1; -- fetch from worker 1 should succeed SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); @@ -538,11 +564,15 @@ ERROR: worker array object cannot contain null values END; -- 
results should have been deleted after transaction commit SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + DROP SCHEMA intermediate_results CASCADE; -NOTICE: drop cascades to 5 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table interesting_squares -drop cascades to function raise_failed_execution_int_result(text) drop cascades to type square_type drop cascades to table stored_squares drop cascades to table squares diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index c421c3da9..156cd09ac 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -44,34 +44,14 @@ JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, ORDER BY x; END; - -CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE '%does not exist%' THEN - RAISE 'Task failed to execute'; - ELSIF SQLERRM LIKE '%could not receive query results%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; - --- don't print the worker port -\set VERBOSITY terse -SET client_min_messages TO ERROR; - -- files should now be cleaned up -SELECT raise_failed_execution_int_result($$ - SELECT x, x2 - FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 
'binary') AS res (x text, x2 int)) squares ON (x = interested_in) - WHERE user_id = 'jon' - ORDER BY x; -$$); - -\set VERBOSITY DEFAULT -SET client_min_messages TO DEFAULT; +SET client_min_messages TO DEBUG; +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' OR true +ORDER BY x; +RESET client_min_messages; -- try to read the file as text, will fail because of binary encoding BEGIN; SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');