diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 22963c458..d88cdbea8 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -805,6 +805,8 @@ AdaptiveExecutor(CitusScanState *scanState) TupleDestination *defaultTupleDest = CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor); + bool localExecutionSupported = true; + if (RequestedForExplainAnalyze(scanState)) { /* @@ -814,6 +816,12 @@ AdaptiveExecutor(CitusScanState *scanState) UseCoordinatedTransaction(); taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor, paramListInfo); + + /* + * Multiple queries per task is not supported with local execution. See the Assert in + * TupleDestDestReceiverReceive. + */ + localExecutionSupported = false; } bool hasDependentJobs = job->dependentJobList != NIL; @@ -836,8 +844,6 @@ AdaptiveExecutor(CitusScanState *scanState) TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( distributedPlan->modLevel, taskList, excludeFromXact); - bool localExecutionSupported = true; - /* * In some rare cases, we have prepared statements that pass a parameter * and never used in the query, mark such parameters' type as Invalid(0), diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 3249c98a7..ddd0d454e 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -103,6 +103,9 @@ DEPS = { "single_node_enterprise": TestDeps(None), "single_node": TestDeps(None), "single_node_truncate": TestDeps(None), + "multi_explain": TestDeps( + "base_schedule", ["multi_insert_select_non_pushable_queries"] + ), "multi_extension": TestDeps(None, repeatable=False), "multi_test_helpers": TestDeps(None), "multi_insert_select": TestDeps("base_schedule"), diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index eee7a6236..7b533a642 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -8,7 +8,7 @@ SET citus.enable_repartition_joins to ON; -- Ensure tuple data in explain analyze output is the same on all PG versions SET citus.enable_binary_protocol = TRUE; -- Function that parses explain output as JSON -CREATE FUNCTION explain_json(query text) +CREATE OR REPLACE FUNCTION explain_json(query text) RETURNS jsonb AS $BODY$ DECLARE @@ -18,7 +18,7 @@ BEGIN RETURN result; END; $BODY$ LANGUAGE plpgsql; -CREATE FUNCTION explain_analyze_json(query text) +CREATE OR REPLACE FUNCTION explain_analyze_json(query text) RETURNS jsonb AS $BODY$ DECLARE @@ -29,7 +29,7 @@ BEGIN END; $BODY$ LANGUAGE plpgsql; -- Function that parses explain output as XML -CREATE FUNCTION explain_xml(query text) +CREATE OR REPLACE FUNCTION explain_xml(query text) RETURNS xml AS $BODY$ DECLARE @@ -40,7 +40,7 @@ BEGIN END; $BODY$ LANGUAGE plpgsql; -- Function that parses explain output as XML -CREATE FUNCTION explain_analyze_xml(query text) +CREATE OR REPLACE FUNCTION explain_analyze_xml(query text) RETURNS xml AS $BODY$ DECLARE @@ -1276,6 +1276,7 @@ CREATE TABLE lineitem_clone (LIKE lineitem); EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone; Aggregate -> Seq Scan on lineitem_clone +DROP TABLE lineitem_clone; -- ensure distributed plans don't break EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; Aggregate @@ -2884,6 +2885,7 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) Filter: ((id IS NOT NULL) AND (name = ANY (ARRAY[$1, $2]))) Rows Removed by Filter: 1 deallocate distributed_insert_select; +DROP TABLE simple; -- prepared cte BEGIN; PREPARE cte_query AS @@ -2976,6 +2978,7 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on users_table_2_570028 users_table_2 (actual rows=0 loops=1) Filter: ((value_1)::texttext) +DROP TABLE users_table_2; -- sorted explain analyze output CREATE TABLE explain_analyze_execution_time (a int); INSERT INTO explain_analyze_execution_time VALUES (2); @@ -3140,5 +3143,23 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) Filter: (a = 1) +-- check when auto explain + analyze is enabled, we do not allow local execution. +CREATE SCHEMA test_auto_explain; +SET search_path TO 'test_auto_explain'; +SELECT citus_set_coordinator_host('localhost'); + +CREATE TABLE test_ref_table (key int PRIMARY KEY); +SELECT create_reference_table('test_ref_table'); + +LOAD 'auto_explain'; +SET auto_explain.log_min_duration = 0; +set auto_explain.log_analyze to true; +-- the following should not be locally executed since explain analyze is on +select * from test_ref_table; +DROP SCHEMA test_auto_explain CASCADE; +select master_remove_node('localhost', :master_port); + +SELECT public.wait_until_metadata_sync(30000); + SET client_min_messages TO ERROR; DROP SCHEMA multi_explain CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 6dcf41266..67fb48fa2 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -72,7 +72,8 @@ test: tableam # Miscellaneous tests to check our query planning behavior # ---------- test: multi_deparse_shard_query multi_distributed_transaction_id intermediate_results limit_intermediate_size -test: multi_explain hyperscale_tutorial partitioned_intermediate_results distributed_intermediate_results multi_real_time_transaction +test: multi_explain +test: hyperscale_tutorial partitioned_intermediate_results distributed_intermediate_results multi_real_time_transaction test: multi_basic_queries cross_join multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql test: sql_procedure multi_function_in_join row_types materialized_view diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 4429e46e7..931fb02bc 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -13,7 +13,7 @@ SET citus.enable_repartition_joins to ON; SET citus.enable_binary_protocol = TRUE; -- Function that parses explain output as JSON -CREATE FUNCTION explain_json(query text) +CREATE OR REPLACE FUNCTION explain_json(query text) RETURNS jsonb AS $BODY$ DECLARE @@ -24,7 +24,7 @@ BEGIN END; $BODY$ LANGUAGE plpgsql; -CREATE FUNCTION explain_analyze_json(query text) +CREATE OR REPLACE FUNCTION explain_analyze_json(query text) RETURNS jsonb AS $BODY$ DECLARE @@ -36,7 +36,7 @@ END; $BODY$ LANGUAGE plpgsql; -- Function that parses explain output as XML -CREATE FUNCTION explain_xml(query text) +CREATE OR REPLACE FUNCTION explain_xml(query text) RETURNS xml AS $BODY$ DECLARE @@ -48,7 +48,7 @@ END; $BODY$ LANGUAGE plpgsql; -- Function that parses explain output as XML -CREATE FUNCTION explain_analyze_xml(query text) +CREATE OR REPLACE FUNCTION explain_analyze_xml(query text) RETURNS xml AS $BODY$ DECLARE @@ -559,6 +559,7 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) -- ensure local plans display correctly CREATE TABLE lineitem_clone (LIKE lineitem); EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone; +DROP TABLE lineitem_clone; -- ensure distributed plans don't break EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; @@ -1048,6 +1049,8 @@ EXPLAIN :default_explain_flags EXECUTE distributed_insert_select('x', 'y'); EXPLAIN :default_analyze_flags EXECUTE distributed_insert_select('x', 'y'); deallocate distributed_insert_select; +DROP TABLE simple; + -- prepared cte BEGIN; PREPARE cte_query AS @@ -1079,6 +1082,8 @@ EXPLAIN :default_analyze_flags execute p4(20,20); -- simple test to confirm we can fetch long (>4KB) plans EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM users_table_2 WHERE value_1::textusers_table_2; + -- sorted explain analyze output CREATE TABLE explain_analyze_execution_time (a int); INSERT INTO explain_analyze_execution_time VALUES (2); @@ -1142,5 +1147,25 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET EXPLAIN (COSTS false) EXECUTE q2('(1)'); EXPLAIN :default_analyze_flags EXECUTE q2('(1)'); +-- check when auto explain + analyze is enabled, we do not allow local execution. +CREATE SCHEMA test_auto_explain; +SET search_path TO 'test_auto_explain'; + +SELECT citus_set_coordinator_host('localhost'); + +CREATE TABLE test_ref_table (key int PRIMARY KEY); +SELECT create_reference_table('test_ref_table'); + +LOAD 'auto_explain'; +SET auto_explain.log_min_duration = 0; +set auto_explain.log_analyze to true; + +-- the following should not be locally executed since explain analyze is on +select * from test_ref_table; + +DROP SCHEMA test_auto_explain CASCADE; +select master_remove_node('localhost', :master_port); +SELECT public.wait_until_metadata_sync(30000); + SET client_min_messages TO ERROR; DROP SCHEMA multi_explain CASCADE;