mirror of https://github.com/citusdata/citus.git
Merge e0a3d22662
into 5deaf9a616
commit
d51554529f
|
@ -630,6 +630,7 @@ typedef struct TaskPlacementExecution
|
||||||
instr_time endTime;
|
instr_time endTime;
|
||||||
} TaskPlacementExecution;
|
} TaskPlacementExecution;
|
||||||
|
|
||||||
|
extern MemoryContext SubPlanExplainAnalyzeContext;
|
||||||
|
|
||||||
/* local functions */
|
/* local functions */
|
||||||
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
|
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
|
||||||
|
@ -760,7 +761,7 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
||||||
*/
|
*/
|
||||||
LockPartitionsForDistributedPlan(distributedPlan);
|
LockPartitionsForDistributedPlan(distributedPlan);
|
||||||
|
|
||||||
ExecuteSubPlans(distributedPlan);
|
ExecuteSubPlans(distributedPlan, RequestedForExplainAnalyze(scanState));
|
||||||
|
|
||||||
scanState->finishedPreScan = true;
|
scanState->finishedPreScan = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@
|
||||||
#include "distributed/merge_planner.h"
|
#include "distributed/merge_planner.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
|
#include "distributed/multi_explain.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_router_planner.h"
|
#include "distributed/multi_router_planner.h"
|
||||||
|
@ -121,7 +122,7 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
|
||||||
bool binaryFormat =
|
bool binaryFormat =
|
||||||
CanUseBinaryCopyFormatForTargetList(selectQuery->targetList);
|
CanUseBinaryCopyFormatForTargetList(selectQuery->targetList);
|
||||||
|
|
||||||
ExecuteSubPlans(distSelectPlan);
|
ExecuteSubPlans(distSelectPlan, RequestedForExplainAnalyze(scanState));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We have a separate directory for each transaction, so choosing
|
* We have a separate directory for each transaction, so choosing
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
#include "distributed/merge_executor.h"
|
#include "distributed/merge_executor.h"
|
||||||
#include "distributed/merge_planner.h"
|
#include "distributed/merge_planner.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
|
#include "distributed/multi_explain.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_router_planner.h"
|
#include "distributed/multi_router_planner.h"
|
||||||
#include "distributed/repartition_executor.h"
|
#include "distributed/repartition_executor.h"
|
||||||
|
@ -132,7 +133,7 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
|
||||||
ereport(DEBUG1, (errmsg("Executing subplans of the source query and "
|
ereport(DEBUG1, (errmsg("Executing subplans of the source query and "
|
||||||
"storing the results at the respective node(s)")));
|
"storing the results at the respective node(s)")));
|
||||||
|
|
||||||
ExecuteSubPlans(distSourcePlan);
|
ExecuteSubPlans(distSourcePlan, RequestedForExplainAnalyze(scanState));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We have a separate directory for each transaction, so choosing
|
* We have a separate directory for each transaction, so choosing
|
||||||
|
|
|
@ -30,13 +30,21 @@ int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate resu
|
||||||
/* when this is true, we enforce intermediate result size limit in all executors */
|
/* when this is true, we enforce intermediate result size limit in all executors */
|
||||||
int SubPlanLevel = 0;
|
int SubPlanLevel = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SubPlanExplainAnalyzeContext is both a memory context for storing
|
||||||
|
* subplans’ EXPLAIN ANALYZE output and a flag indicating that execution
|
||||||
|
* is running under EXPLAIN ANALYZE for subplans.
|
||||||
|
*/
|
||||||
|
MemoryContext SubPlanExplainAnalyzeContext = NULL;
|
||||||
|
SubPlanExplainOutput *SubPlanExplainAnalyzeOutput;
|
||||||
|
extern int NumTasksOutput;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExecuteSubPlans executes a list of subplans from a distributed plan
|
* ExecuteSubPlans executes a list of subplans from a distributed plan
|
||||||
* by sequentially executing each plan from the top.
|
* by sequentially executing each plan from the top.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ExecuteSubPlans(DistributedPlan *distributedPlan)
|
ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled)
|
||||||
{
|
{
|
||||||
uint64 planId = distributedPlan->planId;
|
uint64 planId = distributedPlan->planId;
|
||||||
List *subPlanList = distributedPlan->subPlanList;
|
List *subPlanList = distributedPlan->subPlanList;
|
||||||
|
@ -47,6 +55,19 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the root DistributedPlan has EXPLAIN ANALYZE enabled,
|
||||||
|
* its subplans should also have EXPLAIN ANALYZE enabled.
|
||||||
|
*/
|
||||||
|
if (explainAnalyzeEnabled)
|
||||||
|
{
|
||||||
|
SubPlanExplainAnalyzeContext = GetMemoryChunkContext(distributedPlan);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
SubPlanExplainAnalyzeContext = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
HTAB *intermediateResultsHash = MakeIntermediateResultHTAB();
|
HTAB *intermediateResultsHash = MakeIntermediateResultHTAB();
|
||||||
RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan);
|
RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan);
|
||||||
|
|
||||||
|
@ -61,6 +82,13 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
||||||
DistributedSubPlan *subPlan = NULL;
|
DistributedSubPlan *subPlan = NULL;
|
||||||
foreach_declared_ptr(subPlan, subPlanList)
|
foreach_declared_ptr(subPlan, subPlanList)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* Save the EXPLAIN ANALYZE output(s) to be extracted later
|
||||||
|
* in ExplainSubPlans()
|
||||||
|
*/
|
||||||
|
MemSet(subPlan->totalExplainOutput, 0, sizeof(subPlan->totalExplainOutput));
|
||||||
|
SubPlanExplainAnalyzeOutput = subPlan->totalExplainOutput;
|
||||||
|
|
||||||
PlannedStmt *plannedStmt = subPlan->plan;
|
PlannedStmt *plannedStmt = subPlan->plan;
|
||||||
uint32 subPlanId = subPlan->subPlanId;
|
uint32 subPlanId = subPlan->subPlanId;
|
||||||
ParamListInfo params = NULL;
|
ParamListInfo params = NULL;
|
||||||
|
@ -79,7 +107,17 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
||||||
|
|
||||||
TimestampTz startTimestamp = GetCurrentTimestamp();
|
TimestampTz startTimestamp = GetCurrentTimestamp();
|
||||||
|
|
||||||
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
|
PG_TRY();
|
||||||
|
{
|
||||||
|
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
|
{
|
||||||
|
SubPlanExplainAnalyzeContext = NULL;
|
||||||
|
PG_RE_THROW();
|
||||||
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* EXPLAIN ANALYZE instrumentations. Calculating these are very light-weight,
|
* EXPLAIN ANALYZE instrumentations. Calculating these are very light-weight,
|
||||||
|
@ -98,6 +136,10 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
||||||
subPlan->writeLocalFile = entry->writeLocalFile;
|
subPlan->writeLocalFile = entry->writeLocalFile;
|
||||||
|
|
||||||
SubPlanLevel--;
|
SubPlanLevel--;
|
||||||
|
subPlan->numTasksOutput = NumTasksOutput;
|
||||||
|
NumTasksOutput = 0;
|
||||||
FreeExecutorState(estate);
|
FreeExecutorState(estate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SubPlanExplainAnalyzeContext = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "executor/tstoreReceiver.h"
|
#include "executor/tstoreReceiver.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
|
#include "nodes/nodeFuncs.h"
|
||||||
#include "nodes/plannodes.h"
|
#include "nodes/plannodes.h"
|
||||||
#include "nodes/primnodes.h"
|
#include "nodes/primnodes.h"
|
||||||
#include "nodes/print.h"
|
#include "nodes/print.h"
|
||||||
|
@ -68,6 +69,7 @@
|
||||||
#include "distributed/placement_connection.h"
|
#include "distributed/placement_connection.h"
|
||||||
#include "distributed/recursive_planning.h"
|
#include "distributed/recursive_planning.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/subplan_execution.h"
|
||||||
#include "distributed/tuple_destination.h"
|
#include "distributed/tuple_destination.h"
|
||||||
#include "distributed/tuplestore.h"
|
#include "distributed/tuplestore.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
|
@ -78,6 +80,7 @@
|
||||||
bool ExplainDistributedQueries = true;
|
bool ExplainDistributedQueries = true;
|
||||||
bool ExplainAllTasks = false;
|
bool ExplainAllTasks = false;
|
||||||
int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
|
int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
|
||||||
|
extern MemoryContext SubPlanExplainAnalyzeContext;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If enabled, EXPLAIN ANALYZE output & other statistics of last worker task
|
* If enabled, EXPLAIN ANALYZE output & other statistics of last worker task
|
||||||
|
@ -85,6 +88,8 @@ int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME;
|
||||||
*/
|
*/
|
||||||
static char *SavedExplainPlan = NULL;
|
static char *SavedExplainPlan = NULL;
|
||||||
static double SavedExecutionDurationMillisec = 0.0;
|
static double SavedExecutionDurationMillisec = 0.0;
|
||||||
|
extern SubPlanExplainOutput *SubPlanExplainAnalyzeOutput;
|
||||||
|
int NumTasksOutput = 0;
|
||||||
|
|
||||||
/* struct to save explain flags */
|
/* struct to save explain flags */
|
||||||
typedef struct
|
typedef struct
|
||||||
|
@ -161,6 +166,7 @@ static void ExplainIndentText(ExplainState *es);
|
||||||
static void ExplainPrintSerialize(ExplainState *es,
|
static void ExplainPrintSerialize(ExplainState *es,
|
||||||
SerializeMetrics *metrics);
|
SerializeMetrics *metrics);
|
||||||
static SerializeMetrics GetSerializationMetrics(DestReceiver *dest);
|
static SerializeMetrics GetSerializationMetrics(DestReceiver *dest);
|
||||||
|
static void ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DestReceiver functions for SERIALIZE option
|
* DestReceiver functions for SERIALIZE option
|
||||||
|
@ -210,7 +216,8 @@ static const char * ExplainFormatStr(ExplainFormat format);
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||||
static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption);
|
static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption);
|
||||||
#endif
|
#endif
|
||||||
static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
|
static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DistributedSubPlan *subPlan,
|
||||||
|
DestReceiver *dest,
|
||||||
ExplainState *es,
|
ExplainState *es,
|
||||||
const char *queryString, ParamListInfo params,
|
const char *queryString, ParamListInfo params,
|
||||||
QueryEnvironment *queryEnv,
|
QueryEnvironment *queryEnv,
|
||||||
|
@ -251,6 +258,7 @@ static double elapsed_time(instr_time *starttime);
|
||||||
static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es);
|
static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es);
|
||||||
static uint64 TaskReceivedTupleData(Task *task);
|
static uint64 TaskReceivedTupleData(Task *task);
|
||||||
static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es);
|
static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es);
|
||||||
|
static void ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
|
@ -427,6 +435,63 @@ NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExtractAnalyzeStats parses the EXPLAIN ANALYZE output of the pre-executed
|
||||||
|
* subplans and injects the parsed statistics into queryDesc->planstate->instrument.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ExtractAnalyzeStats(DistributedSubPlan *subPlan, PlanState *planState)
|
||||||
|
{
|
||||||
|
if (!planState || !IsA(planState, CustomScanState))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Inject the earlier executed results into the newly created tasks */
|
||||||
|
if (subPlan->numTasksOutput)
|
||||||
|
{
|
||||||
|
DistributedPlan *newdistributedPlan =
|
||||||
|
((CitusScanState *) planState)->distributedPlan;
|
||||||
|
|
||||||
|
ListCell *lc;
|
||||||
|
int idx = 0;
|
||||||
|
|
||||||
|
/* We need to extract this from the explain output of workers */
|
||||||
|
Instrumentation *instr = planState->instrument;
|
||||||
|
memset(instr, 0, sizeof(Instrumentation));
|
||||||
|
|
||||||
|
foreach(lc, newdistributedPlan->workerJob->taskList)
|
||||||
|
{
|
||||||
|
if (subPlan->totalExplainOutput[idx].explainOutput &&
|
||||||
|
idx < subPlan->numTasksOutput)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Now feed the earlier saved output, which will be used
|
||||||
|
* by RemoteExplain() when printing tasks
|
||||||
|
*/
|
||||||
|
Task *task = (Task *) lfirst(lc);
|
||||||
|
MemoryContext taskContext = GetMemoryChunkContext(task);
|
||||||
|
|
||||||
|
task->totalReceivedTupleData =
|
||||||
|
subPlan->totalExplainOutput[idx].totalReceivedTupleData;
|
||||||
|
task->fetchedExplainAnalyzeExecutionDuration =
|
||||||
|
subPlan->totalExplainOutput[idx].executionDuration;
|
||||||
|
task->fetchedExplainAnalyzePlan =
|
||||||
|
MemoryContextStrdup(taskContext,
|
||||||
|
subPlan->totalExplainOutput[idx].explainOutput);
|
||||||
|
ParseExplainAnalyzeOutput(task->fetchedExplainAnalyzePlan, instr);
|
||||||
|
|
||||||
|
subPlan->totalExplainOutput[idx].explainOutput = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
idx++;
|
||||||
|
}
|
||||||
|
|
||||||
|
planState->instrument->nloops = 1; /* TODO */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExplainSubPlans generates EXPLAIN output for subplans for CTEs
|
* ExplainSubPlans generates EXPLAIN output for subplans for CTEs
|
||||||
* and complex subqueries. Because the planning for these queries
|
* and complex subqueries. Because the planning for these queries
|
||||||
|
@ -445,7 +510,6 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
||||||
{
|
{
|
||||||
DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
|
DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
|
||||||
PlannedStmt *plan = subPlan->plan;
|
PlannedStmt *plan = subPlan->plan;
|
||||||
IntoClause *into = NULL;
|
|
||||||
ParamListInfo params = NULL;
|
ParamListInfo params = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -453,7 +517,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
||||||
* for now we put an empty string, which is valid according to postgres.
|
* for now we put an empty string, which is valid according to postgres.
|
||||||
*/
|
*/
|
||||||
char *queryString = pstrdup("");
|
char *queryString = pstrdup("");
|
||||||
instr_time planduration;
|
instr_time planDuration;
|
||||||
BufferUsage bufusage_start,
|
BufferUsage bufusage_start,
|
||||||
bufusage;
|
bufusage;
|
||||||
|
|
||||||
|
@ -518,7 +582,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
||||||
ExplainPropertyText("Result destination", destination->data, es);
|
ExplainPropertyText("Result destination", destination->data, es);
|
||||||
}
|
}
|
||||||
|
|
||||||
INSTR_TIME_SET_ZERO(planduration);
|
INSTR_TIME_SET_ZERO(planDuration);
|
||||||
|
|
||||||
/* calc differences of buffer counters. */
|
/* calc differences of buffer counters. */
|
||||||
if (es->buffers)
|
if (es->buffers)
|
||||||
|
@ -529,6 +593,10 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
||||||
|
|
||||||
ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es);
|
ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es);
|
||||||
|
|
||||||
|
/* TODO: Do we really need dest and bound params when not executing? */
|
||||||
|
DestReceiver *dest = None_Receiver;
|
||||||
|
double executionDurationMillisec = 0.0;
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||||
if (es->memory)
|
if (es->memory)
|
||||||
{
|
{
|
||||||
|
@ -536,12 +604,24 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
||||||
MemoryContextMemConsumed(planner_ctx, &mem_counters);
|
MemoryContextMemConsumed(planner_ctx, &mem_counters);
|
||||||
}
|
}
|
||||||
|
|
||||||
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
|
/* calc differences of buffer counters. */
|
||||||
(es->buffers ? &bufusage : NULL),
|
if (es->buffers)
|
||||||
(es->memory ? &mem_counters : NULL));
|
{
|
||||||
|
memset(&bufusage, 0, sizeof(BufferUsage));
|
||||||
|
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* do the actual EXPLAIN ANALYZE */
|
||||||
|
ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL,
|
||||||
|
&planDuration,
|
||||||
|
(es->buffers ? &bufusage : NULL),
|
||||||
|
(es->memory ? &mem_counters : NULL),
|
||||||
|
&executionDurationMillisec);
|
||||||
#else
|
#else
|
||||||
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
|
|
||||||
(es->buffers ? &bufusage : NULL));
|
/* do the actual EXPLAIN ANALYZE */
|
||||||
|
ExplainWorkerPlan(plan, subPlan, dest, es, queryString, params, NULL,
|
||||||
|
&planDuration, &executionDurationMillisec);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es);
|
ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es);
|
||||||
|
@ -1359,7 +1439,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* do the actual EXPLAIN ANALYZE */
|
/* do the actual EXPLAIN ANALYZE */
|
||||||
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
|
ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL,
|
||||||
&planDuration,
|
&planDuration,
|
||||||
(es->buffers ? &bufusage : NULL),
|
(es->buffers ? &bufusage : NULL),
|
||||||
(es->memory ? &mem_counters : NULL),
|
(es->memory ? &mem_counters : NULL),
|
||||||
|
@ -1367,7 +1447,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
|
||||||
#else
|
#else
|
||||||
|
|
||||||
/* do the actual EXPLAIN ANALYZE */
|
/* do the actual EXPLAIN ANALYZE */
|
||||||
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
|
ExplainWorkerPlan(plan, NULL, tupleStoreDest, es, queryString, boundParams, NULL,
|
||||||
&planDuration, &executionDurationMillisec);
|
&planDuration, &executionDurationMillisec);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -1621,6 +1701,11 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
|
||||||
originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple,
|
originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple,
|
||||||
tupleLibpqSize);
|
tupleLibpqSize);
|
||||||
tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize;
|
tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize;
|
||||||
|
if (SubPlanExplainAnalyzeContext && NumTasksOutput < MAX_ANALYZE_OUTPUT)
|
||||||
|
{
|
||||||
|
SubPlanExplainAnalyzeOutput[NumTasksOutput].totalReceivedTupleData =
|
||||||
|
tupleDestination->originalTask->totalReceivedTupleData;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (queryNumber == 1)
|
else if (queryNumber == 1)
|
||||||
{
|
{
|
||||||
|
@ -1670,6 +1755,17 @@ ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
|
||||||
placementIndex;
|
placementIndex;
|
||||||
tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration =
|
tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration =
|
||||||
fetchedExplainAnalyzeExecutionDuration;
|
fetchedExplainAnalyzeExecutionDuration;
|
||||||
|
|
||||||
|
/* We should build tupleDestination in subPlan similar to the above */
|
||||||
|
if (SubPlanExplainAnalyzeContext && NumTasksOutput < MAX_ANALYZE_OUTPUT)
|
||||||
|
{
|
||||||
|
SubPlanExplainAnalyzeOutput[NumTasksOutput].explainOutput =
|
||||||
|
MemoryContextStrdup(SubPlanExplainAnalyzeContext,
|
||||||
|
fetchedExplainAnalyzePlan);
|
||||||
|
SubPlanExplainAnalyzeOutput[NumTasksOutput].executionDuration =
|
||||||
|
fetchedExplainAnalyzeExecutionDuration;
|
||||||
|
NumTasksOutput++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1732,7 +1828,14 @@ ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber)
|
||||||
bool
|
bool
|
||||||
RequestedForExplainAnalyze(CitusScanState *node)
|
RequestedForExplainAnalyze(CitusScanState *node)
|
||||||
{
|
{
|
||||||
return (node->customScanState.ss.ps.state->es_instrument != 0);
|
/*
|
||||||
|
* When running a distributed plan—either the root plan or a subplan’s
|
||||||
|
* distributed fragment—we need to know if we’re under EXPLAIN ANALYZE.
|
||||||
|
* Subplans can’t receive the EXPLAIN ANALYZE flag directly, so we use
|
||||||
|
* SubPlanExplainAnalyzeContext as a flag to indicate that context.
|
||||||
|
*/
|
||||||
|
return (node->customScanState.ss.ps.state->es_instrument != 0) ||
|
||||||
|
(SubPlanLevel > 0 && SubPlanExplainAnalyzeContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2045,6 +2148,19 @@ ExplainOneQuery(Query *query, int cursorOptions,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static bool
|
||||||
|
extract_analyze_walker(PlanState *ps, void *ctx)
|
||||||
|
{
|
||||||
|
DistributedSubPlan *subplan = (DistributedSubPlan *) ctx;
|
||||||
|
|
||||||
|
/* call your extractor on every node */
|
||||||
|
ExtractAnalyzeStats(subplan, ps);
|
||||||
|
|
||||||
|
/* return false to keep recursing into children */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes
|
* ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes
|
||||||
* the given plannedStmt and sends the results to dest. It puts total time to execute in
|
* the given plannedStmt and sends the results to dest. It puts total time to execute in
|
||||||
|
@ -2059,7 +2175,7 @@ ExplainOneQuery(Query *query, int cursorOptions,
|
||||||
* destination.
|
* destination.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es,
|
ExplainWorkerPlan(PlannedStmt *plannedstmt, DistributedSubPlan *subPlan, DestReceiver *dest, ExplainState *es,
|
||||||
const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv,
|
const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv,
|
||||||
const instr_time *planduration,
|
const instr_time *planduration,
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||||
|
@ -2073,6 +2189,8 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
|
||||||
double totaltime = 0;
|
double totaltime = 0;
|
||||||
int eflags;
|
int eflags;
|
||||||
int instrument_option = 0;
|
int instrument_option = 0;
|
||||||
|
/* Never executed a sub-plan, it's already done */
|
||||||
|
bool executeQuery = (es->analyze && !subPlan);
|
||||||
|
|
||||||
Assert(plannedstmt->commandType != CMD_UTILITY);
|
Assert(plannedstmt->commandType != CMD_UTILITY);
|
||||||
|
|
||||||
|
@ -2107,7 +2225,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
|
||||||
dest, params, queryEnv, instrument_option);
|
dest, params, queryEnv, instrument_option);
|
||||||
|
|
||||||
/* Select execution options */
|
/* Select execution options */
|
||||||
if (es->analyze)
|
if (executeQuery)
|
||||||
eflags = 0; /* default run-to-completion flags */
|
eflags = 0; /* default run-to-completion flags */
|
||||||
else
|
else
|
||||||
eflags = EXEC_FLAG_EXPLAIN_ONLY;
|
eflags = EXEC_FLAG_EXPLAIN_ONLY;
|
||||||
|
@ -2116,7 +2234,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
|
||||||
ExecutorStart(queryDesc, eflags);
|
ExecutorStart(queryDesc, eflags);
|
||||||
|
|
||||||
/* Execute the plan for statistics if asked for */
|
/* Execute the plan for statistics if asked for */
|
||||||
if (es->analyze)
|
if (executeQuery)
|
||||||
{
|
{
|
||||||
ScanDirection dir = ForwardScanDirection;
|
ScanDirection dir = ForwardScanDirection;
|
||||||
|
|
||||||
|
@ -2132,6 +2250,14 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
|
||||||
|
|
||||||
ExplainOpenGroup("Query", NULL, true, es);
|
ExplainOpenGroup("Query", NULL, true, es);
|
||||||
|
|
||||||
|
if (subPlan)
|
||||||
|
{
|
||||||
|
ExtractAnalyzeStats(subPlan, queryDesc->planstate);
|
||||||
|
/* …then, once you’ve got your top‐level PlanState (e.g. planstate of your subplan): */
|
||||||
|
planstate_tree_walker(queryDesc->planstate, extract_analyze_walker, (void *) subPlan);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/* Create textual dump of plan tree */
|
/* Create textual dump of plan tree */
|
||||||
ExplainPrintPlan(es, queryDesc);
|
ExplainPrintPlan(es, queryDesc);
|
||||||
|
|
||||||
|
@ -2211,7 +2337,7 @@ ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es
|
||||||
PopActiveSnapshot();
|
PopActiveSnapshot();
|
||||||
|
|
||||||
/* We need a CCI just in case query expanded to multiple plans */
|
/* We need a CCI just in case query expanded to multiple plans */
|
||||||
if (es->analyze)
|
if (executeQuery)
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
totaltime += elapsed_time(&starttime);
|
totaltime += elapsed_time(&starttime);
|
||||||
|
@ -2248,6 +2374,115 @@ elapsed_time(instr_time *starttime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ParseExplainAnalyzeOutput
|
||||||
|
*
|
||||||
|
* Parses the output of an EXPLAIN ANALYZE run to extract ANALYZE statistics
|
||||||
|
* and other instrumentation data.
|
||||||
|
*
|
||||||
|
* Parameters:
|
||||||
|
* explainOutput - a null-terminated string containing the raw EXPLAIN ANALYZE output
|
||||||
|
* instr - pointer to an Instrumentation struct to accumulate parsed metrics
|
||||||
|
*
|
||||||
|
* Behavior:
|
||||||
|
* - Validates inputs and returns immediately if either pointer is NULL.
|
||||||
|
* - Tokenizes the output on spaces, parentheses, and newlines.
|
||||||
|
* - Populates the instr-><..> fields.
|
||||||
|
*
|
||||||
|
* Notes:
|
||||||
|
* - Caller must free or manage the memory for explainOutput.
|
||||||
|
* - Parsing is case-sensitive and assumes standard EXPLAIN ANALYZE formatting.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ParseExplainAnalyzeOutput(char *explainOutput, Instrumentation *instr)
|
||||||
|
{
|
||||||
|
/* elog(NOTICE, "Parsing :%s ", explainOutput); */
|
||||||
|
char *token, *saveptr;
|
||||||
|
|
||||||
|
/* Validate input */
|
||||||
|
if (explainOutput == NULL || instr == NULL)
|
||||||
|
return;
|
||||||
|
|
||||||
|
char *line = pstrdup(explainOutput);
|
||||||
|
|
||||||
|
bool inWal = false;
|
||||||
|
bool inResult = false;
|
||||||
|
|
||||||
|
/* split on spaces, parentheses or newlines */
|
||||||
|
for (token = strtok_r(line, " ()\n", &saveptr);
|
||||||
|
token != NULL;
|
||||||
|
token = strtok_r(NULL, " ()\n", &saveptr))
|
||||||
|
{
|
||||||
|
if (strcmp(token, "WAL:") == 0)
|
||||||
|
{
|
||||||
|
inWal = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strcmp(token, "Result") == 0)
|
||||||
|
{
|
||||||
|
inResult = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Reset Result flag when we see "actual" - but only skip if we're in Result mode */
|
||||||
|
if (strcmp(token, "actual") == 0)
|
||||||
|
{
|
||||||
|
/* If we were in Result mode, the next tokens should be skipped */
|
||||||
|
/* If we weren't in Result mode, continue normally */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inWal)
|
||||||
|
{
|
||||||
|
if (strncmp(token, "records=", 8) == 0)
|
||||||
|
instr->walusage.wal_records += strtoul(token + 8, NULL, 10);
|
||||||
|
else if (strncmp(token, "bytes=", 6) == 0)
|
||||||
|
{
|
||||||
|
instr->walusage.wal_bytes += strtoul(token + 6, NULL, 10);
|
||||||
|
/* once we've seen bytes=, we can leave WAL mode */
|
||||||
|
inWal = false;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Skip Result node's actual timing data */
|
||||||
|
if (inResult)
|
||||||
|
{
|
||||||
|
if (strncmp(token, "time=", 5) == 0 ||
|
||||||
|
strncmp(token, "rows=", 5) == 0 ||
|
||||||
|
strncmp(token, "loops=", 6) == 0)
|
||||||
|
{
|
||||||
|
/* If this is loops=, we've seen all Result data */
|
||||||
|
if (strncmp(token, "loops=", 6) == 0)
|
||||||
|
inResult = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strncmp(token, "time=", 5) == 0)
|
||||||
|
{
|
||||||
|
/* token is "time=X..Y" */
|
||||||
|
char *timeStr = token + 5;
|
||||||
|
char *doubleDot = strstr(timeStr, "..");
|
||||||
|
if (doubleDot)
|
||||||
|
{
|
||||||
|
*doubleDot = '\0';
|
||||||
|
instr->startup += strtod(timeStr, NULL) / 1000.0;
|
||||||
|
instr->total += strtod(doubleDot + 2, NULL) / 1000.0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (strncmp(token, "rows=", 5) == 0)
|
||||||
|
{
|
||||||
|
instr->ntuples += strtol(token + 5, NULL, 10);
|
||||||
|
break; /* We are done for this Task */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pfree(line);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||||
/*
|
/*
|
||||||
* Return whether show_buffer_usage would have anything to print, if given
|
* Return whether show_buffer_usage would have anything to print, if given
|
||||||
|
|
|
@ -147,6 +147,27 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS)
|
||||||
|
|
||||||
COPY_SCALAR_FIELD(subPlanId);
|
COPY_SCALAR_FIELD(subPlanId);
|
||||||
COPY_NODE_FIELD(plan);
|
COPY_NODE_FIELD(plan);
|
||||||
|
COPY_SCALAR_FIELD(bytesSentPerWorker);
|
||||||
|
COPY_SCALAR_FIELD(remoteWorkerCount);
|
||||||
|
COPY_SCALAR_FIELD(durationMillisecs);
|
||||||
|
COPY_SCALAR_FIELD(writeLocalFile);
|
||||||
|
|
||||||
|
MemSet(newnode->totalExplainOutput, 0, sizeof(newnode->totalExplainOutput));
|
||||||
|
|
||||||
|
/* copy each SubPlanExplainOutput element */
|
||||||
|
for (int i = 0; i < MAX_ANALYZE_OUTPUT; i++)
|
||||||
|
{
|
||||||
|
/* copy the explainOutput string pointer */
|
||||||
|
COPY_STRING_FIELD(totalExplainOutput[i].explainOutput);
|
||||||
|
|
||||||
|
/* copy the executionDuration (double) */
|
||||||
|
COPY_SCALAR_FIELD(totalExplainOutput[i].executionDuration);
|
||||||
|
|
||||||
|
/* copy the totalReceivedTupleData (uint64) */
|
||||||
|
COPY_SCALAR_FIELD(totalExplainOutput[i].totalReceivedTupleData);
|
||||||
|
}
|
||||||
|
|
||||||
|
COPY_SCALAR_FIELD(numTasksOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -215,6 +215,46 @@ OutDistributedSubPlan(OUTFUNC_ARGS)
|
||||||
|
|
||||||
WRITE_UINT_FIELD(subPlanId);
|
WRITE_UINT_FIELD(subPlanId);
|
||||||
WRITE_NODE_FIELD(plan);
|
WRITE_NODE_FIELD(plan);
|
||||||
|
WRITE_UINT64_FIELD(bytesSentPerWorker);
|
||||||
|
WRITE_INT_FIELD(remoteWorkerCount);
|
||||||
|
WRITE_FLOAT_FIELD(durationMillisecs, "%.2f");
|
||||||
|
WRITE_BOOL_FIELD(writeLocalFile);
|
||||||
|
|
||||||
|
appendStringInfoString(str, " totalExplainOutput [");
|
||||||
|
for (int i = 0; i < MAX_ANALYZE_OUTPUT; i++)
|
||||||
|
{
|
||||||
|
const SubPlanExplainOutput *e = &node->totalExplainOutput[i];
|
||||||
|
|
||||||
|
/* skip empty slots */
|
||||||
|
if (e->explainOutput == NULL &&
|
||||||
|
e->executionDuration == 0
|
||||||
|
&& e->totalReceivedTupleData == 0)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i > 0)
|
||||||
|
{
|
||||||
|
appendStringInfoChar(str, ' ');
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfoChar(str, '(');
|
||||||
|
|
||||||
|
/* string pointer – prints quoted or NULL */
|
||||||
|
WRITE_STRING_FIELD(totalExplainOutput[i].explainOutput);
|
||||||
|
|
||||||
|
/* double field */
|
||||||
|
WRITE_FLOAT_FIELD(totalExplainOutput[i].executionDuration, "%.2f");
|
||||||
|
|
||||||
|
/* 64-bit unsigned – use the uint64 macro */
|
||||||
|
WRITE_UINT64_FIELD(totalExplainOutput[i].totalReceivedTupleData);
|
||||||
|
|
||||||
|
appendStringInfoChar(str, ')');
|
||||||
|
}
|
||||||
|
|
||||||
|
WRITE_INT_FIELD(numTasksOutput);
|
||||||
|
|
||||||
|
appendStringInfoChar(str, ']');
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
|
|
||||||
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
|
#define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000
|
||||||
|
|
||||||
|
#define MAX_ANALYZE_OUTPUT 32
|
||||||
|
|
||||||
/* level of planner calls */
|
/* level of planner calls */
|
||||||
extern int PlannerLevel;
|
extern int PlannerLevel;
|
||||||
|
|
|
@ -474,6 +474,17 @@ typedef struct DistributedPlan
|
||||||
} DistributedPlan;
|
} DistributedPlan;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
typedef struct SubPlanExplainOutput
|
||||||
|
{
|
||||||
|
char *explainOutput;
|
||||||
|
double executionDuration;
|
||||||
|
uint64 totalReceivedTupleData;
|
||||||
|
} SubPlanExplainOutput;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DistributedSubPlan contains a subplan of a distributed plan. Subplans are
|
* DistributedSubPlan contains a subplan of a distributed plan. Subplans are
|
||||||
* executed before the distributed query and their results are written to
|
* executed before the distributed query and their results are written to
|
||||||
|
@ -492,6 +503,8 @@ typedef struct DistributedSubPlan
|
||||||
uint32 remoteWorkerCount;
|
uint32 remoteWorkerCount;
|
||||||
double durationMillisecs;
|
double durationMillisecs;
|
||||||
bool writeLocalFile;
|
bool writeLocalFile;
|
||||||
|
SubPlanExplainOutput totalExplainOutput[MAX_ANALYZE_OUTPUT];
|
||||||
|
uint32 numTasksOutput;
|
||||||
} DistributedSubPlan;
|
} DistributedSubPlan;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
extern int MaxIntermediateResult;
|
extern int MaxIntermediateResult;
|
||||||
extern int SubPlanLevel;
|
extern int SubPlanLevel;
|
||||||
|
|
||||||
extern void ExecuteSubPlans(DistributedPlan *distributedPlan);
|
extern void ExecuteSubPlans(DistributedPlan *distributedPlan, bool explainAnalyzeEnabled);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IntermediateResultsHashEntry is used to store which nodes need to receive
|
* IntermediateResultsHashEntry is used to store which nodes need to receive
|
||||||
|
|
|
@ -2492,12 +2492,12 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
Intermediate Data Size: 100 bytes
|
Intermediate Data Size: 100 bytes
|
||||||
Result destination: Write locally
|
Result destination: Write locally
|
||||||
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
|
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
||||||
Task Count: 4
|
Task Count: 4
|
||||||
Tuple data received from nodes: 160 bytes
|
Tuple data received from nodes: 80 bytes
|
||||||
Tasks Shown: One of 4
|
Tasks Shown: One of 4
|
||||||
-> Task
|
-> Task
|
||||||
Tuple data received from node: 64 bytes
|
Tuple data received from node: 32 bytes
|
||||||
Node: host=localhost port=xxxxx dbname=regression
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
|
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
|
||||||
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
|
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
|
||||||
|
@ -3228,6 +3228,47 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
|
||||||
-> Update on tbl_570036 tbl (actual rows=0 loops=1)
|
-> Update on tbl_570036 tbl (actual rows=0 loops=1)
|
||||||
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
|
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
|
||||||
Filter: (a = 1)
|
Filter: (a = 1)
|
||||||
|
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
|
||||||
|
SET search_path TO multi_explain;
|
||||||
|
CREATE TABLE test_subplans (x int primary key, y int);
|
||||||
|
SELECT create_distributed_table('test_subplans','x');
|
||||||
|
|
||||||
|
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
|
||||||
|
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
|
||||||
|
SELECT * FROM a;
|
||||||
|
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
|
-> Distributed Subplan XXX_1
|
||||||
|
Intermediate Data Size: 18 bytes
|
||||||
|
Result destination: Write locally
|
||||||
|
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
|
Task Count: 1
|
||||||
|
Tuple data received from nodes: 16 bytes
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Tuple data received from node: 16 bytes
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Insert on test_subplans_570038 (actual rows=1 loops=1)
|
||||||
|
-> Result (actual rows=1 loops=1)
|
||||||
|
Task Count: 1
|
||||||
|
Tuple data received from nodes: 8 bytes
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Tuple data received from node: 8 bytes
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
|
||||||
|
-- Only one row must exist
|
||||||
|
SELECT * FROM test_subplans;
|
||||||
|
1|2
|
||||||
|
-- Will fail with duplicate pk
|
||||||
|
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
|
||||||
|
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
|
||||||
|
SELECT * FROM a;
|
||||||
|
ERROR: duplicate key value violates unique constraint "test_subplans_pkey_570038"
|
||||||
|
DETAIL: Key (x)=(1) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
-- Only one row must exist
|
||||||
|
SELECT * FROM test_subplans;
|
||||||
|
1|2
|
||||||
-- check when auto explain + analyze is enabled, we do not allow local execution.
|
-- check when auto explain + analyze is enabled, we do not allow local execution.
|
||||||
CREATE SCHEMA test_auto_explain;
|
CREATE SCHEMA test_auto_explain;
|
||||||
SET search_path TO 'test_auto_explain';
|
SET search_path TO 'test_auto_explain';
|
||||||
|
|
|
@ -721,13 +721,11 @@ CALL exec_query_and_check_query_counters($$
|
||||||
0, 0
|
0, 0
|
||||||
);
|
);
|
||||||
-- same with explain analyze
|
-- same with explain analyze
|
||||||
--
|
|
||||||
-- this time, query_execution_multi_shard is incremented twice because of #4212
|
|
||||||
CALL exec_query_and_check_query_counters($$
|
CALL exec_query_and_check_query_counters($$
|
||||||
EXPLAIN (ANALYZE)
|
EXPLAIN (ANALYZE)
|
||||||
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
|
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
|
||||||
$$,
|
$$,
|
||||||
1, 2
|
1, 1
|
||||||
);
|
);
|
||||||
CALL exec_query_and_check_query_counters($$
|
CALL exec_query_and_check_query_counters($$
|
||||||
DELETE FROM dist_table WHERE a = 1
|
DELETE FROM dist_table WHERE a = 1
|
||||||
|
@ -1041,9 +1039,6 @@ PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line X
|
||||||
-- A similar one but without the insert, so we would normally expect 2 increments
|
-- A similar one but without the insert, so we would normally expect 2 increments
|
||||||
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
|
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
|
||||||
-- of 3 since the insert is not there anymore.
|
-- of 3 since the insert is not there anymore.
|
||||||
--
|
|
||||||
-- But this time we observe more counter increments because we execute the subplans
|
|
||||||
-- twice because of #4212.
|
|
||||||
CALL exec_query_and_check_query_counters($$
|
CALL exec_query_and_check_query_counters($$
|
||||||
EXPLAIN (ANALYZE)
|
EXPLAIN (ANALYZE)
|
||||||
-- single-shard subplan (whole cte)
|
-- single-shard subplan (whole cte)
|
||||||
|
@ -1057,7 +1052,7 @@ CALL exec_query_and_check_query_counters($$
|
||||||
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
|
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
|
||||||
JOIN cte ON q.a = cte.a
|
JOIN cte ON q.a = cte.a
|
||||||
$$,
|
$$,
|
||||||
3, 4
|
2, 2
|
||||||
);
|
);
|
||||||
-- safe to push-down
|
-- safe to push-down
|
||||||
CALL exec_query_and_check_query_counters($$
|
CALL exec_query_and_check_query_counters($$
|
||||||
|
|
|
@ -1166,6 +1166,26 @@ PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RET
|
||||||
EXPLAIN (COSTS false) EXECUTE q2('(1)');
|
EXPLAIN (COSTS false) EXECUTE q2('(1)');
|
||||||
EXPLAIN :default_analyze_flags EXECUTE q2('(1)');
|
EXPLAIN :default_analyze_flags EXECUTE q2('(1)');
|
||||||
|
|
||||||
|
-- EXPLAIN ANALYZE shouldn't execute SubPlans twice (bug #4212)
|
||||||
|
SET search_path TO multi_explain;
|
||||||
|
CREATE TABLE test_subplans (x int primary key, y int);
|
||||||
|
SELECT create_distributed_table('test_subplans','x');
|
||||||
|
|
||||||
|
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
|
||||||
|
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
|
||||||
|
SELECT * FROM a;
|
||||||
|
|
||||||
|
-- Only one row must exist
|
||||||
|
SELECT * FROM test_subplans;
|
||||||
|
|
||||||
|
-- Will fail with duplicate pk
|
||||||
|
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off)
|
||||||
|
WITH a AS (INSERT INTO test_subplans VALUES (1,2) RETURNING *)
|
||||||
|
SELECT * FROM a;
|
||||||
|
|
||||||
|
-- Only one row must exist
|
||||||
|
SELECT * FROM test_subplans;
|
||||||
|
|
||||||
-- check when auto explain + analyze is enabled, we do not allow local execution.
|
-- check when auto explain + analyze is enabled, we do not allow local execution.
|
||||||
CREATE SCHEMA test_auto_explain;
|
CREATE SCHEMA test_auto_explain;
|
||||||
SET search_path TO 'test_auto_explain';
|
SET search_path TO 'test_auto_explain';
|
||||||
|
|
|
@ -476,13 +476,11 @@ CALL exec_query_and_check_query_counters($$
|
||||||
);
|
);
|
||||||
|
|
||||||
-- same with explain analyze
|
-- same with explain analyze
|
||||||
--
|
|
||||||
-- this time, query_execution_multi_shard is incremented twice because of #4212
|
|
||||||
CALL exec_query_and_check_query_counters($$
|
CALL exec_query_and_check_query_counters($$
|
||||||
EXPLAIN (ANALYZE)
|
EXPLAIN (ANALYZE)
|
||||||
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
|
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
|
||||||
$$,
|
$$,
|
||||||
1, 2
|
1, 1
|
||||||
);
|
);
|
||||||
|
|
||||||
CALL exec_query_and_check_query_counters($$
|
CALL exec_query_and_check_query_counters($$
|
||||||
|
@ -807,9 +805,6 @@ CALL exec_query_and_check_query_counters($$
|
||||||
-- A similar one but without the insert, so we would normally expect 2 increments
|
-- A similar one but without the insert, so we would normally expect 2 increments
|
||||||
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
|
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
|
||||||
-- of 3 since the insert is not there anymore.
|
-- of 3 since the insert is not there anymore.
|
||||||
--
|
|
||||||
-- But this time we observe more counter increments because we execute the subplans
|
|
||||||
-- twice because of #4212.
|
|
||||||
CALL exec_query_and_check_query_counters($$
|
CALL exec_query_and_check_query_counters($$
|
||||||
EXPLAIN (ANALYZE)
|
EXPLAIN (ANALYZE)
|
||||||
-- single-shard subplan (whole cte)
|
-- single-shard subplan (whole cte)
|
||||||
|
@ -823,7 +818,7 @@ CALL exec_query_and_check_query_counters($$
|
||||||
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
|
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
|
||||||
JOIN cte ON q.a = cte.a
|
JOIN cte ON q.a = cte.a
|
||||||
$$,
|
$$,
|
||||||
3, 4
|
2, 2
|
||||||
);
|
);
|
||||||
|
|
||||||
-- safe to push-down
|
-- safe to push-down
|
||||||
|
|
Loading…
Reference in New Issue