/*------------------------------------------------------------------------- * * multi_explain.c * Citus explain support. * * Copyright (c) Citus Data, Inc. *------------------------------------------------------------------------- */ #include "postgres.h" #include "fmgr.h" #include "libpq-fe.h" #include "miscadmin.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/namespace.h" #include "catalog/pg_class.h" #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/createas.h" #include "commands/dbcommands.h" #include "commands/explain.h" #include "commands/tablecmds.h" #include "executor/tstoreReceiver.h" #include "lib/stringinfo.h" #include "nodes/plannodes.h" #include "nodes/primnodes.h" #include "nodes/print.h" #include "optimizer/clauses.h" #include "optimizer/cost.h" #include "optimizer/planner.h" #include "parser/analyze.h" #include "portability/instr_time.h" #include "rewrite/rewriteHandler.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/json.h" #include "utils/lsyscache.h" #include "utils/snapmgr.h" #include "pg_version_constants.h" #include "distributed/citus_depended_object.h" #include "distributed/citus_nodefuncs.h" #include "distributed/combine_query_planner.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" #include "distributed/distributed_planner.h" #include "distributed/executor_util.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" #include "distributed/jsonbutils.h" #include "distributed/listutils.h" #include "distributed/merge_planner.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" 
#include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/placement_connection.h" #include "distributed/recursive_planning.h" #include "distributed/remote_commands.h" #include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" /* Config variables that enable printing distributed query plans */ bool ExplainDistributedQueries = true; bool ExplainAllTasks = false; int ExplainAnalyzeSortMethod = EXPLAIN_ANALYZE_SORT_BY_TIME; /* * If enabled, EXPLAIN ANALYZE output & other statistics of last worker task * are saved in following variables. */ static char *SavedExplainPlan = NULL; static double SavedExecutionDurationMillisec = 0.0; /* struct to save explain flags */ typedef struct { bool verbose; bool costs; bool buffers; bool wal; bool timing; bool summary; #if PG_VERSION_NUM >= PG_VERSION_17 bool memory; ExplainSerializeOption serialize; #endif ExplainFormat format; } ExplainOptions; /* EXPLAIN flags of current distributed explain */ #if PG_VERSION_NUM >= PG_VERSION_17 static ExplainOptions CurrentDistributedQueryExplainOptions = { 0, 0, 0, 0, 0, 0, 0, EXPLAIN_SERIALIZE_NONE, EXPLAIN_FORMAT_TEXT }; #else static ExplainOptions CurrentDistributedQueryExplainOptions = { 0, 0, 0, 0, 0, 0, EXPLAIN_FORMAT_TEXT }; #endif /* Result for a single remote EXPLAIN command */ typedef struct RemoteExplainPlan { int placementIndex; List *explainOutputList; } RemoteExplainPlan; /* * ExplainAnalyzeDestination is internal representation of a TupleDestination * which collects EXPLAIN ANALYZE output after the main query is run. */ typedef struct ExplainAnalyzeDestination { TupleDestination pub; Task *originalTask; TupleDestination *originalTaskDestination; TupleDesc lastSavedExplainAnalyzeTupDesc; } ExplainAnalyzeDestination; #if PG_VERSION_NUM >= PG_VERSION_17 /* * Various places within need to convert bytes to kilobytes. 
Round these up * to the next whole kilobyte. * copied from explain.c */ #define BYTES_TO_KILOBYTES(b) (((b) + 1023) / 1024) /* copied from explain.c */ /* Instrumentation data for SERIALIZE option */ typedef struct SerializeMetrics { uint64 bytesSent; /* # of bytes serialized */ instr_time timeSpent; /* time spent serializing */ BufferUsage bufferUsage; /* buffers accessed during serialization */ } SerializeMetrics; /* copied from explain.c */ static bool peek_buffer_usage(ExplainState *es, const BufferUsage *usage); static void show_buffer_usage(ExplainState *es, const BufferUsage *usage); static void show_memory_counters(ExplainState *es, const MemoryContextCounters *mem_counters); static void ExplainIndentText(ExplainState *es); static void ExplainPrintSerialize(ExplainState *es, SerializeMetrics *metrics); static SerializeMetrics GetSerializationMetrics(DestReceiver *dest); /* * DestReceiver functions for SERIALIZE option * * A DestReceiver for query tuples, that serializes passed rows into RowData * messages while measuring the resources expended and total serialized size, * while never sending the data to the client. This allows measuring the * overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise * exercisable without actually hitting the network. 
* * copied from explain.c */ typedef struct SerializeDestReceiver { DestReceiver pub; ExplainState *es; /* this EXPLAIN statement's ExplainState */ int8 format; /* text or binary, like pq wire protocol */ TupleDesc attrinfo; /* the output tuple desc */ int nattrs; /* current number of columns */ FmgrInfo *finfos; /* precomputed call info for output fns */ MemoryContext tmpcontext; /* per-row temporary memory context */ StringInfoData buf; /* buffer to hold the constructed message */ SerializeMetrics metrics; /* collected metrics */ } SerializeDestReceiver; #endif /* Explain functions for distributed queries */ static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es, bool subPlansExecuted); static void ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es, ParamListInfo params); static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es); static void ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es, ParamListInfo params); static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es, ParamListInfo params); static RemoteExplainPlan * GetSavedRemoteExplain(Task *task, ExplainState *es); static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es, ParamListInfo params); static void ExplainTask(CitusScanState *scanState, Task *task, int placementIndex, List *explainOutputList, ExplainState *es); static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList, ExplainState *es); static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es); static const char * ExplainFormatStr(ExplainFormat format); #if PG_VERSION_NUM >= PG_VERSION_17 static const char * ExplainSerializeStr(ExplainSerializeOption serializeOption); #endif static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, #if 
PG_VERSION_NUM >= PG_VERSION_17 const BufferUsage *bufusage, const MemoryContextCounters *mem_counters, #endif double *executionDurationMillisec); static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat defaultValue); #if PG_VERSION_NUM >= PG_VERSION_17 static ExplainSerializeOption ExtractFieldExplainSerialize(Datum jsonbDoc, const char *fieldName, ExplainSerializeOption defaultValue); #endif static TupleDestination * CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest); static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, HeapTuple heapTuple, uint64 tupleLibpqSize); static TupleDesc ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber); static char * WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc, ParamListInfo params); static char * FetchPlanQueryForExplainAnalyze(const char *queryString, ParamListInfo params); static char * ParameterResolutionSubquery(ParamListInfo params); static List * SplitString(const char *str, char delimiter, int maxLength); /* Static Explain functions copied from explain.c */ static void ExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv); static double elapsed_time(instr_time *starttime); static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es); static uint64 TaskReceivedTupleData(Task *task); static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(worker_last_saved_explain_analyze); PG_FUNCTION_INFO_V1(worker_save_query_explain_analyze); /* * CitusExplainScan is a custom scan explain callback function which is used to * print explain information of a Citus plan which includes both combine query and * distributed plan. 
*/
void
CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es)
{
#if PG_VERSION_NUM >= PG_VERSION_16
    /* EXPLAIN (GENERIC_PLAN) cannot be produced for distributed tables; reject early */
    if (es->generic)
    {
        ereport(ERROR, (errmsg(
                            "EXPLAIN GENERIC_PLAN is currently not supported for Citus tables")));
    }
#endif

    CitusScanState *scanState = (CitusScanState *) node;
    DistributedPlan *distributedPlan = scanState->distributedPlan;
    EState *executorState = ScanStateGetExecutorState(scanState);
    ParamListInfo params = executorState->es_param_list_info;

    /* when the GUC disables distributed EXPLAIN, only record that fact */
    if (!ExplainDistributedQueries)
    {
        ExplainPropertyBool("citus.explain_distributed_queries", false, es);
        return;
    }

    ExplainOpenGroup("Distributed Query", "Distributed Query", true, es);

    /*
     * ExplainOnePlan function of postgres might be called in this codepath.
     * It requires an ActiveSnapshot being set. Make sure to make ActiveSnapshot
     * available before calling into Citus Explain functions.
     */
    PushActiveSnapshot(executorState->es_snapshot);

    if (distributedPlan->subPlanList != NIL)
    {
        ExplainSubPlans(distributedPlan, es, scanState->finishedPreScan);
    }

    ExplainJob(scanState, distributedPlan->workerJob, es, params);

    PopActiveSnapshot();

    ExplainCloseGroup("Distributed Query", "Distributed Query", true, es);
}


/*
 * NonPushableInsertSelectExplainScan is a custom scan explain callback function
 * which is used to print explain information of a Citus plan for an INSERT INTO
 * distributed_table SELECT ... query that is evaluated on the coordinator or
 * uses repartitioning.
 */
void
NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
                                   struct ExplainState *es)
{
    CitusScanState *scanState = (CitusScanState *) node;
    DistributedPlan *distributedPlan = scanState->distributedPlan;
    Query *insertSelectQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition;
    RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);

    /*
     * Create a copy because ExplainOneQuery can modify the query, and later
     * executions of prepared statements might require it.
See
     * https://github.com/citusdata/citus/issues/3947 for what can happen.
     */
    Query *queryCopy = copyObject(selectRte->subquery);

    bool repartition = distributedPlan->modifyWithSelectMethod ==
                       MODIFY_WITH_SELECT_REPARTITION;

    /* EXPLAIN ANALYZE would actually execute the INSERT; not supported here */
    if (es->analyze)
    {
        ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for INSERT "
                               "... SELECT commands %s",
                               repartition ? "with repartitioning" : "via coordinator")));
    }

    if (repartition)
    {
        ExplainPropertyText("INSERT/SELECT method", "repartition", es);
    }
    else
    {
        ExplainPropertyText("INSERT/SELECT method", "pull to coordinator", es);
    }

    ExplainOpenGroup("Select Query", "Select Query", false, es);

    /* explain the inner SELECT query */
    IntoClause *into = NULL;
    ParamListInfo params = NULL;

    /*
     * With PG14, we need to provide a string here,
     * for now we put an empty string, which is valid according to postgres.
     */
    char *queryString = pstrdup("");

    ExplainOneQuery(queryCopy, 0, into, es, queryString, params, NULL);

    ExplainCloseGroup("Select Query", "Select Query", false, es);
}


/*
 * NonPushableMergeCommandExplainScan is a custom scan explain callback function
 * which is used to print explain information of a Citus plan for MERGE INTO
 * distributed_table USING (source query/table), where source can be any query
 * whose results are repartitioned to be colocated with the target table.
 */
void
NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
                                   struct ExplainState *es)
{
    CitusScanState *scanState = (CitusScanState *) node;
    DistributedPlan *distributedPlan = scanState->distributedPlan;
    Query *mergeQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition;
    RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery, false);

    /*
     * Create a copy because ExplainOneQuery can modify the query, and later
     * executions of prepared statements might require it. See
     * https://github.com/citusdata/citus/issues/3947 for what can happen.
     */
    Query *sourceQueryCopy = copyObject(sourceRte->subquery);
    bool repartition = distributedPlan->modifyWithSelectMethod ==
                       MODIFY_WITH_SELECT_REPARTITION;

    /* EXPLAIN ANALYZE would actually execute the MERGE; not supported here */
    if (es->analyze)
    {
        ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for "
                               "MERGE INTO ... commands with repartitioning")));
    }

    Oid targetRelationId = ModifyQueryResultRelationId(mergeQuery);
    StringInfo mergeMethodMessage = makeStringInfo();
    appendStringInfo(mergeMethodMessage, "MERGE INTO %s method",
                     get_rel_name(targetRelationId));

    if (repartition)
    {
        ExplainPropertyText(mergeMethodMessage->data, "repartition", es);
    }
    else
    {
        ExplainPropertyText(mergeMethodMessage->data, "pull to coordinator", es);
    }

    ExplainOpenGroup("Source Query", "Source Query", false, es);

    /* explain the MERGE source query */
    IntoClause *into = NULL;
    ParamListInfo params = NULL;

    /*
     * With PG14, we need to provide a string here, for now we put an empty
     * string, which is valid according to postgres.
     */
    char *queryString = pstrdup("");

    ExplainOneQuery(sourceQueryCopy, 0, into, es, queryString, params, NULL);

    ExplainCloseGroup("Source Query", "Source Query", false, es);
}


/*
 * ExplainSubPlans generates EXPLAIN output for subplans for CTEs
 * and complex subqueries. Because the planning for these queries
 * is done along with the top-level plan, we cannot determine the
 * planning time and set it to 0.
 */
static void
ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es,
                bool subPlansExecuted)
{
    ListCell *subPlanCell = NULL;
    uint64 planId = distributedPlan->planId;

    /* remember the flags that may be masked below so they can be restored */
    bool analyzeEnabled = es->analyze;
    bool timingEnabled = es->timing;
    bool walEnabled = es->wal;

    ExplainOpenGroup("Subplans", "Subplans", false, es);

    if (subPlansExecuted)
    {
        /*
         * Subplans are already executed recursively when
         * executing the top-level of the plan. Here, we just
         * need to explain them but not execute them again.
*/
        es->analyze = false;
        es->timing = false;
        es->wal = false;
    }

    foreach(subPlanCell, distributedPlan->subPlanList)
    {
        DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
        PlannedStmt *plan = subPlan->plan;
        IntoClause *into = NULL;
        ParamListInfo params = NULL;

        /*
         * With PG14, we need to provide a string here,
         * for now we put an empty string, which is valid according to postgres.
         */
        char *queryString = pstrdup("");
        instr_time planduration;
        BufferUsage bufusage_start,
                    bufusage;

#if PG_VERSION_NUM >= PG_VERSION_17
        MemoryContextCounters mem_counters;
        MemoryContext planner_ctx = NULL;
        MemoryContext saved_ctx = NULL;

        if (es->memory)
        {
            /* copy paste from postgres code */
            planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
                                                "explain analyze planner context",
                                                ALLOCSET_DEFAULT_SIZES);
            saved_ctx = MemoryContextSwitchTo(planner_ctx);
        }
#endif

        if (es->buffers)
        {
            bufusage_start = pgBufferUsage;
        }

        if (es->format == EXPLAIN_FORMAT_TEXT)
        {
            char *resultId = GenerateResultId(planId, subPlan->subPlanId);
            appendStringInfoSpaces(es->str, es->indent * 2);
            appendStringInfo(es->str, "-> Distributed Subplan %s\n", resultId);
            es->indent += 3;
        }

        ExplainOpenGroup("Subplan", NULL, true, es);

        if (analyzeEnabled)
        {
            if (timingEnabled)
            {
                ExplainPropertyFloat("Subplan Duration", "ms",
                                     subPlan->durationMillisecs, 2, es);
            }

            ExplainPropertyBytes("Intermediate Data Size",
                                 subPlan->bytesSentPerWorker, es);

            /* describe where the intermediate result was written/shipped */
            StringInfo destination = makeStringInfo();
            if (subPlan->remoteWorkerCount && subPlan->writeLocalFile)
            {
                appendStringInfo(destination, "Send to %d nodes, write locally",
                                 subPlan->remoteWorkerCount);
            }
            else if (subPlan->writeLocalFile)
            {
                appendStringInfoString(destination, "Write locally");
            }
            else
            {
                appendStringInfo(destination, "Send to %d nodes",
                                 subPlan->remoteWorkerCount);
            }

            ExplainPropertyText("Result destination", destination->data, es);
        }

        /* subplans are planned together with the top-level plan: no planning time */
        INSTR_TIME_SET_ZERO(planduration);

        /* calc differences of buffer counters. */
        if (es->buffers)
        {
            memset(&bufusage, 0, sizeof(BufferUsage));
            BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
        }

        ExplainOpenGroup("PlannedStmt", "PlannedStmt", false, es);

#if PG_VERSION_NUM >= PG_VERSION_17
        if (es->memory)
        {
            MemoryContextSwitchTo(saved_ctx);
            MemoryContextMemConsumed(planner_ctx, &mem_counters);
        }

        ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
                       (es->buffers ? &bufusage : NULL),
                       (es->memory ? &mem_counters : NULL));
#else
        ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration,
                       (es->buffers ? &bufusage : NULL));
#endif

        ExplainCloseGroup("PlannedStmt", "PlannedStmt", false, es);
        ExplainCloseGroup("Subplan", NULL, true, es);

        if (es->format == EXPLAIN_FORMAT_TEXT)
        {
            es->indent -= 3;
        }
    }

    /* Restore the settings */
    if (subPlansExecuted)
    {
        es->analyze = analyzeEnabled;
        es->timing = timingEnabled;
        es->wal = walEnabled;
    }

    ExplainCloseGroup("Subplans", "Subplans", false, es);
}


/*
 * ExplainPropertyBytes formats bytes in a human readable way by using
 * pg_size_pretty.
 */
static void
ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es)
{
    Datum textDatum = DirectFunctionCall1(pg_size_pretty, Int64GetDatum(bytes));
    ExplainPropertyText(qlabel, TextDatumGetCString(textDatum), es);
}


/*
 * ShowReceivedTupleData returns true if explain should show received data.
 * This is only the case when using EXPLAIN ANALYZE on queries that return
 * rows.
 */
static bool
ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es)
{
    TupleDesc tupDesc = ScanStateGetTupleDescriptor(scanState);
    return es->analyze && tupDesc != NULL && tupDesc->natts > 0;
}


/*
 * ExplainJob shows the EXPLAIN output for a Job in the physical plan of
 * a distributed query by showing the remote EXPLAIN for the first task,
 * or all tasks if citus.explain_all_tasks is on.
*/
static void
ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es,
           ParamListInfo params)
{
    List *dependentJobList = job->dependentJobList;
    int dependentJobCount = list_length(dependentJobList);
    ListCell *dependentJobCell = NULL;
    List *taskList = job->taskList;
    int taskCount = list_length(taskList);

    ExplainOpenGroup("Job", "Job", true, es);

    ExplainPropertyInteger("Task Count", NULL, taskCount, es);

    if (ShowReceivedTupleData(scanState, es) || job->jobExecuted)
    {
        /* sum the tuple bytes the coordinator received across all tasks */
        Task *task = NULL;
        uint64 totalReceivedTupleDataForAllTasks = 0;
        foreach_declared_ptr(task, taskList)
        {
            totalReceivedTupleDataForAllTasks += TaskReceivedTupleData(task);
        }
        ExplainPropertyBytes("Tuple data received from nodes",
                             totalReceivedTupleDataForAllTasks, es);
    }

    if (dependentJobCount > 0)
    {
        ExplainPropertyText("Tasks Shown", "None, not supported for re-partition "
                                           "queries", es);
    }
    else if (ExplainAllTasks || taskCount <= 1)
    {
        ExplainPropertyText("Tasks Shown", "All", es);
    }
    else
    {
        StringInfo tasksShownText = makeStringInfo();
        appendStringInfo(tasksShownText, "One of %d", taskCount);

        ExplainPropertyText("Tasks Shown", tasksShownText->data, es);
    }

    /*
     * We cannot fetch EXPLAIN plans for jobs that have dependencies, since the
     * intermediate tables have not been created.
     */
    if (dependentJobCount == 0)
    {
        ExplainOpenGroup("Tasks", "Tasks", false, es);

        ExplainTaskList(scanState, taskList, es, params);

        ExplainCloseGroup("Tasks", "Tasks", false, es);
    }
    else
    {
        ExplainOpenGroup("Dependent Jobs", "Dependent Jobs", false, es);

        /* show explain output for dependent jobs, if any */
        foreach(dependentJobCell, dependentJobList)
        {
            Job *dependentJob = (Job *) lfirst(dependentJobCell);

            if (CitusIsA(dependentJob, MapMergeJob))
            {
                ExplainMapMergeJob((MapMergeJob *) dependentJob, es);
            }
        }

        ExplainCloseGroup("Dependent Jobs", "Dependent Jobs", false, es);
    }

    ExplainCloseGroup("Job", "Job", true, es);
}


/*
 * TaskReceivedTupleData returns the amount of data that was received by the
 * coordinator for the task. If it's a RETURNING DML task the value stored in
 * totalReceivedTupleData is not correct yet because it only counts the bytes for
 * one placement.
 */
static uint64
TaskReceivedTupleData(Task *task)
{
    if (task->taskType == MODIFY_TASK)
    {
        /* RETURNING DML runs on every placement; extrapolate from one placement */
        return task->totalReceivedTupleData * list_length(task->taskPlacementList);
    }
    return task->totalReceivedTupleData;
}


/*
 * ExplainMapMergeJob shows a very basic EXPLAIN plan for a MapMergeJob. It does
 * not yet show the EXPLAIN plan for the individual tasks, because this requires
 * specific logic for getting the query (which is wrapped in a UDF), and the
 * queries may use intermediate tables that have not been created.
 */
static void
ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es)
{
    List *dependentJobList = mapMergeJob->job.dependentJobList;
    int dependentJobCount = list_length(dependentJobList);
    ListCell *dependentJobCell = NULL;
    int mapTaskCount = list_length(mapMergeJob->mapTaskList);
    int mergeTaskCount = list_length(mapMergeJob->mergeTaskList);

    if (es->format == EXPLAIN_FORMAT_TEXT)
    {
        appendStringInfoSpaces(es->str, es->indent * 2);
        appendStringInfo(es->str, "-> MapMergeJob\n");
        es->indent += 3;
    }

    ExplainOpenGroup("MapMergeJob", NULL, true, es);
    ExplainPropertyInteger("Map Task Count", NULL, mapTaskCount, es);
    ExplainPropertyInteger("Merge Task Count", NULL, mergeTaskCount, es);

    if (dependentJobCount > 0)
    {
        ExplainOpenGroup("Dependent Jobs", "Dependent Jobs", false, es);

        /* recurse into dependent map-merge jobs */
        foreach(dependentJobCell, dependentJobList)
        {
            Job *dependentJob = (Job *) lfirst(dependentJobCell);

            if (CitusIsA(dependentJob, MapMergeJob))
            {
                ExplainMapMergeJob((MapMergeJob *) dependentJob, es);
            }
        }

        ExplainCloseGroup("Dependent Jobs", "Dependent Jobs", false, es);
    }

    ExplainCloseGroup("MapMergeJob", NULL, true, es);

    if (es->format == EXPLAIN_FORMAT_TEXT)
    {
        es->indent -= 3;
    }
}


/*
 * CompareTasksByFetchedExplainAnalyzeDuration is a helper function to compare
 * two tasks by their execution duration.
*/
static int
CompareTasksByFetchedExplainAnalyzeDuration(const void *leftElement,
                                            const void *rightElement)
{
    const Task *leftTask = *((const Task **) leftElement);
    const Task *rightTask = *((const Task **) rightElement);

    double leftTaskExecutionDuration = leftTask->fetchedExplainAnalyzeExecutionDuration;
    double rightTaskExecutionDuration =
        rightTask->fetchedExplainAnalyzeExecutionDuration;

    /* descending order: slowest task sorts first */
    double diff = leftTaskExecutionDuration - rightTaskExecutionDuration;
    if (diff > 0)
    {
        return -1;
    }
    else if (diff < 0)
    {
        return 1;
    }
    return 0;
}


/*
 * ExplainTaskList shows the remote EXPLAIN and execution time for the first task
 * in taskList, or all tasks if citus.explain_all_tasks is on.
 */
static void
ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es,
                ParamListInfo params)
{
    List *remoteExplainList = NIL;

    /* if tasks are executed, we sort them by time; unless we are on a test env */
    if (es->analyze && ExplainAnalyzeSortMethod == EXPLAIN_ANALYZE_SORT_BY_TIME)
    {
        /* sort by execution duration only in case of ANALYZE */
        taskList = SortList(taskList, CompareTasksByFetchedExplainAnalyzeDuration);
    }
    else
    {
        /* make sure that the output is consistent */
        taskList = SortList(taskList, CompareTasksByTaskId);
    }

    Task *task = NULL;
    foreach_declared_ptr(task, taskList)
    {
        RemoteExplainPlan *remoteExplain = RemoteExplain(task, es, params);
        remoteExplainList = lappend(remoteExplainList, remoteExplain);

        /* unless citus.explain_all_tasks is on, only fetch the first task's plan */
        if (!ExplainAllTasks)
        {
            break;
        }
    }

    RemoteExplainPlan *remoteExplain = NULL;
    forboth_ptr(task, taskList, remoteExplain, remoteExplainList)
    {
        ExplainTask(scanState, task, remoteExplain->placementIndex,
                    remoteExplain->explainOutputList, es);
    }
}


/*
 * RemoteExplain fetches the remote EXPLAIN output for a single task.
 */
static RemoteExplainPlan *
RemoteExplain(Task *task, ExplainState *es, ParamListInfo params)
{
    /*
     * For EXPLAIN EXECUTE we still use the old method, so task->fetchedExplainAnalyzePlan
     * can be NULL for some cases of es->analyze == true.
     */
    if (es->analyze && task->fetchedExplainAnalyzePlan)
    {
        return GetSavedRemoteExplain(task, es);
    }
    else
    {
        return FetchRemoteExplainFromWorkers(task, es, params);
    }
}


/*
 * GetSavedRemoteExplain creates a remote EXPLAIN output from information saved
 * in task.
 */
static RemoteExplainPlan *
GetSavedRemoteExplain(Task *task, ExplainState *es)
{
    RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
        sizeof(RemoteExplainPlan));

    /*
     * Similar to postgres' ExplainQuery(), we split by newline only for
     * text format.
     */
    if (es->format == EXPLAIN_FORMAT_TEXT)
    {
        /*
         * We limit the size of EXPLAIN plans to RSIZE_MAX_MEM (256MB).
         */
        remotePlan->explainOutputList = SplitString(task->fetchedExplainAnalyzePlan,
                                                    '\n', RSIZE_MAX_MEM);
    }
    else
    {
        StringInfo explainAnalyzeString = makeStringInfo();
        appendStringInfoString(explainAnalyzeString, task->fetchedExplainAnalyzePlan);
        remotePlan->explainOutputList = list_make1(explainAnalyzeString);
    }

    remotePlan->placementIndex = task->fetchedExplainAnalyzePlacementIndex;

    return remotePlan;
}


/*
 * FetchRemoteExplainFromWorkers fetches the remote EXPLAIN output for a single
 * task by querying it from worker nodes. It tries each shard placement until
 * one succeeds or all failed.
 */
static RemoteExplainPlan *
FetchRemoteExplainFromWorkers(Task *task, ExplainState *es, ParamListInfo params)
{
    List *taskPlacementList = task->taskPlacementList;
    int placementCount = list_length(taskPlacementList);

    RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
        sizeof(RemoteExplainPlan));

    StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es);

    /*
     * Use a coordinated transaction to ensure that we open a transaction block
     * such that we can set a savepoint.
*/
    UseCoordinatedTransaction();

    for (int placementIndex = 0; placementIndex < placementCount; placementIndex++)
    {
        ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
        int connectionFlags = 0;

        remotePlan->placementIndex = placementIndex;

        MultiConnection *connection = GetPlacementConnection(connectionFlags,
                                                             taskPlacement, NULL);

        /*
         * This code-path doesn't support optional connections, so we don't expect
         * NULL connections.
         */
        Assert(connection != NULL);

        /* try other placements if we fail to connect this one */
        if (PQstatus(connection->pgConn) != CONNECTION_OK)
        {
            continue;
        }

        RemoteTransactionBeginIfNecessary(connection);

        /*
         * Start a savepoint for the explain query. After running the explain
         * query, we will rollback to this savepoint. This saves us from side
         * effects of EXPLAIN ANALYZE on DML queries.
         */
        ExecuteCriticalRemoteCommand(connection, "SAVEPOINT citus_explain_savepoint");

        /* run explain query */
        int numParams = params ? params->numParams : 0;
        Oid *paramTypes = NULL;
        const char **paramValues = NULL;
        PGresult *queryResult = NULL;

        if (params)
        {
            ExtractParametersFromParamList(params, &paramTypes, &paramValues, false);
        }

        int sendStatus = SendRemoteCommandParams(connection, explainQuery->data,
                                                 numParams, paramTypes, paramValues,
                                                 false);
        if (sendStatus != 0)
        {
            queryResult = GetRemoteCommandResult(connection, false);
            if (!IsResponseOK(queryResult))
            {
                PQclear(queryResult);
                ForgetResults(connection);
                continue;
            }
        }

        /*
         * read explain query results
         *
         * NOTE(review): if SendRemoteCommandParams failed (sendStatus == 0),
         * queryResult is still NULL here — verify ReadFirstColumnAsText and
         * PQclear tolerate a NULL result in that path.
         */
        remotePlan->explainOutputList = ReadFirstColumnAsText(queryResult);

        PQclear(queryResult);
        ForgetResults(connection);

        /* rollback to the savepoint */
        ExecuteCriticalRemoteCommand(connection,
                                     "ROLLBACK TO SAVEPOINT citus_explain_savepoint");

        /* stop at the first placement that produced output */
        if (remotePlan->explainOutputList != NIL)
        {
            break;
        }
    }

    return remotePlan;
}


/*
 * ExplainTask shows the EXPLAIN output for a single task. The output has been
 * fetched from the placement at index placementIndex. If explainOutputList is NIL,
 * then the EXPLAIN output could not be fetched from any placement.
 */
static void
ExplainTask(CitusScanState *scanState, Task *task, int placementIndex,
            List *explainOutputList, ExplainState *es)
{
    ExplainOpenGroup("Task", NULL, true, es);

    if (es->format == EXPLAIN_FORMAT_TEXT)
    {
        appendStringInfoSpaces(es->str, es->indent * 2);
        appendStringInfo(es->str, "-> Task\n");
        es->indent += 3;
    }

    if (es->verbose)
    {
        const char *queryText = TaskQueryString(task);
        ExplainPropertyText("Query", queryText, es);
    }

    if (ShowReceivedTupleData(scanState, es))
    {
        ExplainPropertyBytes("Tuple data received from node",
                             TaskReceivedTupleData(task), es);
    }

    if (explainOutputList != NIL)
    {
        List *taskPlacementList = task->taskPlacementList;
        ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);

        ExplainTaskPlacement(taskPlacement, explainOutputList, es);
    }
    else
    {
        ExplainPropertyText("Error", "Could not get remote plan.", es);
    }

    ExplainCloseGroup("Task", NULL, true, es);

    if (es->format == EXPLAIN_FORMAT_TEXT)
    {
        es->indent -= 3;
    }
}


/*
 * ExplainTaskPlacement shows the EXPLAIN output for an individual task placement.
 * It corrects the indentation of the remote explain output to match the local
 * output.
*/
static void
ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
                     ExplainState *es)
{
    int savedIndentation = es->indent;
    StringInfo nodeAddress = makeStringInfo();
    char *nodeName = taskPlacement->nodeName;
    uint32 nodePort = taskPlacement->nodePort;
    const char *nodeDatabase = CurrentDatabaseName();
    ListCell *explainOutputCell = NULL;
    int rowIndex = 0;

    appendStringInfo(nodeAddress, "host=%s port=%d dbname=%s", nodeName, nodePort,
                     nodeDatabase);
    ExplainPropertyText("Node", nodeAddress->data, es);

    ExplainOpenGroup("Remote Plan", "Remote Plan", false, es);

    if (es->format == EXPLAIN_FORMAT_JSON || es->format == EXPLAIN_FORMAT_YAML)
    {
        /* prevent appending the remote EXPLAIN on the same line */
        appendStringInfoChar(es->str, '\n');
    }

    foreach(explainOutputCell, explainOutputList)
    {
        StringInfo rowString = (StringInfo) lfirst(explainOutputCell);

        int rowLength = strlen(rowString->data);
        char *lineStart = rowString->data;

        /* parse the lines in the remote EXPLAIN for proper indentation */
        while (lineStart < rowString->data + rowLength)
        {
            /* find the end-of-line */
            char *lineEnd = strchr(lineStart, '\n');

            if (lineEnd == NULL)
            {
                /* no end-of-line, use end of row string instead */
                lineEnd = rowString->data + rowLength;
            }

            /* convert line to a separate string (mutates rowString in place) */
            *lineEnd = '\0';

            /* indentation that is applied to all lines */
            appendStringInfoSpaces(es->str, es->indent * 2);

            if (es->format == EXPLAIN_FORMAT_TEXT && rowIndex == 0)
            {
                /* indent the first line of the remote plan with an arrow */
                appendStringInfoString(es->str, "-> ");
                es->indent += 2;
            }

            /* show line in the output */
            appendStringInfo(es->str, "%s\n", lineStart);

            /* continue at the start of the next line */
            lineStart = lineEnd + 1;
        }

        rowIndex++;
    }

    ExplainCloseGroup("Remote Plan", "Remote Plan", false, es);

    if (es->format == EXPLAIN_FORMAT_TEXT)
    {
        es->indent = savedIndentation;
    }
}


/*
 * BuildRemoteExplainQuery returns an EXPLAIN query string
 * to run on a worker node which explicitly contains all
 * the options in the explain state.
 */
static StringInfo
BuildRemoteExplainQuery(char *queryString, ExplainState *es)
{
    StringInfo explainQuery = makeStringInfo();
    const char *formatStr = ExplainFormatStr(es->format);
#if PG_VERSION_NUM >= PG_VERSION_17
    const char *serializeStr = ExplainSerializeStr(es->serialize);
#endif

    appendStringInfo(explainQuery,
                     "EXPLAIN (ANALYZE %s, VERBOSE %s, "
                     "COSTS %s, BUFFERS %s, WAL %s, "
                     "TIMING %s, SUMMARY %s, "
#if PG_VERSION_NUM >= PG_VERSION_17
                     "MEMORY %s, SERIALIZE %s, "
#endif
                     "FORMAT %s) %s",
                     es->analyze ? "TRUE" : "FALSE",
                     es->verbose ? "TRUE" : "FALSE",
                     es->costs ? "TRUE" : "FALSE",
                     es->buffers ? "TRUE" : "FALSE",
                     es->wal ? "TRUE" : "FALSE",
                     es->timing ? "TRUE" : "FALSE",
                     es->summary ? "TRUE" : "FALSE",
#if PG_VERSION_NUM >= PG_VERSION_17
                     es->memory ? "TRUE" : "FALSE",
                     serializeStr,
#endif
                     formatStr,
                     queryString);

    return explainQuery;
}


/*
 * ExplainFormatStr converts the given explain format to string.
 */
static const char *
ExplainFormatStr(ExplainFormat format)
{
    switch (format)
    {
        case EXPLAIN_FORMAT_XML:
        {
            return "XML";
        }

        case EXPLAIN_FORMAT_JSON:
        {
            return "JSON";
        }

        case EXPLAIN_FORMAT_YAML:
        {
            return "YAML";
        }

        default:
        {
            return "TEXT";
        }
    }
}


#if PG_VERSION_NUM >= PG_VERSION_17

/*
 * ExplainSerializeStr converts the given explain serialize option to string.
 */
static const char *
ExplainSerializeStr(ExplainSerializeOption serializeOption)
{
    switch (serializeOption)
    {
        case EXPLAIN_SERIALIZE_NONE:
        {
            return "none";
        }

        case EXPLAIN_SERIALIZE_TEXT:
        {
            return "text";
        }

        case EXPLAIN_SERIALIZE_BINARY:
        {
            return "binary";
        }

        default:
        {
            return "none";
        }
    }
}

#endif


/*
 * worker_last_saved_explain_analyze returns the last saved EXPLAIN ANALYZE output of
 * a worker task query. It returns NULL if nothing has been saved yet.
 */
*/ Datum worker_last_saved_explain_analyze(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); if (SavedExplainPlan != NULL) { int columnCount = tupleDescriptor->natts; if (columnCount != 2) { ereport(ERROR, (errmsg("expected 3 output columns in definition of " "worker_last_saved_explain_analyze, but got %d", columnCount))); } bool columnNulls[2] = { false }; Datum columnValues[2] = { CStringGetTextDatum(SavedExplainPlan), Float8GetDatum(SavedExecutionDurationMillisec) }; tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); } PG_RETURN_DATUM(0); } /* * worker_save_query_explain_analyze executes and returns results of query while * saving its EXPLAIN ANALYZE to be fetched later. */ Datum worker_save_query_explain_analyze(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); text *queryText = PG_GETARG_TEXT_P(0); char *queryString = text_to_cstring(queryText); double executionDurationMillisec = 0.0; Datum explainOptions = PG_GETARG_DATUM(1); ExplainState *es = NewExplainState(); es->analyze = true; /* use the same defaults as NewExplainState() for following options */ es->buffers = ExtractFieldBoolean(explainOptions, "buffers", es->buffers); es->wal = ExtractFieldBoolean(explainOptions, "wal", es->wal); es->costs = ExtractFieldBoolean(explainOptions, "costs", es->costs); es->summary = ExtractFieldBoolean(explainOptions, "summary", es->summary); es->verbose = ExtractFieldBoolean(explainOptions, "verbose", es->verbose); es->timing = ExtractFieldBoolean(explainOptions, "timing", es->timing); es->format = ExtractFieldExplainFormat(explainOptions, "format", es->format); #if PG_VERSION_NUM >= PG_VERSION_17 es->memory = ExtractFieldBoolean(explainOptions, "memory", es->memory); es->serialize = ExtractFieldExplainSerialize(explainOptions, "serialize", es->serialize); #endif TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = 
SetupTuplestore(fcinfo, &tupleDescriptor); DestReceiver *tupleStoreDest = CreateTuplestoreDestReceiver(); SetTuplestoreDestReceiverParams(tupleStoreDest, tupleStore, CurrentMemoryContext, false, NULL, NULL); List *parseTreeList = pg_parse_query(queryString); if (list_length(parseTreeList) != 1) { ereport(ERROR, (errmsg("cannot EXPLAIN ANALYZE multiple queries"))); } RawStmt *parseTree = linitial(parseTreeList); ParamListInfo boundParams = ExecutorBoundParams(); int numParams = boundParams ? boundParams->numParams : 0; Oid *paramTypes = NULL; const char **paramValues = NULL; if (boundParams != NULL) { ExtractParametersFromParamList(boundParams, ¶mTypes, ¶mValues, false); } /* resolve OIDs of unknown (user-defined) types */ Query *analyzedQuery = parse_analyze_varparams(parseTree, queryString, ¶mTypes, &numParams, NULL); /* pg_rewrite_query is a wrapper around QueryRewrite with some debugging logic */ List *queryList = pg_rewrite_query(analyzedQuery); if (list_length(queryList) != 1) { ereport(ERROR, (errmsg("cannot EXPLAIN ANALYZE a query rewritten " "into multiple queries"))); } Query *query = linitial(queryList); ExplainBeginOutput(es); /* plan query and record planning stats */ instr_time planStart; instr_time planDuration; #if PG_VERSION_NUM >= PG_VERSION_17 BufferUsage bufusage_start, bufusage; MemoryContextCounters mem_counters; MemoryContext planner_ctx = NULL; MemoryContext saved_ctx = NULL; if (es->memory) { /* * Create a new memory context to measure planner's memory consumption * accurately. Note that if the planner were to be modified to use a * different memory context type, here we would be changing that to * AllocSet, which might be undesirable. However, we don't have a way * to create a context of the same type as another, so we pray and * hope that this is OK. 
* * copied from explain.c */ planner_ctx = AllocSetContextCreate(CurrentMemoryContext, "explain analyze planner context", ALLOCSET_DEFAULT_SIZES); saved_ctx = MemoryContextSwitchTo(planner_ctx); } if (es->buffers) { bufusage_start = pgBufferUsage; } #endif INSTR_TIME_SET_CURRENT(planStart); PlannedStmt *plan = pg_plan_query(query, NULL, CURSOR_OPT_PARALLEL_OK, NULL); INSTR_TIME_SET_CURRENT(planDuration); INSTR_TIME_SUBTRACT(planDuration, planStart); #if PG_VERSION_NUM >= PG_VERSION_17 if (es->memory) { MemoryContextSwitchTo(saved_ctx); MemoryContextMemConsumed(planner_ctx, &mem_counters); } /* calc differences of buffer counters. */ if (es->buffers) { memset(&bufusage, 0, sizeof(BufferUsage)); BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start); } /* do the actual EXPLAIN ANALYZE */ ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, (es->buffers ? &bufusage : NULL), (es->memory ? &mem_counters : NULL), &executionDurationMillisec); #else /* do the actual EXPLAIN ANALYZE */ ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, &executionDurationMillisec); #endif ExplainEndOutput(es); /* save EXPLAIN ANALYZE result to be fetched later */ MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); FreeSavedExplainPlan(); SavedExplainPlan = pstrdup(es->str->data); SavedExecutionDurationMillisec = executionDurationMillisec; MemoryContextSwitchTo(oldContext); PG_RETURN_DATUM(0); } /* * FreeSavedExplainPlan frees allocated saved explain plan if any. */ void FreeSavedExplainPlan(void) { if (SavedExplainPlan) { pfree(SavedExplainPlan); SavedExplainPlan = NULL; } } /* * ExtractFieldExplainFormat gets value of fieldName from jsonbDoc, or returns * defaultValue if it doesn't exist. 
*/ static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat defaultValue) { Datum jsonbDatum = 0; bool found = ExtractFieldJsonbDatum(jsonbDoc, fieldName, &jsonbDatum); if (!found) { return defaultValue; } const char *formatStr = DatumGetCString(DirectFunctionCall1(jsonb_out, jsonbDatum)); if (pg_strcasecmp(formatStr, "\"text\"") == 0) { return EXPLAIN_FORMAT_TEXT; } else if (pg_strcasecmp(formatStr, "\"xml\"") == 0) { return EXPLAIN_FORMAT_XML; } else if (pg_strcasecmp(formatStr, "\"yaml\"") == 0) { return EXPLAIN_FORMAT_YAML; } else if (pg_strcasecmp(formatStr, "\"json\"") == 0) { return EXPLAIN_FORMAT_JSON; } ereport(ERROR, (errmsg("Invalid explain analyze format: %s", formatStr))); return 0; } #if PG_VERSION_NUM >= PG_VERSION_17 /* * ExtractFieldExplainSerialize gets value of fieldName from jsonbDoc, or returns * defaultValue if it doesn't exist. */ static ExplainSerializeOption ExtractFieldExplainSerialize(Datum jsonbDoc, const char *fieldName, ExplainSerializeOption defaultValue) { Datum jsonbDatum = 0; bool found = ExtractFieldJsonbDatum(jsonbDoc, fieldName, &jsonbDatum); if (!found) { return defaultValue; } const char *serializeStr = DatumGetCString(DirectFunctionCall1(jsonb_out, jsonbDatum)); if (pg_strcasecmp(serializeStr, "\"none\"") == 0) { return EXPLAIN_SERIALIZE_NONE; } else if (pg_strcasecmp(serializeStr, "\"off\"") == 0) { return EXPLAIN_SERIALIZE_NONE; } else if (pg_strcasecmp(serializeStr, "\"text\"") == 0) { return EXPLAIN_SERIALIZE_TEXT; } else if (pg_strcasecmp(serializeStr, "\"binary\"") == 0) { return EXPLAIN_SERIALIZE_BINARY; } ereport(ERROR, (errmsg("Invalid explain analyze serialize: %s", serializeStr))); return 0; } #endif /* * CitusExplainOneQuery is the executor hook that is called when * postgres wants to explain a query. 
 */
void
CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
					 ExplainState *es, const char *queryString, ParamListInfo params,
					 QueryEnvironment *queryEnv)
{
	/*
	 * Save the flags of current EXPLAIN command, so the same options can be
	 * forwarded to workers when tasks are wrapped for EXPLAIN ANALYZE.
	 */
	CurrentDistributedQueryExplainOptions.costs = es->costs;
	CurrentDistributedQueryExplainOptions.buffers = es->buffers;
	CurrentDistributedQueryExplainOptions.wal = es->wal;
	CurrentDistributedQueryExplainOptions.verbose = es->verbose;
	CurrentDistributedQueryExplainOptions.summary = es->summary;
	CurrentDistributedQueryExplainOptions.timing = es->timing;
	CurrentDistributedQueryExplainOptions.format = es->format;
#if PG_VERSION_NUM >= PG_VERSION_17
	CurrentDistributedQueryExplainOptions.memory = es->memory;
	CurrentDistributedQueryExplainOptions.serialize = es->serialize;
#endif

	/* rest is copied from ExplainOneQuery() */
	instr_time planstart,
			   planduration;
	BufferUsage bufusage_start,
				bufusage;
#if PG_VERSION_NUM >= PG_VERSION_17
	MemoryContextCounters mem_counters;
	MemoryContext planner_ctx = NULL;
	MemoryContext saved_ctx = NULL;

	if (es->memory)
	{
		/* copy paste from postgres code */
		planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
											"explain analyze planner context",
											ALLOCSET_DEFAULT_SIZES);
		saved_ctx = MemoryContextSwitchTo(planner_ctx);
	}
#endif

	if (es->buffers)
	{
		bufusage_start = pgBufferUsage;
	}

	INSTR_TIME_SET_CURRENT(planstart);

	/*
	 * We should not hide any objects while explaining some query to not break
	 * postgres vanilla tests.
	 *
	 * The filter 'is_citus_depended_object' is added to explain result
	 * and causes some tests to fail if HideCitusDependentObjects is true.
	 * Therefore, we disable HideCitusDependentObjects until the current transaction
	 * ends.
	 *
	 * We do not use security quals because a postgres vanilla test fails
	 * with a change of order for its result.
	 */
	SetLocalHideCitusDependentObjectsDisabledWhenAlreadyEnabled();

	/* plan the query */
	PlannedStmt *plan = pg_plan_query(query, NULL, cursorOptions, params);

	INSTR_TIME_SET_CURRENT(planduration);
	INSTR_TIME_SUBTRACT(planduration, planstart);

	/* calc differences of buffer counters. */
	if (es->buffers)
	{
		memset(&bufusage, 0, sizeof(BufferUsage));
		BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
	}

#if PG_VERSION_NUM >= PG_VERSION_17
	if (es->memory)
	{
		MemoryContextSwitchTo(saved_ctx);
		MemoryContextMemConsumed(planner_ctx, &mem_counters);
	}

	/* run it (if needed) and produce output */
	ExplainOnePlan(plan, into, es, queryString, params, queryEnv, &planduration,
				   (es->buffers ? &bufusage : NULL),
				   (es->memory ? &mem_counters : NULL));
#else

	/* run it (if needed) and produce output */
	ExplainOnePlan(plan, into, es, queryString, params, queryEnv, &planduration,
				   (es->buffers ? &bufusage : NULL));
#endif
}


/*
 * CreateExplainAnlyzeDestination creates a destination suitable for collecting
 * explain analyze output from workers.
 */
static TupleDestination *
CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest)
{
	ExplainAnalyzeDestination *tupleDestination = palloc0(
		sizeof(ExplainAnalyzeDestination));
	tupleDestination->originalTask = task;
	tupleDestination->originalTaskDestination = taskDest;

	/* query 1 (the fetch query) returns (explain text, duration float8) */
	TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(2);

	TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 1, "explain analyze",
					   TEXTOID, 0, 0);
	TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 2, "duration",
					   FLOAT8OID, 0, 0);

	tupleDestination->lastSavedExplainAnalyzeTupDesc = lastSavedExplainAnalyzeTupDesc;

	tupleDestination->pub.putTuple = ExplainAnalyzeDestPutTuple;
	tupleDestination->pub.tupleDescForQuery = ExplainAnalyzeDestTupleDescForQuery;

	return (TupleDestination *) tupleDestination;
}


/*
 * ExplainAnalyzeDestPutTuple implements TupleDestination->putTuple
 * for ExplainAnalyzeDestination.
 */
static void
ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
						   int placementIndex, int queryNumber,
						   HeapTuple heapTuple, uint64 tupleLibpqSize)
{
	ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self;

	/* query 0 is the original (wrapped) query; forward its tuples untouched */
	if (queryNumber == 0)
	{
		TupleDestination *originalTupDest = tupleDestination->originalTaskDestination;
		originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple,
								  tupleLibpqSize);
		tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize;
	}
	/* query 1 carries the saved EXPLAIN ANALYZE output and its duration */
	else if (queryNumber == 1)
	{
		bool isNull = false;
		TupleDesc tupDesc = tupleDestination->lastSavedExplainAnalyzeTupDesc;
		Datum explainAnalyze = heap_getattr(heapTuple, 1, tupDesc, &isNull);

		if (isNull)
		{
			ereport(WARNING, (errmsg(
								  "received null explain analyze output from worker")));
			return;
		}

		Datum executionDuration = heap_getattr(heapTuple, 2, tupDesc, &isNull);

		if (isNull)
		{
			ereport(WARNING, (errmsg("received null execution time from worker")));
			return;
		}

		char *fetchedExplainAnalyzePlan = TextDatumGetCString(explainAnalyze);
		double fetchedExplainAnalyzeExecutionDuration =
			DatumGetFloat8(executionDuration);

		/*
		 * Allocate fetchedExplainAnalyzePlan in the same context as the Task, since we are
		 * currently in execution context and a Task can span multiple executions.
		 *
		 * Although we won't reuse the same value in a future execution, but we have
		 * calls to CheckNodeCopyAndSerialization() which asserts copy functions of the task
		 * work as expected, which will try to copy this value in a future execution.
		 *
		 * Why don't we just allocate this field in executor context and reset it before
		 * the next execution? Because when an error is raised we can skip pretty much most
		 * of the meaningful places that we can insert the reset.
		 *
		 * TODO: Take all EXPLAIN ANALYZE related fields out of Task and store them in a
		 * Task to ExplainAnalyzePrivate mapping in multi_explain.c, so we don't need to
		 * do these hacky memory context management tricks.
		 */
		MemoryContext taskContext =
			GetMemoryChunkContext(tupleDestination->originalTask);

		tupleDestination->originalTask->fetchedExplainAnalyzePlan =
			MemoryContextStrdup(taskContext, fetchedExplainAnalyzePlan);
		tupleDestination->originalTask->fetchedExplainAnalyzePlacementIndex =
			placementIndex;
		tupleDestination->originalTask->fetchedExplainAnalyzeExecutionDuration =
			fetchedExplainAnalyzeExecutionDuration;
	}
	else
	{
		ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"),
						errdetail("while receiving tuples for query %d",
								  queryNumber)));
	}
}


/*
 * ResetExplainAnalyzeData reset fields in Task that are used by multi_explain.c
 */
void
ResetExplainAnalyzeData(List *taskList)
{
	Task *task = NULL;

	foreach_declared_ptr(task, taskList)
	{
		/* the plan string is owned by the task's memory context; free it eagerly */
		if (task->fetchedExplainAnalyzePlan != NULL)
		{
			pfree(task->fetchedExplainAnalyzePlan);
		}

		task->totalReceivedTupleData = 0;
		task->fetchedExplainAnalyzePlacementIndex = 0;
		task->fetchedExplainAnalyzePlan = NULL;
	}
}


/*
 * ExplainAnalyzeDestTupleDescForQuery implements TupleDestination->tupleDescForQuery
 * for ExplainAnalyzeDestination.
 */
static TupleDesc
ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber)
{
	ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self;

	/* query 0: original query's descriptor; query 1: (explain text, duration) */
	if (queryNumber == 0)
	{
		TupleDestination *originalTupDest = tupleDestination->originalTaskDestination;
		return originalTupDest->tupleDescForQuery(originalTupDest, 0);
	}
	else if (queryNumber == 1)
	{
		return tupleDestination->lastSavedExplainAnalyzeTupDesc;
	}

	ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"),
					errdetail("while requesting for tuple descriptor of query %d",
							  queryNumber)));
	return NULL;
}


/*
 * RequestedForExplainAnalyze returns true if we should get the EXPLAIN ANALYZE
 * output for the given custom scan node.
*/ bool RequestedForExplainAnalyze(CitusScanState *node) { return (node->customScanState.ss.ps.state->es_instrument != 0); } /* * ExplainAnalyzeTaskList returns a task list suitable for explain analyze. After executing * these tasks, fetchedExplainAnalyzePlan of originalTaskList should be populated. */ List * ExplainAnalyzeTaskList(List *originalTaskList, TupleDestination *defaultTupleDest, TupleDesc tupleDesc, ParamListInfo params) { List *explainAnalyzeTaskList = NIL; Task *originalTask = NULL; foreach_declared_ptr(originalTask, originalTaskList) { if (originalTask->queryCount != 1) { ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"))); } Task *explainAnalyzeTask = copyObject(originalTask); const char *queryString = TaskQueryString(explainAnalyzeTask); ParamListInfo taskParams = params; /* * We will not send parameters if they have already been resolved in the query * string. */ if (explainAnalyzeTask->parametersInQueryStringResolved) { taskParams = NULL; } char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc, taskParams); char *fetchQuery = FetchPlanQueryForExplainAnalyze(queryString, taskParams); SetTaskQueryStringList(explainAnalyzeTask, list_make2(wrappedQuery, fetchQuery)); TupleDestination *originalTaskDest = originalTask->tupleDest ? originalTask->tupleDest : defaultTupleDest; explainAnalyzeTask->tupleDest = CreateExplainAnlyzeDestination(originalTask, originalTaskDest); explainAnalyzeTaskList = lappend(explainAnalyzeTaskList, explainAnalyzeTask); } return explainAnalyzeTaskList; } /* * WrapQueryForExplainAnalyze wraps a query into a worker_save_query_explain_analyze() * call so we can fetch its explain analyze after its execution. 
 */
static char *
WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc,
						   ParamListInfo params)
{
	/* build "field_0 <type>, field_1 <type>, ..." for the AS (...) clause */
	StringInfo columnDef = makeStringInfo();
	for (int columnIndex = 0; columnIndex < tupleDesc->natts; columnIndex++)
	{
		if (columnIndex != 0)
		{
			appendStringInfoString(columnDef, ", ");
		}

		Form_pg_attribute attr = &tupleDesc->attrs[columnIndex];
		char *attrType = format_type_extended(attr->atttypid, attr->atttypmod,
											  FORMAT_TYPE_TYPEMOD_GIVEN |
											  FORMAT_TYPE_FORCE_QUALIFY);

		appendStringInfo(columnDef, "field_%d %s", columnIndex, attrType);
	}

	/*
	 * column definition cannot be empty, so create a dummy column definition for
	 * queries with no results.
	 */
	if (tupleDesc->natts == 0)
	{
		appendStringInfo(columnDef, "dummy_field int");
	}

	/*
	 * Serialize the coordinator's EXPLAIN flags as a jsonb document; the
	 * option/argument order must stay in sync across the PG17 #if blocks.
	 */
	StringInfo explainOptions = makeStringInfo();
	appendStringInfo(explainOptions,
					 "{\"verbose\": %s, \"costs\": %s, \"buffers\": %s, \"wal\": %s, "
#if PG_VERSION_NUM >= PG_VERSION_17
					 "\"memory\": %s, \"serialize\": \"%s\", "
#endif
					 "\"timing\": %s, \"summary\": %s, \"format\": \"%s\"}",
					 CurrentDistributedQueryExplainOptions.verbose ? "true" : "false",
					 CurrentDistributedQueryExplainOptions.costs ? "true" : "false",
					 CurrentDistributedQueryExplainOptions.buffers ? "true" : "false",
					 CurrentDistributedQueryExplainOptions.wal ? "true" : "false",
#if PG_VERSION_NUM >= PG_VERSION_17
					 CurrentDistributedQueryExplainOptions.memory ? "true" : "false",
					 ExplainSerializeStr(
						 CurrentDistributedQueryExplainOptions.serialize),
#endif
					 CurrentDistributedQueryExplainOptions.timing ? "true" : "false",
					 CurrentDistributedQueryExplainOptions.summary ? "true" : "false",
					 ExplainFormatStr(CurrentDistributedQueryExplainOptions.format));

	StringInfo wrappedQuery = makeStringInfo();

	/*
	 * We do not include dummy column if original query didn't return any columns.
	 * Otherwise, number of columns that original query returned wouldn't match
	 * number of columns returned by worker_save_query_explain_analyze.
	 */
	char *workerSaveQueryFetchCols = (tupleDesc->natts == 0) ? "" : "*";

	if (params != NULL)
	{
		/*
		 * Add a dummy CTE to ensure all parameters are referenced, such that their
		 * types can be resolved.
		 */
		appendStringInfo(wrappedQuery, "WITH unused AS (%s) ",
						 ParameterResolutionSubquery(params));
	}

	appendStringInfo(wrappedQuery,
					 "SELECT %s FROM worker_save_query_explain_analyze(%s, %s) AS (%s)",
					 workerSaveQueryFetchCols,
					 quote_literal_cstr(queryString),
					 quote_literal_cstr(explainOptions->data),
					 columnDef->data);

	return wrappedQuery->data;
}


/*
 * FetchPlanQueryForExplainAnalyze generates a query to fetch the plan saved
 * by worker_save_query_explain_analyze from the worker.
 */
static char *
FetchPlanQueryForExplainAnalyze(const char *queryString, ParamListInfo params)
{
	StringInfo fetchQuery = makeStringInfo();

	if (params != NULL)
	{
		/*
		 * Add a dummy CTE to ensure all parameters are referenced, such that their
		 * types can be resolved.
		 */
		appendStringInfo(fetchQuery, "WITH unused AS (%s) ",
						 ParameterResolutionSubquery(params));
	}

	appendStringInfoString(fetchQuery,
						   "SELECT explain_analyze_output, execution_duration "
						   "FROM worker_last_saved_explain_analyze()");

	return fetchQuery->data;
}


/*
 * ParameterResolutionSubquery generates a subquery that returns all parameters
 * in params with explicit casts to their type names. This can be used in cases
 * where we use custom type parameters that are not directly referenced.
 */
static char *
ParameterResolutionSubquery(ParamListInfo params)
{
	StringInfo paramsQuery = makeStringInfo();

	appendStringInfo(paramsQuery, "SELECT");

	for (int paramIndex = 0; paramIndex < params->numParams; paramIndex++)
	{
		ParamExternData *param = &params->params[paramIndex];
		char *typeName = format_type_extended(param->ptype, -1,
											  FORMAT_TYPE_FORCE_QUALIFY);
		appendStringInfo(paramsQuery, "%s $%d::%s",
						 paramIndex > 0 ? "," : "",
						 paramIndex + 1, typeName);
	}

	return paramsQuery->data;
}


/*
 * SplitString splits the given string by the given delimiter.
 *
 * Why not use strtok_s()?
 * Its signature and semantics are difficult to understand.
 *
 * Why not use strchr() (similar to do_text_output_multiline)? Although not banned,
 * it isn't safe if by any chance str is not null-terminated.
 */
static List *
SplitString(const char *str, char delimiter, int maxLength)
{
	size_t len = strnlen(str, maxLength);
	if (len == 0)
	{
		return NIL;
	}

	List *tokenList = NIL;
	StringInfo token = makeStringInfo();

	for (size_t index = 0; index < len; index++)
	{
		if (str[index] == delimiter)
		{
			/* a delimiter closes the current token; empty tokens are kept */
			tokenList = lappend(tokenList, token);
			token = makeStringInfo();
		}
		else
		{
			appendStringInfoChar(token, str[index]);
		}
	}

	/* append last token */
	tokenList = lappend(tokenList, token);

	return tokenList;
}


/* below are private functions copied from explain.c */


/* *INDENT-OFF* */
/*
 * ExplainOneQuery -
 *	  print out the execution plan for one Query
 *
 * "into" is NULL unless we are explaining the contents of a CreateTableAsStmt.
 */
static void
ExplainOneQuery(Query *query, int cursorOptions,
				IntoClause *into, ExplainState *es,
				const char *queryString, ParamListInfo params,
				QueryEnvironment *queryEnv)
{
	/* if an advisor plugin is present, let it manage things */
	if (ExplainOneQuery_hook)
		(*ExplainOneQuery_hook) (query, cursorOptions, into, es,
								 queryString, params, queryEnv);
	else
	{
		instr_time	planstart,
					planduration;
		BufferUsage bufusage_start,
					bufusage;
#if PG_VERSION_NUM >= PG_VERSION_17
		MemoryContextCounters mem_counters;
		MemoryContext planner_ctx = NULL;
		MemoryContext saved_ctx = NULL;

		if (es->memory)
		{
			/* copy paste from postgres code */
			planner_ctx = AllocSetContextCreate(CurrentMemoryContext,
												"explain analyze planner context",
												ALLOCSET_DEFAULT_SIZES);
			saved_ctx = MemoryContextSwitchTo(planner_ctx);
		}
#endif

		if (es->buffers)
			bufusage_start = pgBufferUsage;
		INSTR_TIME_SET_CURRENT(planstart);

		/* plan the query */
		PlannedStmt *plan = pg_plan_query(query, NULL, cursorOptions, params);

		INSTR_TIME_SET_CURRENT(planduration);
		INSTR_TIME_SUBTRACT(planduration, planstart);

		/* calc differences of buffer counters. */
		if (es->buffers)
		{
			memset(&bufusage, 0, sizeof(BufferUsage));
			BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
		}

#if PG_VERSION_NUM >= PG_VERSION_17
		if (es->memory)
		{
			MemoryContextSwitchTo(saved_ctx);
			MemoryContextMemConsumed(planner_ctx, &mem_counters);
		}

		/* run it (if needed) and produce output */
		ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
					   &planduration, (es->buffers ? &bufusage : NULL),
					   (es->memory ? &mem_counters : NULL));
#else
		/* run it (if needed) and produce output */
		ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
					   &planduration, (es->buffers ? &bufusage : NULL));
#endif
	}
}


/*
 * ExplainWorkerPlan produces explain output into es. If es->analyze, it also executes
 * the given plannedStmt and sends the results to dest. It puts total time to execute in
 * executionDurationMillisec.
 *
 * This is based on postgres' ExplainOnePlan(). We couldn't use an IntoClause to store results
 * into tupleStore, so we had to copy the same functionality with some minor changes.
 *
 * Keeping the formatting to make comparing with the ExplainOnePlan() easier.
 *
 * TODO: Send a PR to postgres to change ExplainOnePlan's API to use a more generic result
 * destination.
 */
static void
ExplainWorkerPlan(PlannedStmt *plannedstmt, DestReceiver *dest, ExplainState *es,
				  const char *queryString, ParamListInfo params,
				  QueryEnvironment *queryEnv,
				  const instr_time *planduration,
#if PG_VERSION_NUM >= PG_VERSION_17
				  const BufferUsage *bufusage,
				  const MemoryContextCounters *mem_counters,
#endif
				  double *executionDurationMillisec)
{
	QueryDesc  *queryDesc;
	instr_time	starttime;
	double		totaltime = 0;
	int			eflags;
	int			instrument_option = 0;

	Assert(plannedstmt->commandType != CMD_UTILITY);

	if (es->analyze && es->timing)
		instrument_option |= INSTRUMENT_TIMER;
	else if (es->analyze)
		instrument_option |= INSTRUMENT_ROWS;

	if (es->buffers)
		instrument_option |= INSTRUMENT_BUFFERS;
	if (es->wal)
		instrument_option |= INSTRUMENT_WAL;

	/*
	 * We always collect timing for the entire statement, even when node-level
	 * timing is off, so we don't look at es->timing here.  (We could skip
	 * this if !es->summary, but it's hardly worth the complication.)
	 */
	INSTR_TIME_SET_CURRENT(starttime);

	/*
	 * Use a snapshot with an updated command ID to ensure this query sees
	 * results of any previously executed queries.
	 */
	PushCopiedSnapshot(GetActiveSnapshot());
	UpdateActiveSnapshotCommandId();

	/* Create a QueryDesc for the query */
	queryDesc = CreateQueryDesc(plannedstmt, queryString,
								GetActiveSnapshot(), InvalidSnapshot,
								dest, params, queryEnv, instrument_option);

	/* Select execution options */
	if (es->analyze)
		eflags = 0;				/* default run-to-completion flags */
	else
		eflags = EXEC_FLAG_EXPLAIN_ONLY;

	/* call ExecutorStart to prepare the plan for execution */
	ExecutorStart(queryDesc, eflags);

	/* Execute the plan for statistics if asked for */
	if (es->analyze)
	{
		ScanDirection dir = ForwardScanDirection;

		/* run the plan */
		ExecutorRun(queryDesc, dir, 0L, true);

		/* run cleanup too */
		ExecutorFinish(queryDesc);

		/* We can't run ExecutorEnd 'till we're done printing the stats... */
		totaltime += elapsed_time(&starttime);
	}

	ExplainOpenGroup("Query", NULL, true, es);

	/* Create textual dump of plan tree */
	ExplainPrintPlan(es, queryDesc);

#if PG_VERSION_NUM >= PG_VERSION_17
	/* Show buffer and/or memory usage in planning */
	if (peek_buffer_usage(es, bufusage) || mem_counters)
	{
		ExplainOpenGroup("Planning", "Planning", true, es);

		if (es->format == EXPLAIN_FORMAT_TEXT)
		{
			ExplainIndentText(es);
			appendStringInfoString(es->str, "Planning:\n");
			es->indent++;
		}

		if (bufusage)
			show_buffer_usage(es, bufusage);

		if (mem_counters)
			show_memory_counters(es, mem_counters);

		if (es->format == EXPLAIN_FORMAT_TEXT)
			es->indent--;

		ExplainCloseGroup("Planning", "Planning", true, es);
	}
#endif

	if (es->summary && planduration)
	{
		double		plantime = INSTR_TIME_GET_DOUBLE(*planduration);

		ExplainPropertyFloat("Planning Time", "ms", 1000.0 * plantime, 3, es);
	}

	/* Print info about runtime of triggers */
	if (es->analyze)
		ExplainPrintTriggers(es, queryDesc);

	/*
	 * Print info about JITing. Tied to es->costs because we don't want to
	 * display this in regression tests, as it'd cause output differences
	 * depending on build options.  Might want to separate that out from COSTS
	 * at a later stage.
	 */
	if (es->costs)
		ExplainPrintJITSummary(es, queryDesc);

#if PG_VERSION_NUM >= PG_VERSION_17
	if (es->serialize != EXPLAIN_SERIALIZE_NONE)
	{
		/* the SERIALIZE option requires its own tuple receiver */
		DestReceiver *dest_serialize = CreateExplainSerializeDestReceiver(es);

		/* grab serialization metrics before we destroy the DestReceiver */
		SerializeMetrics serializeMetrics = GetSerializationMetrics(dest_serialize);

		/* call the DestReceiver's destroy method even during explain */
		dest_serialize->rDestroy(dest_serialize);

		/* Print info about serialization of output */
		ExplainPrintSerialize(es, &serializeMetrics);
	}
#endif

	/*
	 * Close down the query and free resources.  Include time for this in the
	 * total execution time (although it should be pretty minimal).
	 */
	INSTR_TIME_SET_CURRENT(starttime);

	ExecutorEnd(queryDesc);

	FreeQueryDesc(queryDesc);

	PopActiveSnapshot();

	/* We need a CCI just in case query expanded to multiple plans */
	if (es->analyze)
		CommandCounterIncrement();

	totaltime += elapsed_time(&starttime);

	/*
	 * We only report execution time if we actually ran the query (that is,
	 * the user specified ANALYZE), and if summary reporting is enabled (the
	 * user can set SUMMARY OFF to not have the timing information included in
	 * the output).  By default, ANALYZE sets SUMMARY to true.
	 */
	if (es->summary && es->analyze)
		ExplainPropertyFloat("Execution Time", "ms", 1000.0 * totaltime, 3,
							 es);

	/* hand the measured wall-clock time back to the caller in milliseconds */
	*executionDurationMillisec = totaltime * 1000;

	ExplainCloseGroup("Query", NULL, true, es);
}


/*
 * Compute elapsed time in seconds since given timestamp.
 *
 * Copied from explain.c.
 */
static double
elapsed_time(instr_time *starttime)
{
	instr_time	endtime;

	INSTR_TIME_SET_CURRENT(endtime);
	INSTR_TIME_SUBTRACT(endtime, *starttime);
	return INSTR_TIME_GET_DOUBLE(endtime);
}


#if PG_VERSION_NUM >= PG_VERSION_17
/*
 * Return whether show_buffer_usage would have anything to print, if given
 * the same 'usage' data.  Note that when the format is anything other than
 * text, we print even if the counters are all zeroes.
 *
 * Copied from explain.c.
 */
static bool
peek_buffer_usage(ExplainState *es, const BufferUsage *usage)
{
	bool		has_shared;
	bool		has_local;
	bool		has_temp;
	bool		has_shared_timing;
	bool		has_local_timing;
	bool		has_temp_timing;

	if (usage == NULL)
		return false;

	/* non-text formats always print, even when all counters are zero */
	if (es->format != EXPLAIN_FORMAT_TEXT)
		return true;

	has_shared = (usage->shared_blks_hit > 0 ||
				  usage->shared_blks_read > 0 ||
				  usage->shared_blks_dirtied > 0 ||
				  usage->shared_blks_written > 0);
	has_local = (usage->local_blks_hit > 0 ||
				 usage->local_blks_read > 0 ||
				 usage->local_blks_dirtied > 0 ||
				 usage->local_blks_written > 0);
	has_temp = (usage->temp_blks_read > 0 ||
				usage->temp_blks_written > 0);
	has_shared_timing = (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time) ||
						 !INSTR_TIME_IS_ZERO(usage->shared_blk_write_time));
	has_local_timing = (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time) ||
						!INSTR_TIME_IS_ZERO(usage->local_blk_write_time));
	has_temp_timing = (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time) ||
					   !INSTR_TIME_IS_ZERO(usage->temp_blk_write_time));

	return has_shared || has_local || has_temp || has_shared_timing ||
		has_local_timing || has_temp_timing;
}


/*
 * Show buffer usage details.  This better be sync with peek_buffer_usage.
 *
 * Copied from explain.c.
 */
static void
show_buffer_usage(ExplainState *es, const BufferUsage *usage)
{
	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		bool has_shared = (usage->shared_blks_hit > 0 ||
						   usage->shared_blks_read > 0 ||
						   usage->shared_blks_dirtied > 0 ||
						   usage->shared_blks_written > 0);
		bool has_local = (usage->local_blks_hit > 0 ||
						  usage->local_blks_read > 0 ||
						  usage->local_blks_dirtied > 0 ||
						  usage->local_blks_written > 0);
		bool has_temp = (usage->temp_blks_read > 0 ||
						 usage->temp_blks_written > 0);
		bool has_shared_timing = (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time) ||
								  !INSTR_TIME_IS_ZERO(usage->shared_blk_write_time));
		bool has_local_timing = (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time) ||
								 !INSTR_TIME_IS_ZERO(usage->local_blk_write_time));
		bool has_temp_timing = (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time) ||
								!INSTR_TIME_IS_ZERO(usage->temp_blk_write_time));

		/* Show only positive counter values. */
		if (has_shared || has_local || has_temp)
		{
			ExplainIndentText(es);
			appendStringInfoString(es->str, "Buffers:");

			if (has_shared)
			{
				appendStringInfoString(es->str, " shared");
				if (usage->shared_blks_hit > 0)
					appendStringInfo(es->str, " hit=%lld",
									 (long long) usage->shared_blks_hit);
				if (usage->shared_blks_read > 0)
					appendStringInfo(es->str, " read=%lld",
									 (long long) usage->shared_blks_read);
				if (usage->shared_blks_dirtied > 0)
					appendStringInfo(es->str, " dirtied=%lld",
									 (long long) usage->shared_blks_dirtied);
				if (usage->shared_blks_written > 0)
					appendStringInfo(es->str, " written=%lld",
									 (long long) usage->shared_blks_written);
				/* Comma separates this section from the following one(s). */
				if (has_local || has_temp)
					appendStringInfoChar(es->str, ',');
			}
			if (has_local)
			{
				appendStringInfoString(es->str, " local");
				if (usage->local_blks_hit > 0)
					appendStringInfo(es->str, " hit=%lld",
									 (long long) usage->local_blks_hit);
				if (usage->local_blks_read > 0)
					appendStringInfo(es->str, " read=%lld",
									 (long long) usage->local_blks_read);
				if (usage->local_blks_dirtied > 0)
					appendStringInfo(es->str, " dirtied=%lld",
									 (long long) usage->local_blks_dirtied);
				if (usage->local_blks_written > 0)
					appendStringInfo(es->str, " written=%lld",
									 (long long) usage->local_blks_written);
				if (has_temp)
					appendStringInfoChar(es->str, ',');
			}
			if (has_temp)
			{
				appendStringInfoString(es->str, " temp");
				if (usage->temp_blks_read > 0)
					appendStringInfo(es->str, " read=%lld",
									 (long long) usage->temp_blks_read);
				if (usage->temp_blks_written > 0)
					appendStringInfo(es->str, " written=%lld",
									 (long long) usage->temp_blks_written);
			}
			appendStringInfoChar(es->str, '\n');
		}

		/* As above, show only positive counter values. */
		if (has_shared_timing || has_local_timing || has_temp_timing)
		{
			ExplainIndentText(es);
			appendStringInfoString(es->str, "I/O Timings:");

			if (has_shared_timing)
			{
				appendStringInfoString(es->str, " shared");
				if (!INSTR_TIME_IS_ZERO(usage->shared_blk_read_time))
					appendStringInfo(es->str, " read=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->shared_blk_read_time));
				if (!INSTR_TIME_IS_ZERO(usage->shared_blk_write_time))
					appendStringInfo(es->str, " write=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->shared_blk_write_time));
				if (has_local_timing || has_temp_timing)
					appendStringInfoChar(es->str, ',');
			}
			if (has_local_timing)
			{
				appendStringInfoString(es->str, " local");
				if (!INSTR_TIME_IS_ZERO(usage->local_blk_read_time))
					appendStringInfo(es->str, " read=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->local_blk_read_time));
				if (!INSTR_TIME_IS_ZERO(usage->local_blk_write_time))
					appendStringInfo(es->str, " write=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->local_blk_write_time));
				if (has_temp_timing)
					appendStringInfoChar(es->str, ',');
			}
			if (has_temp_timing)
			{
				appendStringInfoString(es->str, " temp");
				if (!INSTR_TIME_IS_ZERO(usage->temp_blk_read_time))
					appendStringInfo(es->str, " read=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->temp_blk_read_time));
				if (!INSTR_TIME_IS_ZERO(usage->temp_blk_write_time))
					appendStringInfo(es->str, " write=%0.3f",
									 INSTR_TIME_GET_MILLISEC(usage->temp_blk_write_time));
			}
			appendStringInfoChar(es->str, '\n');
		}
	}
	else
	{
		/* Structured formats (JSON/XML/YAML): emit all counters, even zeroes. */
		ExplainPropertyInteger("Shared Hit Blocks", NULL,
							   usage->shared_blks_hit, es);
		ExplainPropertyInteger("Shared Read Blocks", NULL,
							   usage->shared_blks_read, es);
		ExplainPropertyInteger("Shared Dirtied Blocks", NULL,
							   usage->shared_blks_dirtied, es);
		ExplainPropertyInteger("Shared Written Blocks", NULL,
							   usage->shared_blks_written, es);
		ExplainPropertyInteger("Local Hit Blocks", NULL,
							   usage->local_blks_hit, es);
		ExplainPropertyInteger("Local Read Blocks", NULL,
							   usage->local_blks_read, es);
		ExplainPropertyInteger("Local Dirtied Blocks", NULL,
							   usage->local_blks_dirtied, es);
		ExplainPropertyInteger("Local Written Blocks", NULL,
							   usage->local_blks_written, es);
		ExplainPropertyInteger("Temp Read Blocks", NULL,
							   usage->temp_blks_read, es);
		ExplainPropertyInteger("Temp Written Blocks", NULL,
							   usage->temp_blks_written, es);
		/* I/O timings are only meaningful when track_io_timing is on. */
		if (track_io_timing)
		{
			ExplainPropertyFloat("Shared I/O Read Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->shared_blk_read_time),
								 3, es);
			ExplainPropertyFloat("Shared I/O Write Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->shared_blk_write_time),
								 3, es);
			ExplainPropertyFloat("Local I/O Read Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->local_blk_read_time),
								 3, es);
			ExplainPropertyFloat("Local I/O Write Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->local_blk_write_time),
								 3, es);
			ExplainPropertyFloat("Temp I/O Read Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->temp_blk_read_time),
								 3, es);
			ExplainPropertyFloat("Temp I/O Write Time", "ms",
								 INSTR_TIME_GET_MILLISEC(usage->temp_blk_write_time),
								 3, es);
		}
	}
}


/*
 * Indent a text-format line.
 *
 * We indent by two spaces per indentation level. However, when emitting
 * data for a parallel worker there might already be data on the current line
 * (cf. ExplainOpenWorker); in that case, don't indent any more.
 *
 * Copied from explain.c.
*/ static void ExplainIndentText(ExplainState *es) { Assert(es->format == EXPLAIN_FORMAT_TEXT); if (es->str->len == 0 || es->str->data[es->str->len - 1] == '\n') appendStringInfoSpaces(es->str, es->indent * 2); } /* * Show memory usage details. * * Copied from explain.c. */ static void show_memory_counters(ExplainState *es, const MemoryContextCounters *mem_counters) { int64 memUsedkB = BYTES_TO_KILOBYTES(mem_counters->totalspace - mem_counters->freespace); int64 memAllocatedkB = BYTES_TO_KILOBYTES(mem_counters->totalspace); if (es->format == EXPLAIN_FORMAT_TEXT) { ExplainIndentText(es); appendStringInfo(es->str, "Memory: used=" INT64_FORMAT "kB allocated=" INT64_FORMAT "kB", memUsedkB, memAllocatedkB); appendStringInfoChar(es->str, '\n'); } else { ExplainPropertyInteger("Memory Used", "kB", memUsedkB, es); ExplainPropertyInteger("Memory Allocated", "kB", memAllocatedkB, es); } } /* * ExplainPrintSerialize - * Append information about query output volume to es->str. * * Copied from explain.c. 
 */
static void
ExplainPrintSerialize(ExplainState *es, SerializeMetrics *metrics)
{
	const char *format;

	/* We shouldn't get called for EXPLAIN_SERIALIZE_NONE */
	if (es->serialize == EXPLAIN_SERIALIZE_TEXT)
		format = "text";
	else
	{
		Assert(es->serialize == EXPLAIN_SERIALIZE_BINARY);
		format = "binary";
	}

	ExplainOpenGroup("Serialization", "Serialization", true, es);

	if (es->format == EXPLAIN_FORMAT_TEXT)
	{
		/* Text format: one "Serialization:" line, with timing if requested. */
		ExplainIndentText(es);
		if (es->timing)
			appendStringInfo(es->str,
							 "Serialization: time=%.3f ms output=" UINT64_FORMAT "kB format=%s\n",
							 /* INSTR_TIME_GET_DOUBLE yields seconds; convert to ms */
							 1000.0 * INSTR_TIME_GET_DOUBLE(metrics->timeSpent),
							 BYTES_TO_KILOBYTES(metrics->bytesSent),
							 format);
		else
			appendStringInfo(es->str,
							 "Serialization: output=" UINT64_FORMAT "kB format=%s\n",
							 BYTES_TO_KILOBYTES(metrics->bytesSent),
							 format);

		/* Only print a Buffers sub-section when there is something to show. */
		if (es->buffers && peek_buffer_usage(es, &metrics->bufferUsage))
		{
			es->indent++;
			show_buffer_usage(es, &metrics->bufferUsage);
			es->indent--;
		}
	}
	else
	{
		/* Structured formats: emit individual properties inside the group. */
		if (es->timing)
			ExplainPropertyFloat("Time", "ms",
								 1000.0 * INSTR_TIME_GET_DOUBLE(metrics->timeSpent),
								 3, es);
		ExplainPropertyUInteger("Output Volume", "kB",
								BYTES_TO_KILOBYTES(metrics->bytesSent), es);
		ExplainPropertyText("Format", format, es);
		if (es->buffers)
			show_buffer_usage(es, &metrics->bufferUsage);
	}

	ExplainCloseGroup("Serialization", "Serialization", true, es);
}


/*
 * GetSerializationMetrics - collect metrics
 *
 * We have to be careful here since the receiver could be an IntoRel
 * receiver if the subject statement is CREATE TABLE AS. In that
 * case, return all-zeroes stats.
 *
 * Copied from explain.c.
 */
static SerializeMetrics
GetSerializationMetrics(DestReceiver *dest)
{
	SerializeMetrics empty;

	/* A serialize receiver carries real metrics; hand them back directly. */
	if (dest->mydest == DestExplainSerialize)
		return ((SerializeDestReceiver *) dest)->metrics;

	/* Any other receiver (e.g. IntoRel for CTAS): return all-zero stats. */
	memset(&empty, 0, sizeof(SerializeMetrics));

	/*
	 * NOTE(review): timeSpent is re-zeroed via INSTR_TIME_SET_ZERO on top of
	 * the memset — presumably because all-zero bytes may not be the canonical
	 * zero for instr_time on every platform; confirm against instr_time.h.
	 */
	INSTR_TIME_SET_ZERO(empty.timeSpent);

	return empty;
}

#endif