From eaf7ff89959708d839a79cdfa6cd22aabc957f4a Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Sun, 8 Jun 2025 13:43:25 -0700 Subject: [PATCH 1/3] Save the output of Subplans during ExecuteSubPlans(), and later use it in ExplainSubPlans() --- .../distributed/executor/adaptive_executor.c | 11 +- .../executor/insert_select_executor.c | 2 +- .../distributed/executor/merge_executor.c | 2 +- .../distributed/executor/subplan_execution.c | 31 ++- .../distributed/planner/multi_explain.c | 241 +++++++++++++++--- .../distributed/utils/citus_copyfuncs.c | 19 ++ .../distributed/utils/citus_outfuncs.c | 38 +++ src/include/distributed/distributed_planner.h | 1 + .../distributed/multi_physical_planner.h | 12 + src/include/distributed/subplan_execution.h | 2 +- src/test/regress/expected/stat_counters.out | 9 +- src/test/regress/sql/stat_counters.sql | 9 +- 12 files changed, 328 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 895f01ae7..9b31fbac1 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -630,6 +630,7 @@ typedef struct TaskPlacementExecution instr_time endTime; } TaskPlacementExecution; +extern MemoryContext SubPlanExplainAnalyzeContext; /* local functions */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, @@ -760,7 +761,7 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) */ LockPartitionsForDistributedPlan(distributedPlan); - ExecuteSubPlans(distributedPlan); + ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState)); scanState->finishedPreScan = true; } @@ -808,7 +809,13 @@ AdaptiveExecutor(CitusScanState *scanState) bool localExecutionSupported = true; - if (RequestedForExplainAnalyze(scanState)) + /* + * When running a distributed plan—either the root plan or a subplan’s + * distributed fragment—we need to know if we’re under EXPLAIN ANALYZE. + * Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use + * SubPlanExplainAnalyzeContext as a flag to indicate that context. + */ + if (RequestedForExplainAnalyze(scanState) || SubPlanExplainAnalyzeContext) { /* * We use multiple queries per task in EXPLAIN ANALYZE which need to diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 9ed1962fa..bf5886c44 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -121,7 +121,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node) bool binaryFormat = CanUseBinaryCopyFormatForTargetList(selectQuery->targetList); - ExecuteSubPlans(distSelectPlan); + ExecuteSubPlans(distSelectPlan, false); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index d0f01dcf2..8b7321d9b 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -132,7 +132,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) ereport(DEBUG1, (errmsg("Executing subplans of the source query and " "storing the results at the respective node(s)"))); - ExecuteSubPlans(distSourcePlan); + ExecuteSubPlans(distSourcePlan, false); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index ef2838343..10e0fe470 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -30,13 +30,21 @@ int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate resu /* when this is true, we enforce intermediate result size limit in all executors */ int SubPlanLevel = 0; +/* + * SubPlanExplainAnalyzeContext is both a memory context for storing + * subplans’ EXPLAIN ANALYZE output and a flag indicating that execution + * is running under EXPLAIN ANALYZE for subplans. + */ +MemoryContext SubPlanExplainAnalyzeContext = NULL; +SubPlanExplainOutput *SubPlanExplainAnalyzeOutput; +extern int NumTasksOutput; /* * ExecuteSubPlans executes a list of subplans from a distributed plan * by sequentially executing each plan from the top. */ void -ExecuteSubPlans(DistributedPlan *distributedPlan) +ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) { uint64 planId = distributedPlan->planId; List *subPlanList = distributedPlan->subPlanList; @@ -47,6 +55,15 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) return; } + /* + * If the root DistributedPlan has EXPLAIN ANALYZE enabled, + * its subplans should also have EXPLAIN ANALYZE enabled. + */ + if (explainAnalyzeEnabled) + { + SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan); + } + HTAB *intermediateResultsHash = MakeIntermediateResultHTAB(); RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); @@ -61,6 +78,13 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) DistributedSubPlan *subPlan = NULL; foreach_declared_ptr(subPlan, subPlanList) { + /* + * Save the EXPLAIN ANALYZE output(s) to be extracted later + * in ExplainSubPlans() + */ + MemSet(subPlan->totalExplainOutput, 0, sizeof(subPlan->totalExplainOutput)); + SubPlanExplainAnalyzeOutput = subPlan->totalExplainOutput; + PlannedStmt *plannedStmt = subPlan->plan; uint32 subPlanId = subPlan->subPlanId; ParamListInfo params = NULL; @@ -100,4 +124,9 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) SubPlanLevel--; FreeExecutorState(estate); } + + if (explainAnalyzeEnabled) + { + SubPlanExplainAnalyzeContext = NULL; + } } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 370e487b4..f97887de4 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -78,6 +78,7 @@ bool ExplainDistributedQueries = true; bool ExplainAllTasks = false; int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME; +extern MemoryContext SubPlanExplainAnalyzeContext; /* * If enabled, EXPLAIN ANALYZE output & other statistics of last worker task @@ -85,6 +86,8 @@ int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME; */ static char *SavedExplainPlan = NULL; static double SavedExecutionDurationMillisec = 0.0; +extern SubPlanExplainOutput *SubPlanExplainAnalyzeOutput; +int NumTasksOutput = 0; /* struct to save explain flags */ typedef struct @@ -251,6 +254,7 @@ static double elapsed_time(instr_time *starttime); static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es); static uint64 TaskReceivedTupleData(Task *task); static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es); +static void ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr); /* exports for SQL callable functions */ @@ -297,6 +301,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es if (distributedPlan->subPlanList != NIL) { ExplainSubPlans(distributedPlan, es); + NumTasksOutput = 0; } ExplainJob(scanState, distributedPlan->workerJob, es, params); @@ -453,25 +458,9 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) * for now we put an empty string, which is valid according to postgres. */ char *queryString = pstrdup(""); - instr_time planduration; BufferUsage bufusage_start, bufusage; -#if PG_VERSION_NUM >= PG_VERSION_17 - MemoryContextCounters mem_counters; - MemoryContext planner_ctx = NULL; - MemoryContext saved_ctx = NULL; - - if (es->memory) - { - /* copy paste from postgres code */ - planner_ctx = AllocSetContextCreate(CurrentMemoryContext, - "explain analyze planner context", - ALLOCSET_DEFAULT_SIZES); - saved_ctx = MemoryContextSwitchTo(planner_ctx); - } -#endif - if (es->buffers) { bufusage_start = pgBufferUsage; @@ -518,8 +507,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainPropertyText("Result destination", destination->data, es); } - INSTR_TIME_SET_ZERO(planduration); - /* calc differences of buffer counters. */ if (es->buffers) { @@ -529,21 +516,99 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es); -#if PG_VERSION_NUM >= PG_VERSION_17 - if (es->memory) + /* Print only and not execute */ + DestReceiver *dest; + if (into) { - MemoryContextSwitchTo(saved_ctx); - MemoryContextMemConsumed(planner_ctx, &mem_counters); + dest = CreateIntoRelDestReceiver(into); + } + else + { + dest = None_Receiver; } - ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration, - (es->buffers ? &bufusage : NULL), - (es->memory ? &mem_counters : NULL)); -#else - ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration, - (es->buffers ? &bufusage : NULL)); -#endif + int instrument_option = 0; + if (es->analyze && es->timing) + { + instrument_option |= INSTRUMENT_TIMER; + } + else if (es->analyze) + { + instrument_option |= INSTRUMENT_ROWS; + } + + if (es->buffers) + { + instrument_option |= INSTRUMENT_BUFFERS; + } + if (es->wal) + { + instrument_option |= INSTRUMENT_WAL; + } + + /* Create a QueryDesc for the query */ + QueryDesc *queryDesc = + CreateQueryDesc(plan, queryString, GetActiveSnapshot(), + InvalidSnapshot, dest, params, NULL, instrument_option); + + ExecutorStart(queryDesc, EXEC_FLAG_EXPLAIN_ONLY); + + /* Inject the earlier executed results into the newly created tasks */ + + if (NumTasksOutput && (queryDesc->planstate != NULL) && + IsA(queryDesc->planstate, CustomScanState)) + { + DistributedPlan *newdistributedPlan = + ((CitusScanState *) queryDesc->planstate)->distributedPlan; + + ListCell *lc; + int idx = 0; + + /* We need to extract this from the explain output of workers */ + Instrumentation instr = { 0 }; + foreach(lc, newdistributedPlan->workerJob->taskList) + { + if (subPlan->totalExplainOutput[idx].explainOutput && + idx < NumTasksOutput) + { + /* + * Now feed the earlier saved output, which will be used + * by RemoteExplain() when printing tasks + */ + Task *task = (Task *) lfirst(lc); + MemoryContext taskContext = GetMemoryChunkContext(task); + + task->totalReceivedTupleData = + subPlan->totalExplainOutput[idx].totalReceivedTupleData; + task->fetchedExplainAnalyzeExecutionDuration = + subPlan->totalExplainOutput[idx].executionDuration; + task->fetchedExplainAnalyzePlan = + MemoryContextStrdup(taskContext, + subPlan->totalExplainOutput[idx].explainOutput); + ParseExplainAnalyzeOutput(task->fetchedExplainAnalyzePlan, &instr); + + subPlan->totalExplainOutput[idx].explainOutput = NULL; + } + + idx++; + } + queryDesc->planstate->instrument = &instr; + } + + ExplainOpenGroup("Query", NULL, true, es); + + ExplainPrintPlan(es, queryDesc); + + if (es->analyze) + { + ExplainPrintTriggers(es, queryDesc); + } + + ExecutorEnd(queryDesc); + FreeQueryDesc(queryDesc); + + ExplainCloseGroup("Query", NULL, true, es); ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es); ExplainCloseGroup("Subplan", NULL, true, es); @@ -1621,6 +1686,11 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple, tupleLibpqSize); tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize; + if (SubPlanExplainAnalyzeContext && NumTasksOutput < MAX_ANALYZE_OUTPUT) + { + SubPlanExplainAnalyzeOutput[NumTasksOutput].totalReceivedTupleData = + tupleDestination->originalTask->totalReceivedTupleData; + } } else if (queryNumber == 1) { @@ -1670,6 +1740,17 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, placementIndex; tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration = fetchedExplainAnalyzeExecutionDuration; + + /* We should build tupleDestination in subPlan similar to the above */ + if (SubPlanExplainAnalyzeContext && NumTasksOutput < MAX_ANALYZE_OUTPUT) + { + SubPlanExplainAnalyzeOutput[NumTasksOutput].explainOutput = + MemoryContextStrdup(SubPlanExplainAnalyzeContext, + fetchedExplainAnalyzePlan); + SubPlanExplainAnalyzeOutput[NumTasksOutput].executionDuration = + fetchedExplainAnalyzeExecutionDuration; + NumTasksOutput++; + } } else { @@ -2248,6 +2329,108 @@ elapsed_time(instr_time *starttime) } +static void +ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) +{ + char *line = pstrdup(explainOutput); + char *token, + *saveptr; + bool in_wal = false; + + /* split on spaces, parentheses or newlines */ + for (token = strtok_r(line, " ()\n", &saveptr); + token != NULL; + token = strtok_r(NULL, " ()\n", &saveptr)) + { + if (strcmp(token, "WAL:") == 0) + { + in_wal = true; + continue; + } + + if (in_wal) + { + if (strncmp(token, "records=", 8) == 0) + instr->walusage.wal_records = + strtoul(token + 8, NULL, 10); + else if (strncmp(token, "bytes=", 6) == 0) + { + instr->walusage.wal_bytes = + strtoul(token + 6, NULL, 10); + /* once we’ve seen bytes=, we can leave WAL mode */ + in_wal = false; + } + continue; + } + + if (strncmp(token, "time=", 5) == 0) + { + /* token is "time=X..Y" */ + char *p = token + 5; + char *dd = strstr(p, ".."); + + if (dd) + { + *dd = '\0'; + instr->startup += strtod(p, NULL) / 1000.0; + instr->total += strtod(dd + 2, NULL) / 1000.0; + } + } + else if (strncmp(token, "rows=", 5) == 0) + { + instr->ntuples += strtol(token + 5, NULL, 10); + } + else if (strncmp(token, "loops=", 6) == 0) + { + instr->nloops = strtol(token + 6, NULL, 10); + } + } + + pfree(line); +} + +#if 0 +/* + * ParseExplainAnalyzeOutput + */ +static void +ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) +{ + double start_ms = 0.0, end_ms = 0.0; + int rows = 0, loops = 0; + + /* 1) Extract “actual time=XXX..YYY rows=R loops=L” */ + if (sscanf(explainOutput, "%*[^=]=%lf..%lf rows=%d loops=%d", + &start_ms, &end_ms, &rows, &loops) == 4) + { + /* times in ms, convert to seconds */ + instr->startup += start_ms / 1000.0; + instr->total += end_ms / 1000.0; + instr->ntuples += (double) rows; + instr->nloops = (double) loops; + } + else if (sscanf(explainOutput, "%*[^(\n](actual rows=%d loops=%d)", &rows, &loops) == 2) + { + /* no timing present, just capture rows & loops */ + instr->ntuples += (double) rows; + instr->nloops = (double) loops; + } + + /* 2) Look for “WAL: records=X bytes=Y” */ + const char *wal = strstr(explainOutput, "WAL:"); + if (wal) + { + int recs = 0, bytes = 0; + if (sscanf(wal, "WAL: records=%d bytes=%d", &recs, &bytes) == 2) + { + instr->walusage.wal_records += recs; + instr->walusage.wal_bytes += bytes; + } + } +} +#endif + + #if PG_VERSION_NUM >= PG_VERSION_17 /* * Return whether show_buffer_usage would have anything to print, if given diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 4b4a334c8..81775c6db 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -147,6 +147,25 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS) COPY_SCALAR_FIELD(subPlanId); COPY_NODE_FIELD(plan); + COPY_SCALAR_FIELD(bytesSentPerWorker); + COPY_SCALAR_FIELD(remoteWorkerCount); + COPY_SCALAR_FIELD(durationMillisecs); + COPY_SCALAR_FIELD(writeLocalFile); + + MemSet(newnode->totalExplainOutput, 0, sizeof(newnode->totalExplainOutput)); + + /* copy each SubPlanExplainOutput element */ + for (int i = 0; i < MAX_ANALYZE_OUTPUT; i++) + { + /* copy the explainOutput string pointer */ + COPY_STRING_FIELD(totalExplainOutput[i].explainOutput); + + /* copy the executionDuration (double) */ + COPY_SCALAR_FIELD(totalExplainOutput[i].executionDuration); + + /* copy the totalReceivedTupleData (uint64) */ + COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData); + } } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 751063789..451bc279e 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -215,6 +215,44 @@ OutDistributedSubPlan(OUTFUNC_ARGS) WRITE_UINT_FIELD(subPlanId); WRITE_NODE_FIELD(plan); + WRITE_UINT64_FIELD(bytesSentPerWorker); + WRITE_INT_FIELD(remoteWorkerCount); + WRITE_FLOAT_FIELD(durationMillisecs, "%.2f"); + WRITE_BOOL_FIELD(writeLocalFile); + + appendStringInfoString(str, " totalExplainOutput ["); + for (int i = 0; i < MAX_ANALYZE_OUTPUT; i++) + { + const SubPlanExplainOutput *e = &node->totalExplainOutput[i]; + + /* skip empty slots */ + if (e->explainOutput == NULL && + e->executionDuration == 0 + && e->totalReceivedTupleData == 0) + { + continue; + } + + if (i > 0) + { + appendStringInfoChar(str, ' '); + } + + appendStringInfoChar(str, '('); + + /* string pointer – prints quoted or NULL */ + WRITE_STRING_FIELD(totalExplainOutput[i].explainOutput); + + /* double field */ + WRITE_FLOAT_FIELD(totalExplainOutput[i].executionDuration, "%.2f"); + + /* 64-bit unsigned – use the uint64 macro */ + WRITE_UINT64_FIELD(totalExplainOutput[i].totalReceivedTupleData); + + appendStringInfoChar(str, ')'); + } + + appendStringInfoChar(str, ']'); } void diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 33a9c2fa8..a0505d821 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -28,6 +28,7 @@ #define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000 +#define MAX_ANALYZE_OUTPUT 32 /* level of planner calls */ extern int PlannerLevel; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 475a41b37..fbb13fb71 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -474,6 +474,17 @@ typedef struct DistributedPlan } DistributedPlan; +/* + * + */ +typedef struct SubPlanExplainOutput +{ + char *explainOutput; + double executionDuration; + uint64 totalReceivedTupleData; +} SubPlanExplainOutput; + + /* * DistributedSubPlan contains a subplan of a distributed plan. Subplans are * executed before the distributed query and their results are written to @@ -492,6 +503,7 @@ typedef struct DistributedSubPlan uint32 remoteWorkerCount; double durationMillisecs; bool writeLocalFile; + SubPlanExplainOutput totalExplainOutput[MAX_ANALYZE_OUTPUT]; } DistributedSubPlan; diff --git a/src/include/distributed/subplan_execution.h b/src/include/distributed/subplan_execution.h index d68db43ce..045e77bc6 100644 --- a/src/include/distributed/subplan_execution.h +++ b/src/include/distributed/subplan_execution.h @@ -17,7 +17,7 @@ extern int MaxIntermediateResult; extern int SubPlanLevel; -extern void ExecuteSubPlans(DistributedPlan *distributedPlan); +extern void ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled); /** * IntermediateResultsHashEntry is used to store which nodes need to receive diff --git a/src/test/regress/expected/stat_counters.out b/src/test/regress/expected/stat_counters.out index a27eb3241..25327d4f7 100644 --- a/src/test/regress/expected/stat_counters.out +++ b/src/test/regress/expected/stat_counters.out @@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$ 0, 0 ); -- same with explain analyze --- --- this time, query_execution_multi_shard is incremented twice because of #4212 CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q $$, - 1, 2 + 1, 1 ); CALL exec_query_and_check_query_counters($$ DELETE FROM dist_table WHERE a = 1 @@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X -- A similar one but without the insert, so we would normally expect 2 increments -- for query_execution_single_shard and 2 for query_execution_multi_shard instead -- of 3 since the insert is not there anymore. --- --- But this time we observe more counter increments because we execute the subplans --- twice because of #4212. CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) -- single-shard subplan (whole cte) @@ -1057,7 +1052,7 @@ CALL exec_query_and_check_query_counters($$ FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q) JOIN cte ON q.a = cte.a $$, - 3, 4 + 2, 2 ); -- safe to push-down CALL exec_query_and_check_query_counters($$ diff --git a/src/test/regress/sql/stat_counters.sql b/src/test/regress/sql/stat_counters.sql index 3376ba6c7..18f4b8aac 100644 --- a/src/test/regress/sql/stat_counters.sql +++ b/src/test/regress/sql/stat_counters.sql @@ -476,13 +476,11 @@ CALL exec_query_and_check_query_counters($$ ); -- same with explain analyze --- --- this time, query_execution_multi_shard is incremented twice because of #4212 CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q $$, - 1, 2 + 1, 1 ); CALL exec_query_and_check_query_counters($$ @@ -807,9 +805,6 @@ CALL exec_query_and_check_query_counters($$ -- A similar one but without the insert, so we would normally expect 2 increments -- for query_execution_single_shard and 2 for query_execution_multi_shard instead -- of 3 since the insert is not there anymore. --- --- But this time we observe more counter increments because we execute the subplans --- twice because of #4212. CALL exec_query_and_check_query_counters($$ EXPLAIN (ANALYZE) -- single-shard subplan (whole cte) @@ -823,7 +818,7 @@ CALL exec_query_and_check_query_counters($$ FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q) JOIN cte ON q.a = cte.a $$, - 3, 4 + 2, 2 ); -- safe to push-down From 7665ef7b834f9189140c660492ce840ecd6991e7 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Thu, 12 Jun 2025 11:51:26 -0700 Subject: [PATCH 2/3] Remove scanf --- .../distributed/executor/subplan_execution.c | 2 + .../distributed/planner/multi_explain.c | 170 ++++++++---------- .../distributed/utils/citus_copyfuncs.c | 2 + .../distributed/utils/citus_outfuncs.c | 2 + .../distributed/multi_physical_planner.h | 1 + src/test/regress/expected/multi_explain.out | 6 +- 6 files changed, 88 insertions(+), 95 deletions(-) diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index 10e0fe470..033a21b2b 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -122,6 +122,8 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) subPlan->writeLocalFile = entry->writeLocalFile; SubPlanLevel--; + subPlan->numTasksOutput = NumTasksOutput; + NumTasksOutput = 0; FreeExecutorState(estate); } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index f97887de4..93d1cbba7 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -301,7 +301,6 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es if (distributedPlan->subPlanList != NIL) { ExplainSubPlans(distributedPlan, es); - NumTasksOutput = 0; } ExplainJob(scanState, distributedPlan->workerJob, es, params); @@ -556,7 +555,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) /* Inject the earlier executed results into the newly created tasks */ - if (NumTasksOutput && (queryDesc->planstate != NULL) && + if (subPlan->numTasksOutput && (queryDesc->planstate != NULL) && IsA(queryDesc->planstate, CustomScanState)) { DistributedPlan *newdistributedPlan = @@ -570,7 +569,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) foreach(lc, newdistributedPlan->workerJob->taskList) { if (subPlan->totalExplainOutput[idx].explainOutput && - idx < NumTasksOutput) + idx < subPlan->numTasksOutput) { /* * Now feed the earlier saved output, which will be used @@ -593,6 +592,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) idx++; } + instr.nloops = 1; queryDesc->planstate->instrument = &instr; } @@ -2332,104 +2332,90 @@ elapsed_time(instr_time *starttime) static void ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) { - char *line = pstrdup(explainOutput); - char *token, - *saveptr; - bool in_wal = false; + char *token, *saveptr; - /* split on spaces, parentheses or newlines */ - for (token = strtok_r(line, " ()\n", &saveptr); - token != NULL; - token = strtok_r(NULL, " ()\n", &saveptr)) - { - if (strcmp(token, "WAL:") == 0) - { - in_wal = true; - continue; - } + /* Validate input */ + if (explainOutput == NULL || instr == NULL) + return; - if (in_wal) - { - if (strncmp(token, "records=", 8) == 0) - instr->walusage.wal_records = - strtoul(token + 8, NULL, 10); - else if (strncmp(token, "bytes=", 6) == 0) - { - instr->walusage.wal_bytes = - strtoul(token + 6, NULL, 10); - /* once we’ve seen bytes=, we can leave WAL mode */ - in_wal = false; - } - continue; - } + char *line = pstrdup(explainOutput); - if (strncmp(token, "time=", 5) == 0) - { - /* token is "time=X..Y" */ - char *p = token + 5; - char *dd = strstr(p, ".."); + bool inWal = false; + bool inResult = false; - if (dd) - { - *dd = '\0'; - instr->startup += strtod(p, NULL) / 1000.0; - instr->total += strtod(dd + 2, NULL) / 1000.0; - } - } - else if (strncmp(token, "rows=", 5) == 0) - { - instr->ntuples += strtol(token + 5, NULL, 10); - } - else if (strncmp(token, "loops=", 6) == 0) - { - instr->nloops = strtol(token + 6, NULL, 10); - } - } - - pfree(line); -} - -#if 0 -/* - * ParseExplainAnalyzeOutput - */ -static void -ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) -{ - double start_ms = 0.0, end_ms = 0.0; - int rows = 0, loops = 0; - - /* 1) Extract “actual time=XXX..YYY rows=R loops=L” */ - if (sscanf(explainOutput, "%*[^=]=%lf..%lf rows=%d loops=%d", - &start_ms, &end_ms, &rows, &loops) == 4) + /* split on spaces, parentheses or newlines */ + for (token = strtok_r(line, " ()\n", &saveptr); + token != NULL; + token = strtok_r(NULL, " ()\n", &saveptr)) { - /* times in ms, convert to seconds */ - instr->startup += start_ms / 1000.0; - instr->total += end_ms / 1000.0; - instr->ntuples += (double) rows; - instr->nloops = (double) loops; - } - else if (sscanf(explainOutput, "%*[^(\n](actual rows=%d loops=%d)", &rows, &loops) == 2) - { - /* no timing present, just capture rows & loops */ - instr->ntuples += (double) rows; - instr->nloops = (double) loops; - } - - /* 2) Look for “WAL: records=X bytes=Y” */ - const char *wal = strstr(explainOutput, "WAL:"); - if (wal) - { - int recs = 0, bytes = 0; - if (sscanf(wal, "WAL: records=%d bytes=%d", &recs, &bytes) == 2) + if (strcmp(token, "WAL:") == 0) { - instr->walusage.wal_records += recs; - instr->walusage.wal_bytes += bytes; + inWal = true; + continue; + } + + if (strcmp(token, "Result") == 0) + { + inResult = true; + continue; + } + + /* Reset Result flag when we see "actual" - but only skip if we're in Result mode */ + if (strcmp(token, "actual") == 0) + { + /* If we were in Result mode, the next tokens should be skipped */ + /* If we weren't in Result mode, continue normally */ + continue; + } + + if (inWal) + { + if (strncmp(token, "records=", 8) == 0) + instr->walusage.wal_records += strtoul(token + 8, NULL, 10); + else if (strncmp(token, "bytes=", 6) == 0) + { + instr->walusage.wal_bytes += strtoul(token + 6, NULL, 10); + /* once we've seen bytes=, we can leave WAL mode */ + inWal = false; + } + continue; + } + + /* Skip Result node's actual timing data */ + if (inResult) + { + if (strncmp(token, "time=", 5) == 0 || + strncmp(token, "rows=", 5) == 0 || + strncmp(token, "loops=", 6) == 0) + { + /* If this is loops=, we've seen all Result data */ + if (strncmp(token, "loops=", 6) == 0) + inResult = false; + continue; + } + } + + if (strncmp(token, "time=", 5) == 0) + { + /* token is "time=X..Y" */ + char *timeStr = token + 5; + char *doubleDot = strstr(timeStr, ".."); + if (doubleDot) + { + *doubleDot = '\0'; + instr->startup += strtod(timeStr, NULL) / 1000.0; + instr->total += strtod(doubleDot + 2, NULL) / 1000.0; + } + } + else if (strncmp(token, "rows=", 5) == 0) + { + instr->ntuples += strtol(token + 5, NULL, 10); + break; /* We are done for this Task */ } } -} -#endif + pfree(line); +} #if PG_VERSION_NUM >= PG_VERSION_17 /* diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 81775c6db..d0b97de53 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -166,6 +166,8 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS) /* copy the totalReceivedTupleData (uint64) */ COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData); } + + COPY_SCALAR_FIELD(numTasksOutput); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 451bc279e..aac0c60e6 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -252,6 +252,8 @@ OutDistributedSubPlan(OUTFUNC_ARGS) appendStringInfoChar(str, ')'); } + WRITE_INT_FIELD(numTasksOutput); + appendStringInfoChar(str, ']'); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index fbb13fb71..d6b0cc36c 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -504,6 +504,7 @@ typedef struct DistributedSubPlan double durationMillisecs; bool writeLocalFile; SubPlanExplainOutput totalExplainOutput[MAX_ANALYZE_OUTPUT]; + uint32 numTasksOutput; } DistributedSubPlan; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index bfcf29c4d..91ee0ebf3 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -2492,12 +2492,12 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Distributed Subplan XXX_1 Intermediate Data Size: 100 bytes Result destination: Write locally - -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 160 bytes + Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 64 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) From e0a3d2266257aa60d0bb6e2c32bef78475a4048b Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Fri, 11 Jul 2025 09:16:45 -0700 Subject: [PATCH 3/3] Implement comments --- .../distributed/executor/adaptive_executor.c | 8 +- .../executor/insert_select_executor.c | 3 +- .../distributed/executor/merge_executor.c | 3 +- .../distributed/executor/subplan_execution.c | 21 +- .../distributed/planner/multi_explain.c | 254 +++++++++++------- src/test/regress/expected/multi_explain.out | 41 +++ src/test/regress/sql/multi_explain.sql | 20 ++ 7 files changed, 242 insertions(+), 108 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 9b31fbac1..4854676e1 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -809,13 +809,7 @@ AdaptiveExecutor(CitusScanState *scanState) bool localExecutionSupported = true; - /* - * When running a distributed plan—either the root plan or a subplan’s - * distributed fragment—we need to know if we’re under EXPLAIN ANALYZE. - * Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use - * SubPlanExplainAnalyzeContext as a flag to indicate that context. - */ - if (RequestedForExplainAnalyze(scanState) || SubPlanExplainAnalyzeContext) + if (RequestedForExplainAnalyze(scanState)) { /* * We use multiple queries per task in EXPLAIN ANALYZE which need to diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index bf5886c44..58c172c66 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -42,6 +42,7 @@ #include "distributed/merge_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" @@ -121,7 +122,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node) bool binaryFormat = CanUseBinaryCopyFormatForTargetList(selectQuery->targetList); - ExecuteSubPlans(distSelectPlan, false); + ExecuteSubPlans(distSelectPlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index 8b7321d9b..56bde62bc 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -23,6 +23,7 @@ #include "distributed/merge_executor.h" #include "distributed/merge_planner.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_router_planner.h" #include "distributed/repartition_executor.h" @@ -132,7 +133,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) ereport(DEBUG1, (errmsg("Executing subplans of the source query and " "storing the results at the respective node(s)"))); - ExecuteSubPlans(distSourcePlan, false); + ExecuteSubPlans(distSourcePlan, RequestedForExplainAnalyze(scanState)); /* * We have a separate directory for each transaction, so choosing diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index 033a21b2b..cee66eb2b 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -63,6 +63,10 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) { SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan); } + else + { + SubPlanExplainAnalyzeContext = NULL; + } HTAB *intermediateResultsHash = MakeIntermediateResultHTAB(); RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); @@ -103,7 +107,17 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) TimestampTz startTimestamp = GetCurrentTimestamp(); - ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + PG_TRY(); + { + ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); + } + PG_CATCH(); + { + SubPlanExplainAnalyzeContext = NULL; + PG_RE_THROW(); + } + PG_END_TRY(); + /* * EXPLAIN ANALYZE instrumentations. Calculating these are very light-weight, @@ -127,8 +141,5 @@ ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled) FreeExecutorState(estate); } - if (explainAnalyzeEnabled) - { - SubPlanExplainAnalyzeContext = NULL; - } + SubPlanExplainAnalyzeContext = NULL; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 93d1cbba7..aa092cdea 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -26,6 +26,7 @@ #include "commands/tablecmds.h" #include "executor/tstoreReceiver.h" #include "lib/stringinfo.h" +#include "nodes/nodeFuncs.h" #include "nodes/plannodes.h" #include "nodes/primnodes.h" #include "nodes/print.h" @@ -68,6 +69,7 @@ #include "distributed/placement_connection.h" #include "distributed/recursive_planning.h" #include "distributed/remote_commands.h" +#include "distributed/subplan_execution.h" #include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" #include "distributed/version_compat.h" @@ -164,6 +166,7 @@ static void ExplainIndentText(ExplainState *es); static void ExplainPrintSerialize(ExplainState *es, SerializeMetrics *metrics); static SerializeMetrics GetSerializationMetrics(DestReceiver *dest); +static void ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState); /* * DestReceiver functions for SERIALIZE option @@ -213,7 +216,8 @@ static const char * ExplainFormatStr(ExplainFormat format); #if PG_VERSION_NUM >= PG_VERSION_17 static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption); #endif -static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, +static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DistributedSubPlan *subPlan, + DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, @@ -431,6 +435,63 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors, } +/* + * ExtractAnalyzeStats parses the EXPLAIN ANALYZE output of the pre-executed + * subplans and injects the parsed statistics into queryDesc->planstate->instrument. + */ +static void +ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState) +{ + if (!planState || !IsA(planState, CustomScanState)) + { + return; + } + + /* Inject the earlier executed results into the newly created tasks */ + if (subPlan->numTasksOutput) + { + DistributedPlan *newdistributedPlan = + ((CitusScanState *) planState)->distributedPlan; + + ListCell *lc; + int idx = 0; + + /* We need to extract this from the explain output of workers */ + Instrumentation *instr = planState->instrument; + memset(instr, 0, sizeof(Instrumentation)); + + foreach(lc, newdistributedPlan->workerJob->taskList) + { + if (subPlan->totalExplainOutput[idx].explainOutput && + idx < subPlan->numTasksOutput) + { + /* + * Now feed the earlier saved output, which will be used + * by RemoteExplain() when printing tasks + */ + Task *task = (Task *) lfirst(lc); + MemoryContext taskContext = GetMemoryChunkContext(task); + + task->totalReceivedTupleData = + subPlan->totalExplainOutput[idx].totalReceivedTupleData; + task->fetchedExplainAnalyzeExecutionDuration = + subPlan->totalExplainOutput[idx].executionDuration; + task->fetchedExplainAnalyzePlan = + MemoryContextStrdup(taskContext, + subPlan->totalExplainOutput[idx].explainOutput); + ParseExplainAnalyzeOutput(task->fetchedExplainAnalyzePlan, instr); + + subPlan->totalExplainOutput[idx].explainOutput = NULL; + } + + idx++; + } + + planState->instrument->nloops = 1; /* TODO */ + } +} + + /* * ExplainSubPlans generates EXPLAIN output for subplans for CTEs * and complex subqueries. Because the planning for these queries @@ -449,7 +510,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) { DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); PlannedStmt *plan = subPlan->plan; - IntoClause *into = NULL; ParamListInfo params = NULL; /* @@ -457,9 +517,25 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) * for now we put an empty string, which is valid according to postgres. */ char *queryString = pstrdup(""); + instr_time planDuration; BufferUsage bufusage_start, bufusage; +#if PG_VERSION_NUM >= PG_VERSION_17 + MemoryContextCounters mem_counters; + MemoryContext planner_ctx = NULL; + MemoryContext saved_ctx = NULL; + + if (es->memory) + { + /* copy paste from postgres code */ + planner_ctx = AllocSetContextCreate(CurrentMemoryContext, + "explain analyze planner context", + ALLOCSET_DEFAULT_SIZES); + saved_ctx = MemoryContextSwitchTo(planner_ctx); + } +#endif + if (es->buffers) { bufusage_start = pgBufferUsage; @@ -506,6 +582,8 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainPropertyText("Result destination", destination->data, es); } + INSTR_TIME_SET_ZERO(planDuration); + /* calc differences of buffer counters. */ if (es->buffers) { @@ -515,100 +593,37 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es); - /* Print only and not execute */ - DestReceiver *dest; - if (into) + /* TODO: Do we really need dest and bound params when not executing? */ + DestReceiver *dest = None_Receiver; + double executionDurationMillisec = 0.0; + +#if PG_VERSION_NUM >= PG_VERSION_17 + if (es->memory) { - dest = CreateIntoRelDestReceiver(into); - } - else - { - dest = None_Receiver; - } - - int instrument_option = 0; - - if (es->analyze && es->timing) - { - instrument_option |= INSTRUMENT_TIMER; - } - else if (es->analyze) - { - instrument_option |= INSTRUMENT_ROWS; + MemoryContextSwitchTo(saved_ctx); + MemoryContextMemConsumed(planner_ctx, &mem_counters); } + /* calc differences of buffer counters. */ if (es->buffers) { - instrument_option |= INSTRUMENT_BUFFERS; - } - if (es->wal) - { - instrument_option |= INSTRUMENT_WAL; + memset(&bufusage, 0, sizeof(BufferUsage)); + BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); } - /* Create a QueryDesc for the query */ - QueryDesc *queryDesc = - CreateQueryDesc(plan, queryString, GetActiveSnapshot(), - InvalidSnapshot, dest, params, NULL, instrument_option); + /* do the actual EXPLAIN ANALYZE */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planDuration, + (es->buffers ? &bufusage : NULL), + (es->memory ? &mem_counters : NULL), + &executionDurationMillisec); +#else - ExecutorStart(queryDesc, EXEC_FLAG_EXPLAIN_ONLY); + /* do the actual EXPLAIN ANALYZE */ + ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL, + &planDuration, &executionDurationMillisec); +#endif - /* Inject the earlier executed results into the newly created tasks */ - - if (subPlan->numTasksOutput && (queryDesc->planstate != NULL) && - IsA(queryDesc->planstate, CustomScanState)) - { - DistributedPlan *newdistributedPlan = - ((CitusScanState *) queryDesc->planstate)->distributedPlan; - - ListCell *lc; - int idx = 0; - - /* We need to extract this from the explain output of workers */ - Instrumentation instr = { 0 }; - foreach(lc, newdistributedPlan->workerJob->taskList) - { - if (subPlan->totalExplainOutput[idx].explainOutput && - idx < subPlan->numTasksOutput) - { - /* - * Now feed the earlier saved output, which will be used - * by RemoteExplain() when printing tasks - */ - Task *task = (Task *) lfirst(lc); - MemoryContext taskContext = GetMemoryChunkContext(task); - - task->totalReceivedTupleData = - subPlan->totalExplainOutput[idx].totalReceivedTupleData; - task->fetchedExplainAnalyzeExecutionDuration = - subPlan->totalExplainOutput[idx].executionDuration; - task->fetchedExplainAnalyzePlan = - MemoryContextStrdup(taskContext, - subPlan->totalExplainOutput[idx].explainOutput); - ParseExplainAnalyzeOutput(task->fetchedExplainAnalyzePlan, &instr); - - subPlan->totalExplainOutput[idx].explainOutput = NULL; - } - - idx++; - } - instr.nloops = 1; - queryDesc->planstate->instrument = &instr; - } - - ExplainOpenGroup("Query", NULL, true, es); - - ExplainPrintPlan(es, queryDesc); - - if (es->analyze) - { - ExplainPrintTriggers(es, queryDesc); - } - - ExecutorEnd(queryDesc); - FreeQueryDesc(queryDesc); - - ExplainCloseGroup("Query", NULL, true, es); ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es); ExplainCloseGroup("Subplan", NULL, true, es); @@ -1424,7 +1439,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) } /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, (es->buffers ? &bufusage : NULL), (es->memory ? &mem_counters : NULL), @@ -1432,7 +1447,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) #else /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, + ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, &executionDurationMillisec); #endif @@ -1813,7 +1828,14 @@ ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber) bool RequestedForExplainAnalyze(CitusScanState *node) { - return (node->customScanState.ss.ps.state->es_instrument != 0); + /* + * When running a distributed plan—either the root plan or a subplan’s + * distributed fragment—we need to know if we’re under EXPLAIN ANALYZE. + * Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use + * SubPlanExplainAnalyzeContext as a flag to indicate that context. + */ + return (node->customScanState.ss.ps.state->es_instrument != 0) || + (SubPlanLevel > 0 && SubPlanExplainAnalyzeContext); } @@ -2126,6 +2148,19 @@ ExplainOneQuery(Query *query, int cursorOptions, } +static bool +extract_analyze_walker(PlanState *ps, void *ctx) +{ + DistributedSubPlan *subplan = (DistributedSubPlan *) ctx; + + /* call your extractor on every node */ + ExtractAnalyzeStats(subplan, ps); + + /* return false to keep recursing into children */ + return false; +} + + /* * ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes * the given plannedStmt and sends the results to dest. It puts total time to execute in @@ -2140,7 +2175,7 @@ ExplainOneQuery(Query *query, int cursorOptions, * destination. */ static void -ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es, +ExplainWorkerPlan(PlannedStmt *plannedstmt, DistributedSubPlan *subPlan, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, #if PG_VERSION_NUM >= PG_VERSION_17 @@ -2154,6 +2189,8 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es double totaltime = 0; int eflags; int instrument_option = 0; + /* Never executed a sub-plan, it's already done */ + bool executeQuery = (es->analyze && !subPlan); Assert(plannedstmt->commandType != CMD_UTILITY); @@ -2188,7 +2225,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es dest, params, queryEnv, instrument_option); /* Select execution options */ - if (es->analyze) + if (executeQuery) eflags = 0; /* default run-to-completion flags */ else eflags = EXEC_FLAG_EXPLAIN_ONLY; @@ -2197,7 +2234,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExecutorStart(queryDesc, eflags); /* Execute the plan for statistics if asked for */ - if (es->analyze) + if (executeQuery) { ScanDirection dir = ForwardScanDirection; @@ -2213,6 +2250,14 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es ExplainOpenGroup("Query", NULL, true, es); + if (subPlan) + { + ExtractAnalyzeStats(subPlan, queryDesc->planstate); + /* …then, once you’ve got your top‐level PlanState (e.g. planstate of your subplan): */ + planstate_tree_walker(queryDesc->planstate, extract_analyze_walker, (void *) subPlan); + + } + /* Create textual dump of plan tree */ ExplainPrintPlan(es, queryDesc); @@ -2292,7 +2337,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es PopActiveSnapshot(); /* We need a CCI just in case query expanded to multiple plans */ - if (es->analyze) + if (executeQuery) CommandCounterIncrement(); totaltime += elapsed_time(&starttime); @@ -2329,9 +2374,29 @@ elapsed_time(instr_time *starttime) } +/* + * ParseExplainAnalyzeOutput + * + * Parses the output of an EXPLAIN ANALYZE run to extract ANALYZE statistics + * and other instrumentation data. + * + * Parameters: + * explainOutput - a null-terminated string containing the raw EXPLAIN ANALYZE output + * instr - pointer to an Instrumentation struct to accumulate parsed metrics + * + * Behavior: + * - Validates inputs and returns immediately if either pointer is NULL. + * - Tokenizes the output on spaces, parentheses, and newlines. + * - Populates the instr-><..> fields. + * + * Notes: + * - Caller must free or manage the memory for explainOutput. + * - Parsing is case-sensitive and assumes standard EXPLAIN ANALYZE formatting. + */ static void ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) { + /* elog(NOTICE, "Parsing :%s ", explainOutput); */ char *token, *saveptr; /* Validate input */ @@ -2417,6 +2482,7 @@ ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr) pfree(line); } + #if PG_VERSION_NUM >= PG_VERSION_17 /* * Return whether show_buffer_usage would have anything to print, if given diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 91ee0ebf3..8d980a711 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -3228,6 +3228,47 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) -> Update on tbl_570036 tbl (actual rows=0 loops=1) -> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1) Filter: (a = 1) +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 18 bytes + Result destination: Write locally + -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 16 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 16 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Insert on test_subplans_570038 (actual rows=1 loops=1) + -> Result (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 8 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; +ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038" +DETAIL: Key (x)=(1) already exists. +CONTEXT: while executing command on localhost:xxxxx +-- Only one row must exist +SELECT * FROM test_subplans; +1|2 -- check when auto explain + analyze is enabled, we do not allow local execution. CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain'; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 65ca6f5da..762f725e0 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -1166,6 +1166,26 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET EXPLAIN (COSTS false) EXECUTE q2('(1)'); EXPLAIN :default_analyze_flags EXECUTE q2('(1)'); +-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212) +SET search_path TO multi_explain; +CREATE TABLE test_subplans (x int primary key, y int); +SELECT create_distributed_table('test_subplans','x'); + +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; + +-- Only one row must exist +SELECT * FROM test_subplans; + +-- Will fail with duplicate pk +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) +WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *) +SELECT * FROM a; + +-- Only one row must exist +SELECT * FROM test_subplans; + -- check when auto explain + analyze is enabled, we do not allow local execution. CREATE SCHEMA test_auto_explain; SET search_path TO 'test_auto_explain';