diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 9b31fbac1..4854676e1 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -809,13 +809,7 @@ AdaptiveExecutor(CitusScanState *scanState) bool localExecutionSupported = true; - /* - * When running a distributed plan—either the root plan or a subplan’s - * distributed fragment—we need to know if we’re under EXPLAIN ANALYZE. - * Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use - * SubPlanExplainAnalyzeContext as a flag to indicate that context. - */ - if (RequestedForExplainAnalyze(scanState) || SubPlanExplainAnalyzeContext) + if (RequestedForExplainAnalyze(scanState)) { /* * We use multiple queries per task in EXPLAIN ANALYZE which need to diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index bf5886c44..58c172c66 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -42,6 +42,7 @@ #include "distributed/merge_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" @@ -121,7 +122,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node) bool binaryFormat = CanUseBinaryCopyFormatForTargetList(selectQuery->targetList); - ExecuteSubPlans(distSelectPlan, false); + ExecuteSubPlans(distSelectPlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index 8b7321d9b..56bde62bc 100644 --- 
a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -23,6 +23,7 @@ #include "distributed/merge_executor.h" #include "distributed/merge_planner.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_router_planner.h" #include "distributed/repartition_executor.h" @@ -132,7 +133,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) ereport(DEBUG1, (errmsg("Executing subplans of the source query and " "storing the results at the respective node(s)"))); - ExecuteSubPlans(distSourcePlan, false); + ExecuteSubPlans(distSourcePlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index 033a21b2b..cee66eb2b 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -63,6 +63,10 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) { SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan); } + else + { + SubPlanExplainAnalyzeContext = NULL; + } HTAB *intermediateResultsHash = MakeIntermediateResultHTAB(); RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); @@ -103,7 +107,17 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) TimestampTz startTimestamp = GetCurrentTimestamp(); - ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + PG_TRY(); + { + ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + } + PG_CATCH(); + { + SubPlanExplainAnalyzeContext = NULL; + PG_RE_THROW(); + } + PG_END_TRY(); + /* * EXPLAIN ANALYZE instrumentations. 
Calculating these are very light-weight, @@ -127,8 +141,5 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) FreeExecutorState(estate); } - if (explainAnalyzeEnabled) - { - SubPlanExplainAnalyzeContext = NULL; - } + SubPlanExplainAnalyzeContext = NULL; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 93d1cbba7..7ad06c002 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -68,6 +68,7 @@ #include "distributed/placement_connection.h" #include "distributed/recursive_planning.h" #include "distributed/remote_commands.h" +#include "distributed/subplan_execution.h" #include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" #include "distributed/version_compat.h" @@ -164,6 +165,7 @@ static void ExplainIndentText(ExplainState *es); static void ExplainPrintSerialize(ExplainState *es, SerializeMetrics *metrics); static SerializeMetrics GetSerializationMetrics(DestReceiver *dest); +static void ExtractAnalyzeStats(DistributedSubPlan *subPlan, QueryDesc *queryDesc); /* * DestReceiver functions for SERIALIZE option @@ -213,7 +215,8 @@ static const char * ExplainFormatStr(ExplainFormat format); #if PG_VERSION_NUM >= PG_VERSION_17 static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption); #endif -static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, +static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DistributedSubPlan *subPlan, + DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, @@ -431,6 +434,58 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors, } +/* + * ExtractAnalyzeStats parses the EXPLAIN ANALYZE output of the pre-executed + * subplans and injects the parsed statistics into queryDesc->planstate->instrument. 
+ */ +static void +ExtractAnalyzeStats(DistributedSubPlan *subPlan, QueryDesc *queryDesc) +{ + /* Inject the earlier executed results into the newly created tasks */ + if (subPlan->numTasksOutput && (queryDesc->planstate != NULL) && + IsA(queryDesc->planstate, CustomScanState)) + { + DistributedPlan *newdistributedPlan = + ((CitusScanState *) queryDesc->planstate)->distributedPlan; + + ListCell *lc; + int idx = 0; + + /* Heap-allocated: the caller keeps using planstate->instrument after we return */ + Instrumentation *instr = palloc0(sizeof(Instrumentation)); + foreach(lc, newdistributedPlan->workerJob->taskList) + { + if (idx < subPlan->numTasksOutput && + subPlan->totalExplainOutput[idx].explainOutput) + { + /* + * Now feed the earlier saved output, which will be used + * by RemoteExplain() when printing tasks + */ + Task *task = (Task *) lfirst(lc); + MemoryContext taskContext = GetMemoryChunkContext(task); + + task->totalReceivedTupleData = + subPlan->totalExplainOutput[idx].totalReceivedTupleData; + task->fetchedExplainAnalyzeExecutionDuration = + subPlan->totalExplainOutput[idx].executionDuration; + task->fetchedExplainAnalyzePlan = + MemoryContextStrdup(taskContext, + subPlan->totalExplainOutput[idx].explainOutput); + ParseExplainAnalyzeOutput(task->fetchedExplainAnalyzePlan, instr); + + subPlan->totalExplainOutput[idx].explainOutput = NULL; + } + + idx++; + } + + instr->nloops = 1; + queryDesc->planstate->instrument = instr; + } +} + + /* * ExplainSubPlans generates EXPLAIN output for subplans for CTEs * and complex subqueries. 
Because the planning for these queries @@ -449,7 +504,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) { DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); PlannedStmt *plan = subPlan->plan; - IntoClause *into = NULL; ParamListInfo params = NULL; /* @@ -457,9 +511,25 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) * for now we put an empty string, which is valid according to postgres. 
*/ char *queryString = pstrdup(""); + instr_time planDuration; BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) { bufusage_start = pgBufferUsage; @@ -506,6 +576,8 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainPropertyText("Result destination", destination->data, es); } + INSTR_TIME_SET_ZERO(planDuration); + /* calc differences of buffer counters. */ if (es->buffers) { @@ -515,100 +587,37 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es); - /* Print only and not execute */ - DestReceiver *dest; - if (into) + /* TODO: Do we really need dest and bound params when not executing? */ + DestReceiver *dest = None_Receiver; + double executionDurationMillisec = 0.0; + +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) { - dest = CreateIntoRelDestReceiver(into); - } - else - { - dest = None_Receiver; - } - - int instrument_option = 0; - - if (es->analyze && es->timing) - { - instrument_option |= INSTRUMENT_TIMER; - } - else if (es->analyze) - { - instrument_option |= INSTRUMENT_ROWS; + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); } + /* calc differences of buffer counters. 
*/ if (es->buffers) { - instrument_option |= INSTRUMENT_BUFFERS; - } - if (es->wal) - { - instrument_option |= INSTRUMENT_WAL; + memset(&bufusage, 0, sizeof(BufferUsage)); + BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); } - /* Create a QueryDesc for the query */ - QueryDesc *queryDesc = - CreateQueryDesc(plan, queryString, GetActiveSnapshot(), - InvalidSnapshot, dest, params, NULL, instrument_option); + /* print the plan only; passing subPlan makes ExplainWorkerPlan skip execution */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planDuration, + (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL), + &executionDurationMillisec); +#else - ExecutorStart(queryDesc, EXEC_FLAG_EXPLAIN_ONLY); + /* print the plan only; passing subPlan makes ExplainWorkerPlan skip execution */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planDuration, &executionDurationMillisec); +#endif - /* Inject the earlier executed results into the newly created tasks */ - - if (subPlan->numTasksOutput && (queryDesc->planstate != NULL) && - IsA(queryDesc->planstate, CustomScanState)) - { - DistributedPlan *newdistributedPlan = - ((CitusScanState *) queryDesc->planstate)->distributedPlan; - - ListCell *lc; - int idx = 0; - - /* We need to extract this from the explain output of workers */ - Instrumentation instr = { 0 }; - foreach(lc, newdistributedPlan->workerJob->taskList) - { - if (subPlan->totalExplainOutput[idx].explainOutput && - idx < subPlan->numTasksOutput) - { - /* - * Now feed the earlier saved output, which will be used - * by RemoteExplain() when printing tasks - */ - Task *task = (Task *) lfirst(lc); - MemoryContext taskContext = GetMemoryChunkContext(task); - - task->totalReceivedTupleData = - subPlan->totalExplainOutput[idx].totalReceivedTupleData; - task->fetchedExplainAnalyzeExecutionDuration = - subPlan->totalExplainOutput[idx].executionDuration; - task->fetchedExplainAnalyzePlan = - 
MemoryContextStrdup(taskContext, - subPlan->totalExplainOutput[idx].explainOutput); - 
ParseExplainAnalyzeOutput(task->fetchedExplainAnalyzePlan, &instr); - - subPlan->totalExplainOutput[idx].explainOutput = NULL; - } - - idx++; - } - instr.nloops = 1; - queryDesc->planstate->instrument = &instr; - } - - ExplainOpenGroup("Query", NULL, true, es); - - ExplainPrintPlan(es, queryDesc); - - if (es->analyze) - { - ExplainPrintTriggers(es, queryDesc); - } - - ExecutorEnd(queryDesc); - FreeQueryDesc(queryDesc); - - ExplainCloseGroup("Query", NULL, true, es); ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es); ExplainCloseGroup("Subplan", NULL, true, es); @@ -1424,7 +1433,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) } /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, (es->buffers ? &bufusage : NULL), (es->memory ? &mem_counters : NULL), @@ -1432,7 +1441,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) #else /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, &executionDurationMillisec); #endif @@ -1813,7 +1822,14 @@ ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber) bool RequestedForExplainAnalyze(CitusScanState *node) { - return (node->customScanState.ss.ps.state->es_instrument != 0); + /* + * When running a distributed plan—either the root plan or a subplan’s + * distributed fragment—we need to know if we’re under EXPLAIN ANALYZE. + * Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use + * SubPlanExplainAnalyzeContext as a flag to indicate that context. + */ + return (node->customScanState.ss.ps.state->es_instrument != 0) || + (SubPlanLevel > 0 && SubPlanExplainAnalyzeContext); } @@ -2140,7 +2156,7 @@ ExplainOneQuery(Query *query, int cursorOptions, * destination. 
*/ static void -ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es, +ExplainWorkerPlan(PlannedStmt *plannedstmt, DistributedSubPlan *subPlan, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, #if PG_VERSION_NUM >= PG_VERSION_17 @@ -2154,6 +2170,8 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es double totaltime = 0; int eflags; int instrument_option = 0; + /* Never execute a sub-plan here; it has already been executed */ + bool executeQuery = (es->analyze && !subPlan); Assert(plannedstmt->commandType != CMD_UTILITY); @@ -2188,7 +2206,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es dest, params, queryEnv, instrument_option); /* Select execution options */ - if (es->analyze) + if (executeQuery) eflags = 0; /* default run-to-completion flags */ else eflags = EXEC_FLAG_EXPLAIN_ONLY; @@ -2197,7 +2215,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExecutorStart(queryDesc, eflags); /* Execute the plan for statistics if asked for */ - if (es->analyze) + if (executeQuery) { ScanDirection dir = ForwardScanDirection; @@ -2213,6 +2231,11 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExplainOpenGroup("Query", NULL, true, es); + if (subPlan) + { + ExtractAnalyzeStats(subPlan, queryDesc); + } + /* Create textual dump of plan tree */ ExplainPrintPlan(es, queryDesc); @@ -2292,7 +2315,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es PopActiveSnapshot(); /* We need a CCI just in case query expanded to multiple plans */ - if (es->analyze) + if (executeQuery) CommandCounterIncrement(); totaltime += elapsed_time(&starttime); @@ -2329,6 +2352,25 @@ elapsed_time(instr_time *starttime) } +/* + * ParseExplainAnalyzeOutput + * + * Parses the output of an EXPLAIN ANALYZE run to extract 
ANALYZE 
statistics + * and other instrumentation data. + * + * Parameters: + * explainOutput - a null-terminated string containing the raw EXPLAIN ANALYZE output + * instr - pointer to an Instrumentation struct to accumulate parsed metrics + * + * Behavior: + * - Validates inputs and returns immediately if either pointer is NULL. + * - Tokenizes the output on spaces, parentheses, and newlines. + * - Populates the instr-><..> fields. + * + * Notes: + * - Caller must free or manage the memory for explainOutput. + * - Parsing is case-sensitive and assumes standard EXPLAIN ANALYZE formatting. + */ static void ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) { @@ -2417,6 +2459,7 @@ ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) pfree(line); } + #if PG_VERSION_NUM >= PG_VERSION_17 /* * Return whether show_buffer_usage would have anything to print, if given diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 91ee0ebf3..8d980a711 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -3228,6 +3228,47 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) Filter: (a = 1) +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 18 bytes + Result destination: Write locally + -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 16 bytes + Tasks Shown: All + -> Task + Tuple 
data received from node: 16 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on test_subplans_570038 (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 8 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038" +DETAIL: Key (x)=(1) already exists. +CONTEXT: while executing command on localhost:xxxxx +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 -- check when auto explain + analyze is enabled, we do not allow local execution. 
CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain'; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 65ca6f5da..762f725e0 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -1166,6 +1166,26 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET EXPLAIN (COSTS false) EXECUTE q2('(1)'); EXPLAIN :default_analyze_flags EXECUTE q2('(1)'); +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; + +-- Only one row must exist +SELECT * FROM test_subplans; + +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; + +-- Only one row must exist +SELECT * FROM test_subplans; + -- check when auto explain + analyze is enabled, we do not allow local execution. CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain';