EXPLAIN ANALYZE - Prevent execution of the plan during the plan-print (#8017)

DESCRIPTION: Fixed a bug in EXPLAIN ANALYZE to prevent unintended (duplicate) execution of the (sub)plans during the explain phase.

Fixes #4212 

### 🐞 Bug #4212: Redundant (Sub)Plan Execution in the `EXPLAIN ANALYZE` Code Path

#### 🔍 Background
In the standard PostgreSQL execution path, `ExplainOnePlan()` is
responsible for two distinct operations depending on whether `EXPLAIN
ANALYZE` is requested:

1. **Execute the plan**

   ```c
   if (es->analyze)
       ExecutorRun(queryDesc, direction, 0L, true);
   ```

2. **Print the plan tree** 

   ```c
   ExplainPrintPlan(es, queryDesc);
   ```

When printing the plan, the executor should **not run the plan again**.
Execution is expected to happen exactly once, at the top level, and only
when `es->analyze = true`.

---

#### ⚠️ Issue in Citus

In Citus, `CustomScanMethods.ExplainCustomScan` is set to `CitusExplainScan`,
the custom scan explain callback that prints the EXPLAIN output for a Citus
plan. This callback incorrectly performs **redundant execution** inside the
explain path of `ExplainPrintPlan()`:

```c
ExplainOnePlan()
  ExplainPrintPlan()
    ExplainNode()
      CitusExplainScan()
        if (distributedPlan->subPlanList != NIL)
        {
            ExplainSubPlans(distributedPlan, es);
            {
                // inside ExplainSubPlans(), for each subplan:
                PlannedStmt *plan = subPlan->plan;
                ExplainOnePlan(plan, ...);  // ⚠️ re-executes the subplan if es->analyze is true
            }
        }
```
This causes the subplans to be **executed again**, even though they have
already been executed as part of the top-level plan execution. It violates
PostgreSQL's expectation that `EXPLAIN ANALYZE` **executes each node exactly
once** for analysis.

---
#### Fix (proposed)
Save the EXPLAIN ANALYZE output of the subplans while they are executed in
`ExecuteSubPlans()`, and later reuse it in `ExplainSubPlans()` instead of
executing the subplans a second time.
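
At a high level the change has two halves, sketched below using symbols this PR introduces (`RequestedForExplainAnalyze`, `SubPlanExplainOutput`, `ExplainWorkerPlan`, `ExtractAnalyzeStats`). This is simplified pseudocode, not the verbatim implementation; the actual patch in the diff below also handles per-task capacity growth, memory-context bookkeeping, and error cleanup:

```c
/* Execution phase: ExecuteSubPlans() now learns whether the root plan runs
 * under EXPLAIN ANALYZE and, if so, keeps each subplan's per-task output. */
ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState));

/* ...inside ExecuteSubPlans(), after the subplan has been run exactly once: */
subPlan->ntuples = nprocessed;                      /* rows the subplan produced */
subPlan->totalExplainOutput = SubPlanExplainOutput; /* per-task EXPLAIN ANALYZE text */
subPlan->numTasksOutput = NumTasksOutput;

/* Explain phase: ExplainSubPlans() hands the already-executed subplan to
 * ExplainWorkerPlan(), which only prints the plan (no ExecutorRun) and injects
 * the saved per-task statistics into the instrumentation via ExtractAnalyzeStats(). */
ExplainWorkerPlan(plan, subPlan, None_Receiver /* no query execution */,
                  es, queryString, params, NULL, &planduration,
                  &executionDurationMillisec, &executionTuples, &executionLoops);
```

To make this possible, the worker-side UDF `worker_last_saved_explain_analyze()` is extended to also return `execution_ntuples` and `execution_nloops`, so the coordinator can report accurate row counts for the pre-executed subplans.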
Teja Mupparti 2025-07-30 11:29:50 -07:00 committed by GitHub
parent f31bcb4219
commit 889aa92ac0
22 changed files with 775 additions and 81 deletions


@@ -760,7 +760,7 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
     */
    LockPartitionsForDistributedPlan(distributedPlan);
-   ExecuteSubPlans(distributedPlan);
+   ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState));
    scanState->finishedPreScan = true;
}


@@ -42,6 +42,7 @@
#include "distributed/merge_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
+#include "distributed/multi_explain.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
@@ -121,7 +122,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
    bool binaryFormat =
        CanUseBinaryCopyFormatForTargetList(selectQuery->targetList);
-   ExecuteSubPlans(distSelectPlan);
+   ExecuteSubPlans(distSelectPlan, RequestedForExplainAnalyze(scanState));
    /*
     * We have a separate directory for each transaction, so choosing


@@ -23,6 +23,7 @@
#include "distributed/merge_executor.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_executor.h"
+#include "distributed/multi_explain.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/repartition_executor.h"
@@ -132,7 +133,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
    ereport(DEBUG1, (errmsg("Executing subplans of the source query and "
                            "storing the results at the respective node(s)")));
-   ExecuteSubPlans(distSourcePlan);
+   ExecuteSubPlans(distSourcePlan, RequestedForExplainAnalyze(scanState));
    /*
     * We have a separate directory for each transaction, so choosing


@@ -688,7 +688,7 @@ ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *d
 * ExecutePlanIntoDestReceiver executes a query plan and sends results to the given
 * DestReceiver.
 */
-void
+uint64
ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
                            DestReceiver *dest)
{
@@ -713,6 +713,8 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
    PortalStart(portal, params, eflags, GetActiveSnapshot());
+   QueryCompletion qc = { 0 };
#if PG_VERSION_NUM >= PG_VERSION_18
    /* PG 18+: six-arg signature (drop the run_once bool) */
@@ -721,7 +723,7 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
             false, /* isTopLevel */
             dest,  /* DestReceiver *dest */
             dest,  /* DestReceiver *altdest */
-            NULL); /* QueryCompletion *qc */
+            &qc);  /* QueryCompletion *qc */
#else
    /* PG 17-: original seven-arg signature */
@@ -731,10 +733,12 @@ ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
             true,  /* run_once */
             dest,  /* DestReceiver *dest */
             dest,  /* DestReceiver *altdest */
-            NULL); /* QueryCompletion *qc */
+            &qc);  /* QueryCompletion *qc */
#endif
    PortalDrop(portal, false);
+   return qc.nprocessed;
}


@@ -30,13 +30,22 @@ int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate resu
/* when this is true, we enforce intermediate result size limit in all executors */
int SubPlanLevel = 0;

+/*
+ * SubPlanExplainAnalyzeContext is both a memory context for storing
+ * subplans' EXPLAIN ANALYZE output and a flag indicating that execution
+ * is running under EXPLAIN ANALYZE for subplans.
+ */
+MemoryContext SubPlanExplainAnalyzeContext = NULL;
+SubPlanExplainOutputData *SubPlanExplainOutput;
+extern uint8 TotalExplainOutputCapacity;
+extern uint8 NumTasksOutput;

/*
 * ExecuteSubPlans executes a list of subplans from a distributed plan
 * by sequentially executing each plan from the top.
 */
void
-ExecuteSubPlans(DistributedPlan *distributedPlan)
+ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled)
{
    uint64 planId = distributedPlan->planId;
    List *subPlanList = distributedPlan->subPlanList;
@@ -47,6 +56,19 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
        return;
    }

+   /*
+    * If the root DistributedPlan has EXPLAIN ANALYZE enabled,
+    * its subplans should also have EXPLAIN ANALYZE enabled.
+    */
+   if (explainAnalyzeEnabled)
+   {
+       SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan);
+   }
+   else
+   {
+       SubPlanExplainAnalyzeContext = NULL;
+   }

    HTAB *intermediateResultsHash = MakeIntermediateResultHTAB();
    RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan);
@@ -79,7 +101,23 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
        TimestampTz startTimestamp = GetCurrentTimestamp();

-       ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
+       uint64 nprocessed;
+       PG_TRY();
+       {
+           nprocessed =
+               ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
+       }
+       PG_CATCH();
+       {
+           SubPlanExplainAnalyzeContext = NULL;
+           SubPlanExplainOutput = NULL;
+           TotalExplainOutputCapacity = 0;
+           NumTasksOutput = 0;
+           PG_RE_THROW();
+       }
+       PG_END_TRY();

        /*
         * EXPLAIN ANALYZE instrumentations. Calculating these are very light-weight,
@@ -94,10 +132,24 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
        subPlan->durationMillisecs += durationMicrosecs * MICRO_TO_MILLI_SECOND;
        subPlan->bytesSentPerWorker = RemoteFileDestReceiverBytesSent(copyDest);
+       subPlan->ntuples = nprocessed;
        subPlan->remoteWorkerCount = list_length(remoteWorkerNodeList);
        subPlan->writeLocalFile = entry->writeLocalFile;

        SubPlanLevel--;

+       /*
+        * Save the EXPLAIN ANALYZE output(s) for later extraction in ExplainSubPlans().
+        * Because the SubPlan context isn't available during distributed execution,
+        * pass the pointer as a global variable in SubPlanExplainOutput.
+        */
+       subPlan->totalExplainOutput = SubPlanExplainOutput;
+       subPlan->numTasksOutput = NumTasksOutput;
+       SubPlanExplainOutput = NULL;
+       TotalExplainOutputCapacity = 0;
+       NumTasksOutput = 0;

        FreeExecutorState(estate);
    }
+
+   SubPlanExplainAnalyzeContext = NULL;
}


@@ -26,6 +26,7 @@
#include "commands/tablecmds.h"
#include "executor/tstoreReceiver.h"
#include "lib/stringinfo.h"
+#include "nodes/nodeFuncs.h"
#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "nodes/print.h"
@@ -73,6 +74,7 @@
#include "distributed/placement_connection.h"
#include "distributed/recursive_planning.h"
#include "distributed/remote_commands.h"
+#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
#include "distributed/tuplestore.h"
#include "distributed/version_compat.h"
@@ -83,6 +85,7 @@
bool ExplainDistributedQueries = true;
bool ExplainAllTasks = false;
int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
+extern MemoryContext SubPlanExplainAnalyzeContext;

/*
 * If enabled, EXPLAIN ANALYZE output & other statistics of last worker task
@@ -90,6 +93,11 @@ int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
 */
static char *SavedExplainPlan = NULL;
static double SavedExecutionDurationMillisec = 0.0;
static double SavedExplainPlanNtuples = 0;
static double SavedExplainPlanNloops = 0;
extern SubPlanExplainOutputData *SubPlanExplainOutput;
uint8 TotalExplainOutputCapacity = 0;
uint8 NumTasksOutput = 0;
/* struct to save explain flags */
typedef struct
@@ -215,7 +223,8 @@ static const char * ExplainFormatStr(ExplainFormat format);
#if PG_VERSION_NUM >= PG_VERSION_17
static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption);
#endif
-static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
+static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DistributedSubPlan *subPlan,
+                              DestReceiver *dest,
                              ExplainState *es,
                              const char *queryString, ParamListInfo params,
                              QueryEnvironment *queryEnv,
@@ -224,7 +233,9 @@ static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
                              const BufferUsage *bufusage,
                              const MemoryContextCounters *mem_counters,
#endif
-                              double *executionDurationMillisec);
+                              double *executionDurationMillisec,
+                              double *executionTuples,
+                              double *executionLoops);
static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName,
                                               ExplainFormat defaultValue);
#if PG_VERSION_NUM >= PG_VERSION_17
@@ -256,7 +267,8 @@ static double elapsed_time(instr_time *starttime);
static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es);
static uint64 TaskReceivedTupleData(Task *task);
static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es);
+static bool PlanStateAnalyzeWalker(PlanState *planState, void *ctx);
+static void ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState);

/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(worker_last_saved_explain_analyze);
@@ -432,6 +444,84 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
}
/*
* ExtractAnalyzeStats parses the EXPLAIN ANALYZE output of the pre-executed
* subplans and injects the parsed statistics into queryDesc->planstate->instrument.
*/
static void
ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState)
{
if (!planState)
{
return;
}
Instrumentation *instr = planState->instrument;
if (!IsA(planState, CustomScanState))
{
instr->ntuples = subPlan->ntuples;
instr->nloops = 1; /* subplan nodes are executed only once */
return;
}
Assert(IsA(planState, CustomScanState));
if (subPlan->numTasksOutput <= 0)
{
return;
}
ListCell *lc;
int tasksOutput = 0;
double tasksNtuples = 0;
double tasksNloops = 0;
memset(instr, 0, sizeof(Instrumentation));
DistributedPlan *newdistributedPlan =
((CitusScanState *) planState)->distributedPlan;
/*
* Inject the earlier executed results, extracted from the workers' EXPLAIN output,
* into the newly created tasks.
*/
foreach(lc, newdistributedPlan->workerJob->taskList)
{
Task *task = (Task *) lfirst(lc);
uint32 taskId = task->taskId;
if (tasksOutput > subPlan->numTasksOutput)
{
break;
}
if (!subPlan->totalExplainOutput[taskId].explainOutput)
{
continue;
}
/*
* Now feed the earlier saved output, which will be used
* by RemoteExplain() when printing tasks
*/
MemoryContext taskContext = GetMemoryChunkContext(task);
task->totalReceivedTupleData =
subPlan->totalExplainOutput[taskId].totalReceivedTupleData;
task->fetchedExplainAnalyzeExecutionDuration =
subPlan->totalExplainOutput[taskId].executionDuration;
task->fetchedExplainAnalyzePlan =
MemoryContextStrdup(taskContext,
subPlan->totalExplainOutput[taskId].explainOutput);
tasksNtuples += subPlan->totalExplainOutput[taskId].executionNtuples;
tasksNloops = subPlan->totalExplainOutput[taskId].executionNloops;
subPlan->totalExplainOutput[taskId].explainOutput = NULL;
tasksOutput++;
}
instr->ntuples = tasksNtuples;
instr->nloops = tasksNloops;
}
/*
 * ExplainSubPlans generates EXPLAIN output for subplans for CTEs
 * and complex subqueries. Because the planning for these queries
@@ -450,7 +540,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
    {
        DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
        PlannedStmt *plan = subPlan->plan;
-       IntoClause *into = NULL;
        ParamListInfo params = NULL;

        /*
@@ -534,6 +623,11 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
        ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es);

+       DestReceiver *dest = None_Receiver; /* No query execution */
+       double executionDurationMillisec = 0.0;
+       double executionTuples = 0;
+       double executionLoops = 0;

        /* Capture memory stats on PG17+ */
#if PG_VERSION_NUM >= PG_VERSION_17
        if (es->memory)
@@ -541,31 +635,21 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
            MemoryContextSwitchTo(saved_ctx);
            MemoryContextMemConsumed(planner_ctx, &mem_counters);
        }
-#endif
-#if PG_VERSION_NUM >= PG_VERSION_17
-       ExplainOnePlan(
-           plan,
-           into,
-           es,
-           queryString,
-           params,
-           NULL, /* QueryEnvironment *queryEnv */
-           &planduration,
-           (es->buffers ? &bufusage : NULL),
-           (es->memory ? &mem_counters : NULL)
-           );
+       /* Execute EXPLAIN without ANALYZE */
+       ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL,
+                         &planduration,
+                         (es->buffers ? &bufusage : NULL),
+                         (es->memory ? &mem_counters : NULL),
+                         &executionDurationMillisec,
+                         &executionTuples,
+                         &executionLoops);
#else
-       ExplainOnePlan(
-           plan,
-           into,
-           es,
-           queryString,
-           params,
-           NULL, /* QueryEnvironment *queryEnv */
-           &planduration,
-           (es->buffers ? &bufusage : NULL)
-           );
+       /* Execute EXPLAIN without ANALYZE */
+       ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL,
+                         &planduration, &executionDurationMillisec,
+                         &executionTuples, &executionLoops);
#endif

        ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es);
@@ -1236,17 +1320,19 @@ worker_last_saved_explain_analyze(PG_FUNCTION_ARGS)
    if (SavedExplainPlan != NULL)
    {
        int columnCount = tupleDescriptor->natts;
-       if (columnCount != 2)
+       if (columnCount != 4)
        {
-           ereport(ERROR, (errmsg("expected 3 output columns in definition of "
+           ereport(ERROR, (errmsg("expected 4 output columns in definition of "
                                   "worker_last_saved_explain_analyze, but got %d",
                                   columnCount)));
        }

-       bool columnNulls[2] = { false };
-       Datum columnValues[2] = {
+       bool columnNulls[4] = { false };
+       Datum columnValues[4] = {
            CStringGetTextDatum(SavedExplainPlan),
-           Float8GetDatum(SavedExecutionDurationMillisec)
+           Float8GetDatum(SavedExecutionDurationMillisec),
+           Float8GetDatum(SavedExplainPlanNtuples),
+           Float8GetDatum(SavedExplainPlanNloops)
        };

        tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls);
@@ -1267,6 +1353,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
    text *queryText = PG_GETARG_TEXT_P(0);
    char *queryString = text_to_cstring(queryText);
    double executionDurationMillisec = 0.0;
+   double executionTuples = 0;
+   double executionLoops = 0;

    Datum explainOptions = PG_GETARG_DATUM(1);
    ExplainState *es = NewExplainState();
@@ -1383,16 +1471,19 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
    }

    /* do the actual EXPLAIN ANALYZE */
-   ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
+   ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL,
                      &planDuration,
                      (es->buffers ? &bufusage : NULL),
                      (es->memory ? &mem_counters : NULL),
-                     &executionDurationMillisec);
+                     &executionDurationMillisec,
+                     &executionTuples,
+                     &executionLoops);
#else
    /* do the actual EXPLAIN ANALYZE */
-   ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
-                     &planDuration, &executionDurationMillisec);
+   ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL,
+                     &planDuration, &executionDurationMillisec,
+                     &executionTuples, &executionLoops);
#endif

    ExplainEndOutput(es);
@@ -1403,6 +1494,8 @@
    SavedExplainPlan = pstrdup(es->str->data);
    SavedExecutionDurationMillisec = executionDurationMillisec;
+   SavedExplainPlanNtuples = executionTuples;
+   SavedExplainPlanNloops = executionLoops;

    MemoryContextSwitchTo(oldContext);
@@ -1632,11 +1725,13 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest)
    tupleDestination->originalTask = task;
    tupleDestination->originalTaskDestination = taskDest;

-   TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(2);
+   TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(4);
    TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 1, "explain analyze", TEXTOID, 0,
                       0);
    TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 2, "duration", FLOAT8OID, 0, 0);
+   TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 3, "ntuples", FLOAT8OID, 0, 0);
+   TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 4, "nloops", FLOAT8OID, 0, 0);

    tupleDestination->lastSavedExplainAnalyzeTupDesc = lastSavedExplainAnalyzeTupDesc;
@@ -1647,6 +1742,51 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest)
}
/*
* EnsureExplainOutputCapacity ensures there is capacity for new entries; the input
* parameter requiredSize is the minimum number of elements needed.
*/
static void
EnsureExplainOutputCapacity(int requiredSize)
{
if (requiredSize < TotalExplainOutputCapacity)
{
return;
}
int newCapacity =
(TotalExplainOutputCapacity == 0) ? 32 : TotalExplainOutputCapacity * 2;
while (newCapacity <= requiredSize)
{
newCapacity *= 2;
}
if (SubPlanExplainOutput == NULL)
{
SubPlanExplainOutput =
(SubPlanExplainOutputData *) MemoryContextAllocZero(
SubPlanExplainAnalyzeContext,
newCapacity *
sizeof(SubPlanExplainOutputData));
}
else
{
/* Use repalloc and manually zero the new memory */
int oldSize = TotalExplainOutputCapacity * sizeof(SubPlanExplainOutputData);
int newSize = newCapacity * sizeof(SubPlanExplainOutputData);
SubPlanExplainOutput =
(SubPlanExplainOutputData *) repalloc(SubPlanExplainOutput, newSize);
/* Zero out the newly allocated memory */
MemSet((char *) SubPlanExplainOutput + oldSize, 0, newSize - oldSize);
}
TotalExplainOutputCapacity = newCapacity;
}
/*
 * ExplainAnalyzeDestPutTuple implements TupleDestination->putTuple
 * for ExplainAnalyzeDestination.
@@ -1656,6 +1796,8 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
                           int placementIndex, int queryNumber,
                           HeapTuple heapTuple, uint64 tupleLibpqSize)
{
+   uint32 taskId = task->taskId;
    ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self;
    if (queryNumber == 0)
    {
@@ -1663,6 +1805,13 @@
        originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple,
                                  tupleLibpqSize);
        tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize;

+       if (SubPlanExplainAnalyzeContext)
+       {
+           EnsureExplainOutputCapacity(taskId + 1);
+           SubPlanExplainOutput[taskId].totalReceivedTupleData =
+               tupleDestination->originalTask->totalReceivedTupleData;
+       }
    }
    else if (queryNumber == 1)
    {
@@ -1678,6 +1827,8 @@
        }

        Datum executionDuration = heap_getattr(heapTuple, 2, tupDesc, &isNull);
+       Datum executionTuples = heap_getattr(heapTuple, 3, tupDesc, &isNull);
+       Datum executionLoops = heap_getattr(heapTuple, 4, tupDesc, &isNull);

        if (isNull)
        {
@@ -1687,6 +1838,8 @@
        char *fetchedExplainAnalyzePlan = TextDatumGetCString(explainAnalyze);
        double fetchedExplainAnalyzeExecutionDuration = DatumGetFloat8(executionDuration);
+       double fetchedExplainAnalyzeTuples = DatumGetFloat8(executionTuples);
+       double fetchedExplainAnalyzeLoops = DatumGetFloat8(executionLoops);

        /*
         * Allocate fetchedExplainAnalyzePlan in the same context as the Task, since we are
@@ -1712,6 +1865,20 @@
            placementIndex;
        tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration =
            fetchedExplainAnalyzeExecutionDuration;

+       /* We should build tupleDestination in subPlan similar to the above */
+       if (SubPlanExplainAnalyzeContext)
+       {
+           EnsureExplainOutputCapacity(taskId + 1);
+           SubPlanExplainOutput[taskId].explainOutput =
+               MemoryContextStrdup(SubPlanExplainAnalyzeContext,
+                                   fetchedExplainAnalyzePlan);
+           SubPlanExplainOutput[taskId].executionDuration =
+               fetchedExplainAnalyzeExecutionDuration;
+           SubPlanExplainOutput[taskId].executionNtuples = fetchedExplainAnalyzeTuples;
+           SubPlanExplainOutput[taskId].executionNloops = fetchedExplainAnalyzeLoops;
+           NumTasksOutput++;
+       }
    }
    else
    {
@@ -1774,7 +1941,14 @@ ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber)
bool
RequestedForExplainAnalyze(CitusScanState *node)
{
-   return (node->customScanState.ss.ps.state->es_instrument != 0);
+   /*
+    * When running a distributed plan (either the root plan or a subplan's
+    * distributed fragment), we need to know if we're under EXPLAIN ANALYZE.
+    * Subplans can't receive the EXPLAIN ANALYZE flag directly, so we use
+    * SubPlanExplainAnalyzeContext as a flag to indicate that context.
+    */
+   return (node->customScanState.ss.ps.state->es_instrument != 0) ||
+          (SubPlanLevel > 0 && SubPlanExplainAnalyzeContext);
}

@@ -1933,7 +2107,8 @@ FetchPlanQueryForExplainAnalyze(const char *queryString, ParamListInfo params)
    }

    appendStringInfoString(fetchQuery,
-                          "SELECT explain_analyze_output, execution_duration "
+                          "SELECT explain_analyze_output, execution_duration, "
+                          "execution_ntuples, execution_nloops "
                           "FROM worker_last_saved_explain_analyze()");

    return fetchQuery->data;
@@ -2105,6 +2280,20 @@ ExplainOneQuery(Query *query, int cursorOptions,
}

+/*
+ * PlanStateAnalyzeWalker: tree walker callback that visits each PlanState node in the
+ * plan tree and extracts analyze statistics from CustomScanState tasks using
+ * ExtractAnalyzeStats. Always returns false to recurse into all children.
+ */
+static bool
+PlanStateAnalyzeWalker(PlanState *planState, void *ctx)
+{
+   DistributedSubPlan *subplan = (DistributedSubPlan *) ctx;
+   ExtractAnalyzeStats(subplan, planState);
+   return false;
+}

/*
 * ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes
 * the given plannedStmt and sends the results to dest. It puts total time to execute in
@@ -2119,20 +2308,25 @@ ExplainOneQuery(Query *query, int cursorOptions,
 * destination.
 */
static void
-ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es,
+ExplainWorkerPlan(PlannedStmt *plannedstmt, DistributedSubPlan *subPlan, DestReceiver *dest, ExplainState *es,
                  const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv,
                  const instr_time *planduration,
#if PG_VERSION_NUM >= PG_VERSION_17
                  const BufferUsage *bufusage,
                  const MemoryContextCounters *mem_counters,
#endif
-                 double *executionDurationMillisec)
+                 double *executionDurationMillisec,
+                 double *executionTuples,
+                 double *executionLoops)
{
    QueryDesc *queryDesc;
    instr_time starttime;
    double totaltime = 0;
    int eflags;
    int instrument_option = 0;

+   /* Sub-plan already executed; skipping execution */
+   bool executeQuery = (es->analyze && !subPlan);
+   bool executeSubplan = (es->analyze && subPlan);

    Assert(plannedstmt->commandType != CMD_UTILITY);
@@ -2174,7 +2368,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
    );

    /* Select execution options */
-   if (es->analyze)
+   if (executeQuery)
        eflags = 0;             /* default run-to-completion flags */
    else
        eflags = EXEC_FLAG_EXPLAIN_ONLY;
@@ -2183,7 +2377,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
    ExecutorStart(queryDesc, eflags);

    /* Execute the plan for statistics if asked for */
-   if (es->analyze)
+   if (executeQuery)
    {
        ScanDirection dir = ForwardScanDirection;
@@ -2206,6 +2400,12 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
    ExplainOpenGroup("Query", NULL, true, es);

+   if (executeSubplan)
+   {
+       ExtractAnalyzeStats(subPlan, queryDesc->planstate);
+       planstate_tree_walker(queryDesc->planstate, PlanStateAnalyzeWalker, (void *) subPlan);
+   }

    /* Create textual dump of plan tree */
    ExplainPrintPlan(es, queryDesc);
@@ -2278,6 +2478,13 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
     */
    INSTR_TIME_SET_CURRENT(starttime);

+   if (executeQuery)
+   {
+       Instrumentation *instr = queryDesc->planstate->instrument;
+       *executionTuples = instr->ntuples;
+       *executionLoops = instr->nloops;
+   }

    ExecutorEnd(queryDesc);

    FreeQueryDesc(queryDesc);
@@ -2285,7 +2492,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
    PopActiveSnapshot();

    /* We need a CCI just in case query expanded to multiple plans */
-   if (es->analyze)
+   if (executeQuery)
        CommandCounterIncrement();

    totaltime += elapsed_time(&starttime);


@@ -1,2 +1,3 @@
-- citus--13.1-1--13.2-1
-- bump version to 13.2-1
+#include "udfs/worker_last_saved_explain_analyze/13.2-1.sql"


@@ -1,2 +1,3 @@
-- citus--13.2-1--13.1-1
-- downgrade version to 13.1-1
+#include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql"


@@ -0,0 +1,10 @@
DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze();
CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze()
RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION,
execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION)
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS
'Returns the saved explain analyze output for the last run query';


@@ -1,4 +1,6 @@
+DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze();
CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze()
RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION)
LANGUAGE C STRICT


@@ -1,6 +1,9 @@
+DROP FUNCTION pg_catalog.worker_last_saved_explain_analyze();
CREATE OR REPLACE FUNCTION pg_catalog.worker_last_saved_explain_analyze()
-RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION)
+RETURNS TABLE(explain_analyze_output TEXT, execution_duration DOUBLE PRECISION,
+              execution_ntuples DOUBLE PRECISION, execution_nloops DOUBLE PRECISION)
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION pg_catalog.worker_last_saved_explain_analyze() IS


@@ -147,6 +147,31 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS)
    COPY_SCALAR_FIELD(subPlanId);
    COPY_NODE_FIELD(plan);
COPY_SCALAR_FIELD(bytesSentPerWorker);
COPY_SCALAR_FIELD(remoteWorkerCount);
COPY_SCALAR_FIELD(durationMillisecs);
COPY_SCALAR_FIELD(writeLocalFile);
if (newnode->totalExplainOutput)
{
MemSet(newnode->totalExplainOutput, 0, sizeof(newnode->totalExplainOutput));
}
/* copy each SubPlanExplainOutput element */
for (int i = 0; i < from->numTasksOutput; i++)
{
/* copy the explainOutput string pointer */
COPY_STRING_FIELD(totalExplainOutput[i].explainOutput);
/* copy the executionDuration (double) */
COPY_SCALAR_FIELD(totalExplainOutput[i].executionDuration);
/* copy the totalReceivedTupleData (uint64) */
COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData);
}
COPY_SCALAR_FIELD(numTasksOutput);
COPY_SCALAR_FIELD(ntuples);
}


@@ -215,6 +215,48 @@ OutDistributedSubPlan(OUTFUNC_ARGS)
    WRITE_UINT_FIELD(subPlanId);
    WRITE_NODE_FIELD(plan);
WRITE_UINT64_FIELD(bytesSentPerWorker);
WRITE_INT_FIELD(remoteWorkerCount);
WRITE_FLOAT_FIELD(durationMillisecs, "%.2f");
WRITE_BOOL_FIELD(writeLocalFile);
appendStringInfoString(str, " totalExplainOutput [");
for (int i = 0; i < node->numTasksOutput; i++)
{
const SubPlanExplainOutputData *e = &node->totalExplainOutput[i];
/* skip empty slots */
if (e->explainOutput == NULL &&
e->executionDuration == 0
&& e->totalReceivedTupleData == 0)
{
continue;
}
if (i > 0)
{
appendStringInfoChar(str, ' ');
}
appendStringInfoChar(str, '(');
/* string pointer: prints quoted or NULL */
WRITE_STRING_FIELD(totalExplainOutput[i].explainOutput);
/* double field */
WRITE_FLOAT_FIELD(totalExplainOutput[i].executionDuration, "%.2f");
/* 64-bit unsigned: use the uint64 macro */
WRITE_UINT64_FIELD(totalExplainOutput[i].totalReceivedTupleData);
appendStringInfoChar(str, ')');
}
appendStringInfoChar(str, ']');
WRITE_INT_FIELD(numTasksOutput);
WRITE_FLOAT_FIELD(ntuples, "%.2f");
}

void


@@ -146,8 +146,8 @@ extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamLis
                                          DestReceiver *dest);
extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params,
                                         DestReceiver *dest);
-extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
+extern uint64 ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
                                          DestReceiver *dest);
extern void SetLocalMultiShardModifyModeToSequential(void);
extern void EnsureSequentialMode(ObjectType objType);
extern void SetLocalForceMaxQueryParallelization(void);


@@ -490,6 +490,24 @@ typedef struct DistributedPlan
} DistributedPlan;
/*
 * SubPlanExplainOutputData: holds the EXPLAIN ANALYZE output and collected
 * statistics for a single task executed by a worker during distributed
 * query execution.
 *   explainOutput          - raw EXPLAIN ANALYZE output for the task
 *   executionDuration      - wall-clock time taken to run the task
 *   totalReceivedTupleData - total bytes of tuple data received from the worker
 */
typedef struct SubPlanExplainOutputData
{
char *explainOutput;
double executionDuration;
double executionNtuples;
double executionNloops;
uint64 totalReceivedTupleData;
} SubPlanExplainOutputData;
/*
 * DistributedSubPlan contains a subplan of a distributed plan. Subplans are
 * executed before the distributed query and their results are written to
@@ -508,6 +526,9 @@ typedef struct DistributedSubPlan
    uint32 remoteWorkerCount;
    double durationMillisecs;
    bool writeLocalFile;
+   SubPlanExplainOutputData *totalExplainOutput;
+   uint32 numTasksOutput; /* actual size of the above array */
+   double ntuples; /* total tuples produced */
} DistributedSubPlan;


@@ -17,7 +17,7 @@
extern int MaxIntermediateResult;
extern int SubPlanLevel;

-extern void ExecuteSubPlans(DistributedPlan *distributedPlan);
+extern void ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled);

/**
 * IntermediateResultsHashEntry is used to store which nodes need to receive


@@ -2492,15 +2492,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 100 bytes
Result destination: Write locally
--> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
+-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
-Tuple data received from nodes: 160 bytes
+Tuple data received from nodes: 80 bytes
Tasks Shown: One of 4
-> Task
-Tuple data received from node: 64 bytes
+Tuple data received from node: 32 bytes
Node: host=localhost port=xxxxx dbname=regression
--> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
+-> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1)
--> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
+-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
Filter: (a IS NOT NULL)
-> Distributed Subplan XXX_2
Intermediate Data Size: 150 bytes
@@ -3228,6 +3228,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
-> Update on tbl_570036 tbl (actual rows=0 loops=1)
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
Filter: (a = 1)
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
SET search_path TO multi_explain;
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 18 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 16 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on test_subplans_570038 (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- Will fail with duplicate pk
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038"
DETAIL: Key (x)=(1) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- Test JSON format
TRUNCATE test_subplans;
EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
[
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Subplans": [
{
"Intermediate Data Size": "18 bytes",
"Result destination": "Write locally",
"PlannedStmt": [
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "16 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "16 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "ModifyTable",
"Operation": "Insert",
"Parallel Aware": false,
"Async Capable": false,
"Relation Name": "test_subplans_570038",
"Alias": "test_subplans_570038",
"Actual Rows": 1,
"Actual Loops": 1,
"Plans": [
{
"Node Type": "Result",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1
}
]
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
}
],
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "8 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "8 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Function Scan",
"Parallel Aware": false,
"Async Capable": false,
"Function Name": "read_intermediate_result",
"Alias": "intermediate_result",
"Actual Rows": 1,
"Actual Loops": 1
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';


@@ -2484,15 +2484,15 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 100 bytes
Result destination: Write locally
--> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
+-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
-Tuple data received from nodes: 160 bytes
+Tuple data received from nodes: 80 bytes
Tasks Shown: One of 4
-> Task
-Tuple data received from node: 64 bytes
+Tuple data received from node: 32 bytes
Node: host=localhost port=xxxxx dbname=regression
--> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
+-> Insert on dist_table_570017 citus_table_alias (actual rows=4 loops=1)
--> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
+-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
Filter: (a IS NOT NULL)
-> Distributed Subplan XXX_2
Intermediate Data Size: 150 bytes
@@ -3217,6 +3217,159 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
-> Update on tbl_570036 tbl (actual rows=0 loops=1)
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
Filter: (a = 1)
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
SET search_path TO multi_explain;
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 18 bytes
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 16 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on test_subplans_570038 (actual rows=1 loops=1)
-> Result (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- Will fail with duplicate pk
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038"
DETAIL: Key (x)=(1) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- Test JSON format
TRUNCATE test_subplans;
EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
[
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Subplans": [
{
"Intermediate Data Size": "18 bytes",
"Result destination": "Write locally",
"PlannedStmt": [
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1,
"Distributed Query": {
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "16 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "16 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "ModifyTable",
"Operation": "Insert",
"Parallel Aware": false,
"Async Capable": false,
"Relation Name": "test_subplans_570038",
"Alias": "test_subplans_570038",
"Actual Rows": 1,
"Actual Loops": 1,
"Plans": [
{
"Node Type": "Result",
"Parent Relationship": "Outer",
"Parallel Aware": false,
"Async Capable": false,
"Actual Rows": 1,
"Actual Loops": 1
}
]
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
}
],
"Job": {
"Task Count": 1,
"Tuple data received from nodes": "8 bytes",
"Tasks Shown": "All",
"Tasks": [
{
"Tuple data received from node": "8 bytes",
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Function Scan",
"Parallel Aware": false,
"Async Capable": false,
"Function Name": "read_intermediate_result",
"Alias": "intermediate_result",
"Actual Rows": 1,
"Actual Loops": 1
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
-- Only one row must exist
SELECT * FROM test_subplans;
1|2
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';


@@ -1503,7 +1503,9 @@ ALTER EXTENSION citus UPDATE TO '13.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
-(0 rows)
+ function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) |
+ | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision)
+(2 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version


@@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$
0, 0
);
-- same with explain analyze
---
--- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
-1, 2
+1, 1
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a = 1
@@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
---
--- But this time we observe more counter increments because we execute the subplans
--- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
@@ -1057,7 +1052,7 @@ CALL exec_query_and_check_query_counters($$
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
-3, 4
+2, 2
);
-- safe to push-down
CALL exec_query_and_check_query_counters($$


@@ -1166,6 +1166,32 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET
EXPLAIN (COSTS false) EXECUTE q2('(1)');
EXPLAIN :default_analyze_flags EXECUTE q2('(1)');
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
SET search_path TO multi_explain;
CREATE TABLE test_subplans (x int primary key, y int);
SELECT create_distributed_table('test_subplans','x');
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
-- Only one row must exist
SELECT * FROM test_subplans;
-- Will fail with duplicate pk
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
-- Test JSON format
TRUNCATE test_subplans;
EXPLAIN (FORMAT JSON, COSTS off, ANALYZE on, TIMING off, SUMMARY off)
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
SELECT * FROM a;
-- Only one row must exist
SELECT * FROM test_subplans;
-- check when auto explain + analyze is enabled, we do not allow local execution.
CREATE SCHEMA test_auto_explain;
SET search_path TO 'test_auto_explain';


@@ -476,13 +476,11 @@ CALL exec_query_and_check_query_counters($$
);

-- same with explain analyze
---
--- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
-1, 2
+1, 1
);

CALL exec_query_and_check_query_counters($$
@@ -807,9 +805,6 @@ CALL exec_query_and_check_query_counters($$
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
---
--- But this time we observe more counter increments because we execute the subplans
--- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
@@ -823,7 +818,7 @@ CALL exec_query_and_check_query_counters($$
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
-3, 4
+2, 2
);

-- safe to push-down