diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 7f4cfcff3..9c5bd17a5 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -54,7 +54,6 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int static void ResetConnection(MultiConnection *connection); static void DefaultCitusNoticeProcessor(void *arg, const char *message); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); -static void GivePurposeToConnection(MultiConnection *connection, int flags); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -314,8 +313,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, connection = FindAvailableConnection(entry->connections, flags); if (connection) { - GivePurposeToConnection(connection, flags); - return connection; } } @@ -330,7 +327,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, dlist_push_tail(entry->connections, &connection->connectionNode); ResetShardPlacementAssociation(connection); - GivePurposeToConnection(connection, flags); return connection; } @@ -338,10 +334,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, /* * FindAvailableConnection searches the given list of connections for one that - * is not claimed exclusively or marked as a side channel. If the caller passed - * the REQUIRE_SIDECHANNEL flag, it will only return a connection that has not - * been used to access shard placements and that connectoin will only be returned - * in subsequent calls if the REQUIRE_SIDECHANNEL flag is passed. + * is not claimed exclusively. * * If no connection is available, FindAvailableConnection returns NULL. 
*/ @@ -382,56 +375,13 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) continue; } - if ((flags & REQUIRE_SIDECHANNEL) != 0) - { - if (connection->purpose == CONNECTION_PURPOSE_SIDECHANNEL || - connection->purpose == CONNECTION_PURPOSE_ANY) - { - /* side channel must not have been used to access data */ - Assert(!ConnectionUsedForAnyPlacements(connection)); - - return connection; - } - } - else if (connection->purpose == CONNECTION_PURPOSE_DATA_ACCESS || - connection->purpose == CONNECTION_PURPOSE_ANY) - { - /* can use this connection to access data */ - return connection; - } + return connection; } return NULL; } -/* - * GivePurposeToConnection gives purpose to a connection if it does not already - * have a purpose. More specifically, it marks the connection as a sidechannel - * if the REQUIRE_SIDECHANNEL flag is set. - */ -static void -GivePurposeToConnection(MultiConnection *connection, int flags) -{ - if (connection->purpose != CONNECTION_PURPOSE_ANY) - { - /* connection already has a purpose */ - return; - } - - if ((flags & REQUIRE_SIDECHANNEL) != 0) - { - /* connection should not be used for data access */ - connection->purpose = CONNECTION_PURPOSE_SIDECHANNEL; - } - else - { - /* connection should be used for data access */ - connection->purpose = CONNECTION_PURPOSE_DATA_ACCESS; - } -} - - /* * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the * connections. This is mainly done when citus.node_conninfo changes. 
@@ -1071,7 +1021,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) false); connection->connectionStart = GetCurrentTimestamp(); connection->connectionId = connectionId++; - connection->purpose = CONNECTION_PURPOSE_ANY; /* * To avoid issues with interrupts not getting caught all our connections @@ -1228,7 +1177,6 @@ ResetConnection(MultiConnection *connection) /* reset copy state */ connection->copyBytesWrittenSinceLastFlush = 0; - connection->purpose = CONNECTION_PURPOSE_ANY; UnclaimConnection(connection); } diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 2cc875d5d..d5c39e4ff 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -82,6 +82,7 @@ typedef struct RemoteFileDestReceiver static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); +static void PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest); static StringInfo ConstructCopyResultStatement(const char *resultId); static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat); static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); @@ -233,14 +234,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; - const char *resultId = resultDest->resultId; - const char *delimiterCharacter = "\t"; const char *nullPrintCharacter = "\\N"; - List *initialNodeList = resultDest->initialNodeList; - List *connectionList = NIL; - resultDest->tupleDescriptor = inputTupleDescriptor; /* define how tuples will be serialised */ @@ -255,6 +251,21 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); +} + + +/* + * PrepareIntermediateResultBroadcast gets 
a RemoteFileDestReceiver and does + * the necessary initializations including initiating the remote connections + * and creating the local file, whichever is necessary (it might be both). + */ +static void +PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest) +{ + List *initialNodeList = resultDest->initialNodeList; + const char *resultId = resultDest->resultId; + List *connectionList = NIL; + CopyOutState copyOutState = resultDest->copyOutState; if (resultDest->writeLocalFile) { @@ -274,17 +285,11 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, WorkerNode *workerNode = NULL; foreach_ptr(workerNode, initialNodeList) { + int flags = 0; + const char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; - /* - * We prefer to use a connection that is not associcated with - * any placements. The reason is that we claim this connection - * exclusively and that would prevent the consecutive DML/DDL - * use the same connection. - */ - int flags = REQUIRE_SIDECHANNEL; - MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort); ClaimConnectionExclusively(connection); MarkRemoteTransactionCritical(connection); @@ -365,6 +370,15 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; + if (resultDest->tuplesSent == 0) + { + /* + * We got the first tuple, let's initialize the remote connections + * and/or the local file. + */ + PrepareIntermediateResultBroadcast(resultDest); + } + TupleDesc tupleDescriptor = resultDest->tupleDescriptor; List *connectionList = resultDest->connectionList; @@ -434,6 +448,17 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; + if (resultDest->tuplesSent == 0) + { + /* + * We have not received any tuples (when the intermediate result + * returns zero rows). 
Still, we want to create the necessary + * intermediate result files even if they are empty, as the query + * execution requires the files to be present. + */ + PrepareIntermediateResultBroadcast(resultDest); + } + List *connectionList = resultDest->connectionList; CopyOutState copyOutState = resultDest->copyOutState; @@ -770,11 +795,32 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, int statOK = stat(resultFileName, &fileStat); if (statOK != 0) { - ereport(ERROR, (errcode_for_file_access(), - errmsg("result \"%s\" does not exist", resultId))); + /* + * When the file does not exist, it could mean two different things. + * The first -- and a lot more common -- case is that a failure happened + * in a concurrent backend on the same distributed transaction. And, + * one of the backends in that transaction has already been rolled + * back, which has already removed the file. If we throw an error + * here, the user might see this error instead of the actual error + * message. Instead, we prefer to WARN the user and pretend that the + * file has no data in it. In the end, the user would see the actual + * error message for the failure. + * + * Second, in case of any bugs in intermediate result broadcasts, + * we could try to read a non-existing file. That is most likely + * to happen during development. 
+ */ + ereport(WARNING, (errcode_for_file_access(), + errmsg("Query could not find the intermediate result file " + "\"%s\", it was mostly likely deleted due to an " + "error in a parallel process within the same " + "distributed transaction", resultId))); + } + else + { + ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, + tupleStore); } - - ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore); } tuplestore_donestoring(tupleStore); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index c0c8a8f28..015546158 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -52,28 +52,9 @@ enum MultiConnectionMode /* open a connection per (co-located set of) placement(s) */ CONNECTION_PER_PLACEMENT = 1 << 3, - OUTSIDE_TRANSACTION = 1 << 4, - - /* connection has not been used to access data */ - REQUIRE_SIDECHANNEL = 1 << 5 + OUTSIDE_TRANSACTION = 1 << 4 }; -/* - * ConnectionPurpose defines what a connection is used for during the - * current transaction. This is primarily to not allocate connections - * that are needed for data access to other purposes. 
- */ -typedef enum ConnectionPurpose -{ - /* connection can be used for any purpose */ - CONNECTION_PURPOSE_ANY, - - /* connection can be used to access placements */ - CONNECTION_PURPOSE_DATA_ACCESS, - - /* connection can be used for auxiliary functions, but not data access */ - CONNECTION_PURPOSE_SIDECHANNEL -} ConnectionPurpose; typedef enum MultiConnectionState { @@ -116,9 +97,6 @@ typedef struct MultiConnection /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; - /* defines the purpose of the connection */ - ConnectionPurpose purpose; - /* time connection establishment was started, for timeout */ TimestampTz connectionStart; diff --git a/src/test/regress/expected/failure_multi_shard_update_delete.out b/src/test/regress/expected/failure_multi_shard_update_delete.out index cebd7f8c6..ffc08aef7 100644 --- a/src/test/regress/expected/failure_multi_shard_update_delete.out +++ b/src/test/regress/expected/failure_multi_shard_update_delete.out @@ -6,6 +6,7 @@ SET SEARCH_PATH = multi_shard; SET citus.shard_count TO 4; SET citus.next_shard_id TO 201000; SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; -- do not cache any connections SET citus.max_cached_conns_per_worker TO 0; SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index cfdf3382a..fc46ab14a 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -33,7 +33,11 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series (1 row) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); -ERROR: result "squares" does not exist +WARNING: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed 
transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + BEGIN; CREATE TABLE interesting_squares (user_id text, interested_in text); SELECT create_distributed_table('interesting_squares', 'user_id'); @@ -83,30 +87,20 @@ ORDER BY x; (3 rows) END; -CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE '%does not exist%' THEN - RAISE 'Task failed to execute'; - ELSIF SQLERRM LIKE '%could not receive query results%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; --- don't print the worker port -\set VERBOSITY terse -SET client_min_messages TO ERROR; -- files should now be cleaned up -SELECT raise_failed_execution_int_result($$ - SELECT x, x2 - FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) - WHERE user_id = 'jon' - ORDER BY x; -$$); -ERROR: Task failed to execute -\set VERBOSITY DEFAULT -SET client_min_messages TO DEFAULT; +SET client_min_messages TO DEBUG; +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' OR true +ORDER BY x; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +DETAIL: WARNING from localhost:xxxxx + x | x2 +--------------------------------------------------------------------- +(0 rows) + +RESET client_min_messages; -- try to read the file as text, will fail because of binary encoding BEGIN; SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); @@ -314,7 +308,11 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM 
generate_seri (1 row) SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + -- error behaviour, and also check that results are deleted on rollback BEGIN; SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'); @@ -325,10 +323,24 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri SAVEPOINT s1; SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int); -ERROR: result "notexistingfile" does not exist +WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- + 1 | 1 + 2 | 4 + 3 | 9 +(3 rows) + ROLLBACK TO SAVEPOINT s1; SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int); -ERROR: result "notexistingfile" does not exist +WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- + 1 | 1 + 2 | 4 + 3 | 9 +(3 rows) + ROLLBACK TO SAVEPOINT s1; SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int); ERROR: null array element not allowed in this context @@ -348,7 +360,11 @@ SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res END; SELECT * FROM 
read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + -- Test non-binary format: read_intermediate_results(..., 'text') BEGIN; -- ROW(...) types switch the output format to text @@ -481,7 +497,12 @@ SELECT store_intermediate_result_on_node('localhost', :worker_1_port, SAVEPOINT s1; -- results aren't available on coordinator yet SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + ROLLBACK TO SAVEPOINT s1; -- fetch from worker 2 should fail SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port); @@ -490,7 +511,12 @@ CONTEXT: while executing command on localhost:xxxxx ROLLBACK TO SAVEPOINT s1; -- still, results aren't available on coordinator yet SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +WARNING: Query could not find the intermediate 
result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + ROLLBACK TO SAVEPOINT s1; -- fetch from worker 1 should succeed SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); @@ -538,11 +564,15 @@ ERROR: worker array object cannot contain null values END; -- results should have been deleted after transaction commit SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); -ERROR: result "squares_1" does not exist +WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction +WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction + x | x2 +--------------------------------------------------------------------- +(0 rows) + DROP SCHEMA intermediate_results CASCADE; -NOTICE: drop cascades to 5 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table interesting_squares -drop cascades to function raise_failed_execution_int_result(text) drop cascades to type square_type drop cascades to table stored_squares drop cascades to table squares diff --git a/src/test/regress/expected/with_basics.out b/src/test/regress/expected/with_basics.out index 4cd27d624..ce66ec103 100644 --- a/src/test/regress/expected/with_basics.out +++ b/src/test/regress/expected/with_basics.out @@ -1011,6 +1011,11 @@ LEFT JOIN WHERE d1.user_id = d2.user_id )) AS bar USING (user_id); ERROR: cannot pushdown the subquery DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join +-- some test with failures +WITH a AS (SELECT * FROM 
users_table LIMIT 10) + SELECT user_id/0 FROM users_table JOIN a USING (user_id); +ERROR: division by zero +CONTEXT: while executing command on localhost:xxxxx RESET citus.enable_cte_inlining; DROP VIEW basic_view; DROP VIEW cte_view; diff --git a/src/test/regress/sql/failure_multi_shard_update_delete.sql b/src/test/regress/sql/failure_multi_shard_update_delete.sql index 60a4078e2..d9d0c2250 100644 --- a/src/test/regress/sql/failure_multi_shard_update_delete.sql +++ b/src/test/regress/sql/failure_multi_shard_update_delete.sql @@ -7,6 +7,7 @@ SET SEARCH_PATH = multi_shard; SET citus.shard_count TO 4; SET citus.next_shard_id TO 201000; SET citus.shard_replication_factor TO 1; +SET citus.max_adaptive_executor_pool_size TO 1; -- do not cache any connections SET citus.max_cached_conns_per_worker TO 0; diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index c421c3da9..156cd09ac 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -44,34 +44,14 @@ JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, ORDER BY x; END; - -CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$ -BEGIN - EXECUTE query; - EXCEPTION WHEN OTHERS THEN - IF SQLERRM LIKE '%does not exist%' THEN - RAISE 'Task failed to execute'; - ELSIF SQLERRM LIKE '%could not receive query results%' THEN - RAISE 'Task failed to execute'; - END IF; -END; -$$LANGUAGE plpgsql; - --- don't print the worker port -\set VERBOSITY terse -SET client_min_messages TO ERROR; - -- files should now be cleaned up -SELECT raise_failed_execution_int_result($$ - SELECT x, x2 - FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) - WHERE user_id = 'jon' - ORDER BY x; -$$); - -\set VERBOSITY DEFAULT -SET client_min_messages TO DEFAULT; +SET client_min_messages TO DEBUG; 
+SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' OR true +ORDER BY x; +RESET client_min_messages; -- try to read the file as text, will fail because of binary encoding BEGIN; SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); diff --git a/src/test/regress/sql/with_basics.sql b/src/test/regress/sql/with_basics.sql index 88e048a02..fb7a857cf 100644 --- a/src/test/regress/sql/with_basics.sql +++ b/src/test/regress/sql/with_basics.sql @@ -695,8 +695,11 @@ LEFT JOIN FROM distinct_undistribured d2 WHERE d1.user_id = d2.user_id )) AS bar USING (user_id); -RESET citus.enable_cte_inlining; +-- some test with failures +WITH a AS (SELECT * FROM users_table LIMIT 10) + SELECT user_id/0 FROM users_table JOIN a USING (user_id); +RESET citus.enable_cte_inlining; DROP VIEW basic_view; DROP VIEW cte_view;