From 9b29a32d7ad937e6fe4c2b23bbe8e6208c59712b Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 3 Apr 2020 11:35:25 +0200
Subject: [PATCH 1/6] Remove all references to side channel connections

We don't need any side channel connections, and they are actually
problematic in the sense that they create extra connections: when
citus.max_adaptive_executor_pool_size equals 1, Citus still ends up using
one extra connection for the intermediate results, and thus does not obey
citus.max_adaptive_executor_pool_size.

In this PR, we remove the following entities from the codebase so that
further commits can implement broadcasting intermediate results without
requiring an extra connection:

- The connection flag REQUIRE_SIDECHANNEL
- The function GivePurposeToConnection
- The ConnectionPurpose enum and related fields
---
 .../connection/connection_management.c       | 56 +------------------
 .../executor/intermediate_results.c          | 10 +---
 .../distributed/connection_management.h      | 24 +-------
 3 files changed, 5 insertions(+), 85 deletions(-)

diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 7f4cfcff3..9c5bd17a5 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -54,7 +54,6 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
 static void ResetConnection(MultiConnection *connection);
 static void DefaultCitusNoticeProcessor(void *arg, const char *message);
 static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
-static void GivePurposeToConnection(MultiConnection *connection, int flags);
 static bool RemoteTransactionIdle(MultiConnection *connection);
 static int EventSetSizeForConnectionList(List *connections);
 
@@ -314,8 +313,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		connection = FindAvailableConnection(entry->connections, flags);
 		if (connection)
 		{
-			GivePurposeToConnection(connection, flags);
-
 			return connection;
 		}
 	}
@@ -330,7 +327,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	dlist_push_tail(entry->connections, &connection->connectionNode);
 
 	ResetShardPlacementAssociation(connection);
-	GivePurposeToConnection(connection, flags);
 
 	return connection;
 }
@@ -338,10 +334,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 
 /*
  * FindAvailableConnection searches the given list of connections for one that
- * is not claimed exclusively or marked as a side channel. If the caller passed
- * the REQUIRE_SIDECHANNEL flag, it will only return a connection that has not
- * been used to access shard placements and that connectoin will only be returned
- * in subsequent calls if the REQUIRE_SIDECHANNEL flag is passed.
+ * is not claimed exclusively.
  *
  * If no connection is available, FindAvailableConnection returns NULL.
 */
@@ -382,56 +375,13 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
 			continue;
 		}
 
-		if ((flags & REQUIRE_SIDECHANNEL) != 0)
-		{
-			if (connection->purpose == CONNECTION_PURPOSE_SIDECHANNEL ||
-				connection->purpose == CONNECTION_PURPOSE_ANY)
-			{
-				/* side channel must not have been used to access data */
-				Assert(!ConnectionUsedForAnyPlacements(connection));
-
-				return connection;
-			}
-		}
-		else if (connection->purpose == CONNECTION_PURPOSE_DATA_ACCESS ||
-				 connection->purpose == CONNECTION_PURPOSE_ANY)
-		{
-			/* can use this connection to access data */
-			return connection;
-		}
+		return connection;
 	}
 
 	return NULL;
 }
 
 
-/*
- * GivePurposeToConnection gives purpose to a connection if it does not already
- * have a purpose. More specifically, it marks the connection as a sidechannel
- * if the REQUIRE_SIDECHANNEL flag is set.
- */
-static void
-GivePurposeToConnection(MultiConnection *connection, int flags)
-{
-	if (connection->purpose != CONNECTION_PURPOSE_ANY)
-	{
-		/* connection already has a purpose */
-		return;
-	}
-
-	if ((flags & REQUIRE_SIDECHANNEL) != 0)
-	{
-		/* connection should not be used for data access */
-		connection->purpose = CONNECTION_PURPOSE_SIDECHANNEL;
-	}
-	else
-	{
-		/* connection should be used for data access */
-		connection->purpose = CONNECTION_PURPOSE_DATA_ACCESS;
-	}
-}
-
-
 /*
  * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
  * connections. This is mainly done when citus.node_conninfo changes.
@@ -1071,7 +1021,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
 												false);
 	connection->connectionStart = GetCurrentTimestamp();
 	connection->connectionId = connectionId++;
-	connection->purpose = CONNECTION_PURPOSE_ANY;
 
 	/*
 	 * To avoid issues with interrupts not getting caught all our connections
@@ -1228,7 +1177,6 @@ ResetConnection(MultiConnection *connection)
 
 	/* reset copy state */
 	connection->copyBytesWrittenSinceLastFlush = 0;
-	connection->purpose = CONNECTION_PURPOSE_ANY;
 
 	UnclaimConnection(connection);
 }
diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c
index 2cc875d5d..48808f2cb 100644
--- a/src/backend/distributed/executor/intermediate_results.c
+++ b/src/backend/distributed/executor/intermediate_results.c
@@ -274,17 +274,11 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
 	WorkerNode *workerNode = NULL;
 	foreach_ptr(workerNode, initialNodeList)
 	{
+		int flags = 0;
+
 		const char *nodeName = workerNode->workerName;
 		int nodePort = workerNode->workerPort;
 
-		/*
-		 * We prefer to use a connection that is not associcated with
-		 * any placements. The reason is that we claim this connection
-		 * exclusively and that would prevent the consecutive DML/DDL
-		 * use the same connection.
-		 */
-		int flags = REQUIRE_SIDECHANNEL;
-
 		MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort);
 		ClaimConnectionExclusively(connection);
 		MarkRemoteTransactionCritical(connection);
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index c0c8a8f28..015546158 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -52,28 +52,9 @@ enum MultiConnectionMode
 	/* open a connection per (co-located set of) placement(s) */
 	CONNECTION_PER_PLACEMENT = 1 << 3,
 
-	OUTSIDE_TRANSACTION = 1 << 4,
-
-	/* connection has not been used to access data */
-	REQUIRE_SIDECHANNEL = 1 << 5
+	OUTSIDE_TRANSACTION = 1 << 4
 };
 
-/*
- * ConnectionPurpose defines what a connection is used for during the
- * current transaction. This is primarily to not allocate connections
- * that are needed for data access to other purposes.
- */
-typedef enum ConnectionPurpose
-{
-	/* connection can be used for any purpose */
-	CONNECTION_PURPOSE_ANY,
-
-	/* connection can be used to access placements */
-	CONNECTION_PURPOSE_DATA_ACCESS,
-
-	/* connection can be used for auxiliary functions, but not data access */
-	CONNECTION_PURPOSE_SIDECHANNEL
-} ConnectionPurpose;
 
 typedef enum MultiConnectionState
 {
@@ -116,9 +97,6 @@ typedef struct MultiConnection
 	/* is the connection currently in use, and shouldn't be used by anything else */
 	bool claimedExclusively;
 
-	/* defines the purpose of the connection */
-	ConnectionPurpose purpose;
-
 	/* time connection establishment was started, for timeout */
 	TimestampTz connectionStart;
 
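To make the problem concrete: with the pre-patch code, a query that broadcasts an intermediate result opened one side channel per worker on top of the regular executor connection. A minimal sketch, assuming two distributed tables `foo(x, y)` and `bar(y)` (hypothetical names, mirroring the example in patch 3 below):

```SQL
-- assumes foo and bar are distributed tables; names are illustrative only
SET citus.max_adaptive_executor_pool_size TO 1;

WITH a AS (SELECT * FROM foo LIMIT 10)
SELECT max(x) FROM a JOIN bar USING (y);

-- before this series: 2 connections per worker (executor + side channel),
-- violating the pool size of 1; after patch 3 below: a single connection
-- per worker is reused for the COPY ... (format 'result')
```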
From 721daec9a5d2851a359ce4290e0a8d2d0cd75bcf Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 3 Apr 2020 11:57:53 +0200
Subject: [PATCH 2/6] Move the logic that initializes connections/local files into a function

---
 .../executor/intermediate_results.c          | 23 +++++++++++++++----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c
index 48808f2cb..9330400f8 100644
--- a/src/backend/distributed/executor/intermediate_results.c
+++ b/src/backend/distributed/executor/intermediate_results.c
@@ -82,6 +82,7 @@ typedef struct RemoteFileDestReceiver
 
 static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
 										  TupleDesc inputTupleDescriptor);
+static void PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest);
 static StringInfo ConstructCopyResultStatement(const char *resultId);
 static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
 static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
@@ -233,14 +234,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
 {
 	RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
 
-	const char *resultId = resultDest->resultId;
-
 	const char *delimiterCharacter = "\t";
 	const char *nullPrintCharacter = "\\N";
 
-	List *initialNodeList = resultDest->initialNodeList;
-	List *connectionList = NIL;
-
 	resultDest->tupleDescriptor = inputTupleDescriptor;
 
 	/* define how tuples will be serialised */
@@ -256,6 +252,23 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
 	resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
 															  copyOutState->binary);
 
+	PrepareIntermediateResultBroadcast(resultDest);
+}
+
+
+/*
+ * PrepareIntermediateResultBroadcast gets a RemoteFileDestReceiver and does
+ * the necessary initializations, including initiating the remote connections
+ * and/or creating the local file (it might be both).
+ */
+static void
+PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest)
+{
+	List *initialNodeList = resultDest->initialNodeList;
+	const char *resultId = resultDest->resultId;
+	List *connectionList = NIL;
+	CopyOutState copyOutState = resultDest->copyOutState;
+
 	if (resultDest->writeLocalFile)
 	{
 		const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
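Patch 2 is behavior-preserving: it only pulls the setup logic out of the startup callback so that later patches can call it lazily. The code path can be exercised with the intermediate-result UDFs that the regression tests in this series already use, for example:

```SQL
BEGIN;
-- goes through RemoteFileDestReceiverStartup, which now delegates the
-- connection/file setup to PrepareIntermediateResultBroadcast
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
COMMIT;
```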
From 4f7c902c6cd1ec7df75197c357e86662355ac48a Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 3 Apr 2020 12:04:07 +0200
Subject: [PATCH 3/6] Move connection establishment for intermediate results
 after query execution

When we have a query like the following:

```SQL
WITH a AS (SELECT * FROM foo LIMIT 10)
SELECT max(x) FROM a JOIN bar USING (y);
```

Citus currently opens a side channel per worker for doing the
`COPY "1_1" FROM STDIN (format 'result')` before starting the execution
of `SELECT * FROM foo LIMIT 10`.

Since we need at least one connection per worker to execute
`SELECT * FROM foo LIMIT 10`, we end up needing two connections per
worker in order to broadcast the results.

However, we don't actually send a single row over the side channel until
the execution of `SELECT * FROM foo LIMIT 10` is completely done (and its
connections unclaimed) and the results are written to a tuple store. So
we can reuse the same connection for doing the
`COPY "1_1" FROM STDIN (format 'result')`.

This also fixes the issue that Citus doesn't obey
`citus.max_adaptive_executor_pool_size` when the query includes an
intermediate result.
---
 .../executor/intermediate_results.c          | 22 +++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c
index 9330400f8..ca7b20f08 100644
--- a/src/backend/distributed/executor/intermediate_results.c
+++ b/src/backend/distributed/executor/intermediate_results.c
@@ -251,8 +251,6 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
 
 	resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
 															  copyOutState->binary);
-
-	PrepareIntermediateResultBroadcast(resultDest);
 }
 
 
@@ -372,6 +370,15 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 {
 	RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
 
+	if (resultDest->tuplesSent == 0)
+	{
+		/*
+		 * This is the first tuple we have received, so let's initialize
+		 * the remote connections and/or the local file.
+		 */
+		PrepareIntermediateResultBroadcast(resultDest);
+	}
+
 	TupleDesc tupleDescriptor = resultDest->tupleDescriptor;
 
 	List *connectionList = resultDest->connectionList;
@@ -441,6 +448,17 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
 {
 	RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
 
+	if (resultDest->tuplesSent == 0)
+	{
+		/*
+		 * We have not received any tuples (the intermediate result
+		 * returned zero rows). Still, we want to create the necessary
+		 * intermediate result files even if they are empty, as the query
+		 * execution requires the files to be present.
+		 */
+		PrepareIntermediateResultBroadcast(resultDest);
+	}
+
 	List *connectionList = resultDest->connectionList;
 	CopyOutState copyOutState = resultDest->copyOutState;
 

From 4b3d17f466dff90da13ff503de089f5998d99fe3 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 3 Apr 2020 13:29:15 +0200
Subject: [PATCH 4/6] Make sure that tests do not fail randomly

---
 src/test/regress/expected/failure_multi_shard_update_delete.out | 1 +
 src/test/regress/sql/failure_multi_shard_update_delete.sql     | 1 +
 2 files changed, 2 insertions(+)

diff --git a/src/test/regress/expected/failure_multi_shard_update_delete.out b/src/test/regress/expected/failure_multi_shard_update_delete.out
index cebd7f8c6..ffc08aef7 100644
--- a/src/test/regress/expected/failure_multi_shard_update_delete.out
+++ b/src/test/regress/expected/failure_multi_shard_update_delete.out
@@ -6,6 +6,7 @@ SET SEARCH_PATH = multi_shard;
 SET citus.shard_count TO 4;
 SET citus.next_shard_id TO 201000;
 SET citus.shard_replication_factor TO 1;
+SET citus.max_adaptive_executor_pool_size TO 1;
 -- do not cache any connections
 SET citus.max_cached_conns_per_worker TO 0;
 SELECT citus.mitmproxy('conn.allow()');
diff --git a/src/test/regress/sql/failure_multi_shard_update_delete.sql b/src/test/regress/sql/failure_multi_shard_update_delete.sql
index 60a4078e2..d9d0c2250 100644
--- a/src/test/regress/sql/failure_multi_shard_update_delete.sql
+++ b/src/test/regress/sql/failure_multi_shard_update_delete.sql
@@ -7,6 +7,7 @@ SET SEARCH_PATH = multi_shard;
 SET citus.shard_count TO 4;
 SET citus.next_shard_id TO 201000;
 SET citus.shard_replication_factor TO 1;
+SET citus.max_adaptive_executor_pool_size TO 1;
 -- do not cache any connections
 SET citus.max_cached_conns_per_worker TO 0;

From a695b44ce9c3805bcbdf6bd971efd6d8d85e24a7 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 3 Apr 2020 15:45:02 +0200
Subject: [PATCH 5/6] Add new regression tests

---
 src/test/regress/expected/with_basics.out | 5 +++++
 src/test/regress/sql/with_basics.sql      | 5 ++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/test/regress/expected/with_basics.out b/src/test/regress/expected/with_basics.out
index 4cd27d624..ce66ec103 100644
--- a/src/test/regress/expected/with_basics.out
+++ b/src/test/regress/expected/with_basics.out
@@ -1011,6 +1011,11 @@ LEFT JOIN
 	WHERE d1.user_id = d2.user_id
 	)) AS bar USING (user_id);
 ERROR: cannot pushdown the subquery
 DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
+-- some tests with failures
+WITH a AS (SELECT * FROM users_table LIMIT 10)
+	SELECT user_id/0 FROM users_table JOIN a USING (user_id);
+ERROR: division by zero
+CONTEXT: while executing command on localhost:xxxxx
 RESET citus.enable_cte_inlining;
 DROP VIEW basic_view;
 DROP VIEW cte_view;
diff --git a/src/test/regress/sql/with_basics.sql b/src/test/regress/sql/with_basics.sql
index 88e048a02..fb7a857cf 100644
--- a/src/test/regress/sql/with_basics.sql
+++ b/src/test/regress/sql/with_basics.sql
@@ -695,8 +695,11 @@ LEFT JOIN
 	FROM distinct_undistribured d2
 	WHERE d1.user_id = d2.user_id
 	)) AS bar USING (user_id);
-RESET citus.enable_cte_inlining;
 
+-- some tests with failures
+WITH a AS (SELECT * FROM users_table LIMIT 10)
+	SELECT user_id/0 FROM users_table JOIN a USING (user_id);
 
+RESET citus.enable_cte_inlining;
 DROP VIEW basic_view;
 DROP VIEW cte_view;
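A subtle consequence of patch 3 is the zero-row case: if the subquery produces no tuples, RemoteFileDestReceiverReceive never runs, so RemoteFileDestReceiverShutdown has to create the (empty) result files itself. A sketch of such a query, reusing users_table from the regression tests above and assuming the CTE is not inlined, so it is materialized as an intermediate result:

```SQL
-- the CTE yields zero rows, yet its result file must still be created,
-- because the outer join reads it on every worker
WITH empty AS (SELECT * FROM users_table WHERE false)
SELECT count(*) FROM users_table JOIN empty USING (user_id);
```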
From 70012dfd33e0aaac4324d3778f56e99a03553afd Mon Sep 17 00:00:00 2001
From: Önder Kalacı
Date: Tue, 7 Apr 2020 16:31:10 +0200
Subject: [PATCH 6/6] Do not error when an intermediate file does not exist (#3707)

When the file does not exist, it could mean two different things. The
first -- and far more common -- case is that a failure happened in a
concurrent backend on the same distributed transaction, and one of the
backends in that transaction has already been rolled back, which has
already removed the file. If we throw an error here, the user might see
this error instead of the actual error message. Instead, we prefer to
WARN the user and pretend that the file has no data in it. In the end,
the user sees the actual error message for the failure.

Second, in case of any bugs in intermediate result broadcasts, we could
try to read a non-existing file. That is most likely to happen during
development. Thus, when asserts are enabled, we throw an error instead
of a WARNING so that developers cannot miss it.
---
 .../executor/intermediate_results.c           | 29 +++++-
 .../regress/expected/intermediate_results.out | 96 ++++++++++++-------
 src/test/regress/sql/intermediate_results.sql | 32 ++-----
 3 files changed, 94 insertions(+), 63 deletions(-)

diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c
index ca7b20f08..d5c39e4ff 100644
--- a/src/backend/distributed/executor/intermediate_results.c
+++ b/src/backend/distributed/executor/intermediate_results.c
@@ -795,11 +795,32 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
 		int statOK = stat(resultFileName, &fileStat);
 		if (statOK != 0)
 		{
-			ereport(ERROR, (errcode_for_file_access(),
-							errmsg("result \"%s\" does not exist", resultId)));
+			/*
+			 * When the file does not exist, it could mean two different things.
+			 * The first -- and far more common -- case is that a failure
+			 * happened in a concurrent backend on the same distributed
+			 * transaction, and one of the backends in that transaction has
+			 * already been rolled back, which has already removed the file.
+			 * If we throw an error here, the user might see this error
+			 * instead of the actual error message. Instead, we prefer to WARN
+			 * the user and pretend that the file has no data in it. In the
+			 * end, the user sees the actual error message for the failure.
+			 *
+			 * Second, in case of any bugs in intermediate result broadcasts,
+			 * we could try to read a non-existing file. That is most likely
+			 * to happen during development.
+			 */
+			ereport(WARNING, (errcode_for_file_access(),
+							  errmsg("Query could not find the intermediate result file "
+									 "\"%s\", it was most likely deleted due to an "
+									 "error in a parallel process within the same "
+									 "distributed transaction", resultId)));
+		}
+		else
+		{
+			ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor,
+								   tupleStore);
 		}
-
-		ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore);
 	}
 
 	tuplestore_donestoring(tupleStore);
diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out
index cfdf3382a..fc46ab14a 100644
--- a/src/test/regress/expected/intermediate_results.out
+++ b/src/test/regress/expected/intermediate_results.out
@@ -33,7 +33,11 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series
 (1 row)
 
 SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
-ERROR: result "squares" does not exist
+WARNING: Query could not find the intermediate result file "squares", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+(0 rows)
+
 BEGIN;
 CREATE TABLE interesting_squares (user_id text, interested_in text);
 SELECT create_distributed_table('interesting_squares', 'user_id');
@@ -83,30 +87,20 @@ ORDER BY x;
 (3 rows)
 
 END;
-CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$
-BEGIN
-	EXECUTE query;
-	EXCEPTION WHEN OTHERS THEN
-	IF SQLERRM LIKE '%does not exist%' THEN
-		RAISE 'Task failed to execute';
-	ELSIF SQLERRM LIKE '%could not receive query results%' THEN
-		RAISE 'Task failed to execute';
-	END IF;
-END;
-$$LANGUAGE plpgsql;
--- don't print the worker port
-\set VERBOSITY terse
-SET client_min_messages TO ERROR;
 -- files should now be cleaned up
-SELECT raise_failed_execution_int_result($$
-	SELECT x, x2
-	FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
-	WHERE user_id = 'jon'
-	ORDER BY x;
-$$);
-ERROR: Task failed to execute
-\set VERBOSITY DEFAULT
-SET client_min_messages TO DEFAULT;
+SET client_min_messages TO DEBUG;
+SELECT x, x2
+FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
+WHERE user_id = 'jon' OR true
+ORDER BY x;
+DEBUG: Router planner cannot handle multi-shard select queries
+DEBUG: Query could not find the intermediate result file "squares", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+DETAIL: WARNING from localhost:xxxxx
+ x | x2
+---------------------------------------------------------------------
+(0 rows)
+
+RESET client_min_messages;
 -- try to read the file as text, will fail because of binary encoding
 BEGIN;
 SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
@@ -314,7 +308,11 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri
 (1 row)
 
 SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
-ERROR: result "squares_1" does not exist
+WARNING: Query could not find the intermediate result file "squares_1", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+(0 rows)
+
 -- error behaviour, and also check that results are deleted on rollback
 BEGIN;
 SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s');
@@ -325,10 +323,24 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri
 
 SAVEPOINT s1;
 SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int);
-ERROR: result "notexistingfile" does not exist
+WARNING: Query could not find the intermediate result file "notexistingfile", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+ 1 |  1
+ 2 |  4
+ 3 |  9
+(3 rows)
+
 ROLLBACK TO SAVEPOINT s1;
 SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int);
-ERROR: result "notexistingfile" does not exist
+WARNING: Query could not find the intermediate result file "notexistingfile", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+ 1 |  1
+ 2 |  4
+ 3 |  9
+(3 rows)
+
 ROLLBACK TO SAVEPOINT s1;
 SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int);
 ERROR: null array element not allowed in this context
@@ -348,7 +360,11 @@ SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res
 
 END;
 SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
-ERROR: result "squares_1" does not exist
+WARNING: Query could not find the intermediate result file "squares_1", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+(0 rows)
+
 -- Test non-binary format: read_intermediate_results(..., 'text')
 BEGIN;
 -- ROW(...) types switch the output format to text
@@ -481,7 +497,12 @@ SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
 SAVEPOINT s1;
 -- results aren't available on coordinator yet
 SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
-ERROR: result "squares_1" does not exist
+WARNING: Query could not find the intermediate result file "squares_1", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+WARNING: Query could not find the intermediate result file "squares_2", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+(0 rows)
+
 ROLLBACK TO SAVEPOINT s1;
 -- fetch from worker 2 should fail
 SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port);
@@ -490,7 +511,12 @@ CONTEXT: while executing command on localhost:xxxxx
 ROLLBACK TO SAVEPOINT s1;
 -- still, results aren't available on coordinator yet
 SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
-ERROR: result "squares_1" does not exist
+WARNING: Query could not find the intermediate result file "squares_1", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+WARNING: Query could not find the intermediate result file "squares_2", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+(0 rows)
+
 ROLLBACK TO SAVEPOINT s1;
 -- fetch from worker 1 should succeed
 SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
@@ -538,11 +564,15 @@ ERROR: worker array object cannot contain null values
 END;
 -- results should have been deleted after transaction commit
 SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
-ERROR: result "squares_1" does not exist
+WARNING: Query could not find the intermediate result file "squares_1", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+WARNING: Query could not find the intermediate result file "squares_2", it was most likely deleted due to an error in a parallel process within the same distributed transaction
+ x | x2
+---------------------------------------------------------------------
+(0 rows)
+
 DROP SCHEMA intermediate_results CASCADE;
-NOTICE: drop cascades to 5 other objects
+NOTICE: drop cascades to 4 other objects
 DETAIL: drop cascades to table interesting_squares
-drop cascades to function raise_failed_execution_int_result(text)
 drop cascades to type square_type
 drop cascades to table stored_squares
 drop cascades to table squares
diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql
index c421c3da9..156cd09ac 100644
--- a/src/test/regress/sql/intermediate_results.sql
+++ b/src/test/regress/sql/intermediate_results.sql
@@ -44,34 +44,14 @@ JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int,
 ORDER BY x;
 END;
 
-
-CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$
-BEGIN
-	EXECUTE query;
-	EXCEPTION WHEN OTHERS THEN
-	IF SQLERRM LIKE '%does not exist%' THEN
-		RAISE 'Task failed to execute';
-	ELSIF SQLERRM LIKE '%could not receive query results%' THEN
-		RAISE 'Task failed to execute';
-	END IF;
-END;
-$$LANGUAGE plpgsql;
-
--- don't print the worker port
-\set VERBOSITY terse
-SET client_min_messages TO ERROR;
-
 -- files should now be cleaned up
-SELECT raise_failed_execution_int_result($$
-	SELECT x, x2
-	FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
-	WHERE user_id = 'jon'
-	ORDER BY x;
-$$);
-
-\set VERBOSITY DEFAULT
-SET client_min_messages TO DEFAULT;
+SET client_min_messages TO DEBUG;
+SELECT x, x2
+FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
+WHERE user_id = 'jon' OR true
+ORDER BY x;
+RESET client_min_messages;
 
 -- try to read the file as text, will fail because of binary encoding
 BEGIN;
 SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
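Taken together, the series also changes the failure mode that the test files in patch 6 encode: reading a missing intermediate result now degrades to a warning plus an empty result set instead of an error. Abridged from the expected output above:

```SQL
-- reading a result that was never created (or was already cleaned up)
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
-- WARNING: Query could not find the intermediate result file "squares", it was
-- most likely deleted due to an error in a parallel process within the same
-- distributed transaction
--  x | x2
-- (0 rows)
```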