Disable local execution when Explain Analyze is requested for a query. (#6892)

DESCRIPTION: Fixes a crash when explain analyze is requested for a query
that is normally locally executed.

When explain analyze is requested for a query, a task with two queries
is created. Those two queries are
    
1. Wrapped Query --> `SELECT ... FROM
worker_save_query_explain_analyze(<query>, <explain analyze options>)`
2. Fetch Query -->` SELECT explain_analyze_output, execution_duration
FROM worker_last_saved_explain_analyze();`

When the query is locally executed a task with multiple queries causes a
crash in production. See the Assert at
57455dc64d/src/backend/distributed/executor/tuple_destination.c#:~:text=Assert(task%2D%3EqueryCount%20%3D%3D%201)%3B

This becomes a critical issue when auto_explain extension is used. When
auto_explain extension is enabled, explain analyze is automatically
requested for every query.

One possible solution could be not to create two queries for a locally
executed query. The fetch part may not have to be a query since the
values are available in local variables.

Until we enable local execution for explain analyze, it is best to
disable local execution.

Fixes #6777.
pull/6832/head^2
Emel Şimşek 2023-05-23 14:33:22 +03:00 committed by GitHub
parent f9a5be59b9
commit 02f815ce1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 11 deletions

View File

@ -805,6 +805,8 @@ AdaptiveExecutor(CitusScanState *scanState)
TupleDestination *defaultTupleDest = TupleDestination *defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor); CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
bool localExecutionSupported = true;
if (RequestedForExplainAnalyze(scanState)) if (RequestedForExplainAnalyze(scanState))
{ {
/* /*
@ -814,6 +816,12 @@ AdaptiveExecutor(CitusScanState *scanState)
UseCoordinatedTransaction(); UseCoordinatedTransaction();
taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor, taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor,
paramListInfo); paramListInfo);
/*
* Multiple queries per task is not supported with local execution. See the Assert in
* TupleDestDestReceiverReceive.
*/
localExecutionSupported = false;
} }
bool hasDependentJobs = job->dependentJobList != NIL; bool hasDependentJobs = job->dependentJobList != NIL;
@ -836,8 +844,6 @@ AdaptiveExecutor(CitusScanState *scanState)
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
distributedPlan->modLevel, taskList, excludeFromXact); distributedPlan->modLevel, taskList, excludeFromXact);
bool localExecutionSupported = true;
/* /*
* In some rare cases, we have prepared statements that pass a parameter * In some rare cases, we have prepared statements that pass a parameter
* and never used in the query, mark such parameters' type as Invalid(0), * and never used in the query, mark such parameters' type as Invalid(0),

View File

@ -103,6 +103,9 @@ DEPS = {
"single_node_enterprise": TestDeps(None), "single_node_enterprise": TestDeps(None),
"single_node": TestDeps(None), "single_node": TestDeps(None),
"single_node_truncate": TestDeps(None), "single_node_truncate": TestDeps(None),
"multi_explain": TestDeps(
"base_schedule", ["multi_insert_select_non_pushable_queries"]
),
"multi_extension": TestDeps(None, repeatable=False), "multi_extension": TestDeps(None, repeatable=False),
"multi_test_helpers": TestDeps(None), "multi_test_helpers": TestDeps(None),
"multi_insert_select": TestDeps("base_schedule"), "multi_insert_select": TestDeps("base_schedule"),

View File

@ -8,7 +8,7 @@ SET citus.enable_repartition_joins to ON;
-- Ensure tuple data in explain analyze output is the same on all PG versions -- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE; SET citus.enable_binary_protocol = TRUE;
-- Function that parses explain output as JSON -- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text) CREATE OR REPLACE FUNCTION explain_json(query text)
RETURNS jsonb RETURNS jsonb
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -18,7 +18,7 @@ BEGIN
RETURN result; RETURN result;
END; END;
$BODY$ LANGUAGE plpgsql; $BODY$ LANGUAGE plpgsql;
CREATE FUNCTION explain_analyze_json(query text) CREATE OR REPLACE FUNCTION explain_analyze_json(query text)
RETURNS jsonb RETURNS jsonb
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -29,7 +29,7 @@ BEGIN
END; END;
$BODY$ LANGUAGE plpgsql; $BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML -- Function that parses explain output as XML
CREATE FUNCTION explain_xml(query text) CREATE OR REPLACE FUNCTION explain_xml(query text)
RETURNS xml RETURNS xml
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -40,7 +40,7 @@ BEGIN
END; END;
$BODY$ LANGUAGE plpgsql; $BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML -- Function that parses explain output as XML
CREATE FUNCTION explain_analyze_xml(query text) CREATE OR REPLACE FUNCTION explain_analyze_xml(query text)
RETURNS xml RETURNS xml
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -1276,6 +1276,7 @@ CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
Aggregate Aggregate
-> Seq Scan on lineitem_clone -> Seq Scan on lineitem_clone
DROP TABLE lineitem_clone;
-- ensure distributed plans don't break -- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Aggregate Aggregate
@ -2884,6 +2885,7 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Filter: ((id IS NOT NULL) AND (name = ANY (ARRAY[$1, $2]))) Filter: ((id IS NOT NULL) AND (name = ANY (ARRAY[$1, $2])))
Rows Removed by Filter: 1 Rows Removed by Filter: 1
deallocate distributed_insert_select; deallocate distributed_insert_select;
DROP TABLE simple;
-- prepared cte -- prepared cte
BEGIN; BEGIN;
PREPARE cte_query AS PREPARE cte_query AS
@ -2976,6 +2978,7 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Node: host=localhost port=xxxxx dbname=regression Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on users_table_2_570028 users_table_2 (actual rows=0 loops=1) -> Seq Scan on users_table_2_570028 users_table_2 (actual rows=0 loops=1)
Filter: ((value_1)::texttext) Filter: ((value_1)::texttext)
DROP TABLE users_table_2;
-- sorted explain analyze output -- sorted explain analyze output
CREATE TABLE explain_analyze_execution_time (a int); CREATE TABLE explain_analyze_execution_time (a int);
INSERT INTO explain_analyze_execution_time VALUES (2); INSERT INTO explain_analyze_execution_time VALUES (2);
@ -3140,5 +3143,23 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
-> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1)
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
Filter: (a = 1) Filter: (a = 1)
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';
SELECT citus_set_coordinator_host('localhost');
CREATE TABLE test_ref_table (key int PRIMARY KEY);
SELECT create_reference_table('test_ref_table');
LOAD 'auto_explain';
SET auto_explain.log_min_duration = 0;
set auto_explain.log_analyze to true;
-- the following should not be locally executed since explain analyze is on
select * from test_ref_table;
DROP SCHEMA test_auto_explain CASCADE;
select master_remove_node('localhost', :master_port);
SELECT public.wait_until_metadata_sync(30000);
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA multi_explain CASCADE; DROP SCHEMA multi_explain CASCADE;

View File

@ -72,7 +72,8 @@ test: tableam
# Miscellaneous tests to check our query planning behavior # Miscellaneous tests to check our query planning behavior
# ---------- # ----------
test: multi_deparse_shard_query multi_distributed_transaction_id intermediate_results limit_intermediate_size test: multi_deparse_shard_query multi_distributed_transaction_id intermediate_results limit_intermediate_size
test: multi_explain hyperscale_tutorial partitioned_intermediate_results distributed_intermediate_results multi_real_time_transaction test: multi_explain
test: hyperscale_tutorial partitioned_intermediate_results distributed_intermediate_results multi_real_time_transaction
test: multi_basic_queries cross_join multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_basic_queries cross_join multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
test: sql_procedure multi_function_in_join row_types materialized_view test: sql_procedure multi_function_in_join row_types materialized_view

View File

@ -13,7 +13,7 @@ SET citus.enable_repartition_joins to ON;
SET citus.enable_binary_protocol = TRUE; SET citus.enable_binary_protocol = TRUE;
-- Function that parses explain output as JSON -- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text) CREATE OR REPLACE FUNCTION explain_json(query text)
RETURNS jsonb RETURNS jsonb
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -24,7 +24,7 @@ BEGIN
END; END;
$BODY$ LANGUAGE plpgsql; $BODY$ LANGUAGE plpgsql;
CREATE FUNCTION explain_analyze_json(query text) CREATE OR REPLACE FUNCTION explain_analyze_json(query text)
RETURNS jsonb RETURNS jsonb
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -36,7 +36,7 @@ END;
$BODY$ LANGUAGE plpgsql; $BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML -- Function that parses explain output as XML
CREATE FUNCTION explain_xml(query text) CREATE OR REPLACE FUNCTION explain_xml(query text)
RETURNS xml RETURNS xml
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -48,7 +48,7 @@ END;
$BODY$ LANGUAGE plpgsql; $BODY$ LANGUAGE plpgsql;
-- Function that parses explain output as XML -- Function that parses explain output as XML
CREATE FUNCTION explain_analyze_xml(query text) CREATE OR REPLACE FUNCTION explain_analyze_xml(query text)
RETURNS xml RETURNS xml
AS $BODY$ AS $BODY$
DECLARE DECLARE
@ -559,6 +559,7 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
-- ensure local plans display correctly -- ensure local plans display correctly
CREATE TABLE lineitem_clone (LIKE lineitem); CREATE TABLE lineitem_clone (LIKE lineitem);
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
DROP TABLE lineitem_clone;
-- ensure distributed plans don't break -- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
@ -1048,6 +1049,8 @@ EXPLAIN :default_explain_flags EXECUTE distributed_insert_select('x', 'y');
EXPLAIN :default_analyze_flags EXECUTE distributed_insert_select('x', 'y'); EXPLAIN :default_analyze_flags EXECUTE distributed_insert_select('x', 'y');
deallocate distributed_insert_select; deallocate distributed_insert_select;
DROP TABLE simple;
-- prepared cte -- prepared cte
BEGIN; BEGIN;
PREPARE cte_query AS PREPARE cte_query AS
@ -1079,6 +1082,8 @@ EXPLAIN :default_analyze_flags execute p4(20,20);
-- simple test to confirm we can fetch long (>4KB) plans -- simple test to confirm we can fetch long (>4KB) plans
EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM users_table_2 WHERE value_1::textusers_table_2 WHERE value_1::text
DROP TABLE users_table_2;
-- sorted explain analyze output -- sorted explain analyze output
CREATE TABLE explain_analyze_execution_time (a int); CREATE TABLE explain_analyze_execution_time (a int);
INSERT INTO explain_analyze_execution_time VALUES (2); INSERT INTO explain_analyze_execution_time VALUES (2);
@ -1142,5 +1147,25 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET
EXPLAIN (COSTS false) EXECUTE q2('(1)'); EXPLAIN (COSTS false) EXECUTE q2('(1)');
EXPLAIN :default_analyze_flags EXECUTE q2('(1)'); EXPLAIN :default_analyze_flags EXECUTE q2('(1)');
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';
SELECT citus_set_coordinator_host('localhost');
CREATE TABLE test_ref_table (key int PRIMARY KEY);
SELECT create_reference_table('test_ref_table');
LOAD 'auto_explain';
SET auto_explain.log_min_duration = 0;
set auto_explain.log_analyze to true;
-- the following should not be locally executed since explain analyze is on
select * from test_ref_table;
DROP SCHEMA test_auto_explain CASCADE;
select master_remove_node('localhost', :master_port);
SELECT public.wait_until_metadata_sync(30000);
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA multi_explain CASCADE; DROP SCHEMA multi_explain CASCADE;