mirror of https://github.com/citusdata/citus.git
Handle user-defined type parameters in EXPLAIN ANALYZE
parent
cb9e510e40
commit
a74d991445
|
@ -57,7 +57,9 @@
|
|||
#include "nodes/print.h"
|
||||
#include "optimizer/clauses.h"
|
||||
#include "optimizer/planner.h"
|
||||
#include "parser/analyze.h"
|
||||
#include "portability/instr_time.h"
|
||||
#include "rewrite/rewriteHandler.h"
|
||||
#include "tcop/dest.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "tcop/utility.h"
|
||||
|
@ -152,7 +154,11 @@ static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
|
|||
HeapTuple heapTuple, uint64 tupleLibpqSize);
|
||||
static TupleDesc ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int
|
||||
queryNumber);
|
||||
static char * WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc);
|
||||
static char * WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc,
|
||||
ParamListInfo params);
|
||||
static char * FetchPlanQueryForExplainAnalyze(const char *queryString,
|
||||
ParamListInfo params);
|
||||
static char * ParameterResolutionSubquery(ParamListInfo params);
|
||||
static List * SplitString(const char *str, char delimiter, int maxLength);
|
||||
|
||||
/* Static Explain functions copied from explain.c */
|
||||
|
@ -1042,14 +1048,25 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
|
|||
int numParams = boundParams ? boundParams->numParams : 0;
|
||||
Oid *paramTypes = NULL;
|
||||
const char **paramValues = NULL;
|
||||
|
||||
if (boundParams != NULL)
|
||||
{
|
||||
ExtractParametersFromParamList(boundParams, ¶mTypes, ¶mValues, false);
|
||||
}
|
||||
|
||||
List *queryList = pg_analyze_and_rewrite(parseTree, queryString, paramTypes,
|
||||
numParams, NULL);
|
||||
/* resolve OIDs of unknown (user-defined) types */
|
||||
Query *analyzedQuery = parse_analyze_varparams(parseTree, queryString,
|
||||
¶mTypes, &numParams);
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_14
|
||||
|
||||
/* pg_rewrite_query is a wrapper around QueryRewrite with some debugging logic */
|
||||
List *queryList = pg_rewrite_query(analyzedQuery);
|
||||
#else
|
||||
|
||||
/* pg_rewrite_query is not yet public in PostgreSQL 13 */
|
||||
List *queryList = QueryRewrite(analyzedQuery);
|
||||
#endif
|
||||
if (list_length(queryList) != 1)
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot EXPLAIN ANALYZE a query rewritten "
|
||||
|
@ -1377,9 +1394,21 @@ ExplainAnalyzeTaskList(List *originalTaskList,
|
|||
|
||||
Task *explainAnalyzeTask = copyObject(originalTask);
|
||||
const char *queryString = TaskQueryString(explainAnalyzeTask);
|
||||
char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc);
|
||||
char *fetchQuery =
|
||||
"SELECT explain_analyze_output, execution_duration FROM worker_last_saved_explain_analyze()";
|
||||
ParamListInfo taskParams = params;
|
||||
|
||||
/*
|
||||
* We will not send parameters if they have already been resolved in the query
|
||||
* string.
|
||||
*/
|
||||
if (explainAnalyzeTask->parametersInQueryStringResolved)
|
||||
{
|
||||
taskParams = NULL;
|
||||
}
|
||||
|
||||
char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc,
|
||||
taskParams);
|
||||
char *fetchQuery = FetchPlanQueryForExplainAnalyze(queryString, taskParams);
|
||||
|
||||
SetTaskQueryStringList(explainAnalyzeTask, list_make2(wrappedQuery, fetchQuery));
|
||||
|
||||
TupleDestination *originalTaskDest = originalTask->tupleDest ?
|
||||
|
@ -1401,7 +1430,8 @@ ExplainAnalyzeTaskList(List *originalTaskList,
|
|||
* call so we can fetch its explain analyze after its execution.
|
||||
*/
|
||||
static char *
|
||||
WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc)
|
||||
WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc,
|
||||
ParamListInfo params)
|
||||
{
|
||||
StringInfo columnDef = makeStringInfo();
|
||||
for (int columnIndex = 0; columnIndex < tupleDesc->natts; columnIndex++)
|
||||
|
@ -1453,6 +1483,17 @@ WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc)
|
|||
* number of columns returned by worker_save_query_explain_analyze.
|
||||
*/
|
||||
char *workerSaveQueryFetchCols = (tupleDesc->natts == 0) ? "" : "*";
|
||||
|
||||
if (params != NULL)
|
||||
{
|
||||
/*
|
||||
* Add a dummy CTE to ensure all parameters are referenced, such that their
|
||||
* types can be resolved.
|
||||
*/
|
||||
appendStringInfo(wrappedQuery, "WITH unused AS (%s) ",
|
||||
ParameterResolutionSubquery(params));
|
||||
}
|
||||
|
||||
appendStringInfo(wrappedQuery,
|
||||
"SELECT %s FROM worker_save_query_explain_analyze(%s, %s) AS (%s)",
|
||||
workerSaveQueryFetchCols,
|
||||
|
@ -1464,6 +1505,60 @@ WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FetchPlanQueryForExplainAnalyze generates a query to fetch the plan saved
|
||||
* by worker_save_query_explain_analyze from the worker.
|
||||
*/
|
||||
static char *
|
||||
FetchPlanQueryForExplainAnalyze(const char *queryString, ParamListInfo params)
|
||||
{
|
||||
StringInfo fetchQuery = makeStringInfo();
|
||||
|
||||
if (params != NULL)
|
||||
{
|
||||
/*
|
||||
* Add a dummy CTE to ensure all parameters are referenced, such that their
|
||||
* types can be resolved.
|
||||
*/
|
||||
appendStringInfo(fetchQuery, "WITH unused AS (%s) ",
|
||||
ParameterResolutionSubquery(params));
|
||||
}
|
||||
|
||||
appendStringInfoString(fetchQuery,
|
||||
"SELECT explain_analyze_output, execution_duration "
|
||||
"FROM worker_last_saved_explain_analyze()");
|
||||
|
||||
return fetchQuery->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ParameterResolutionSubquery generates a subquery that returns all parameters
|
||||
* in params with explicit casts to their type names. This can be used in cases
|
||||
* where we use custom type parameters that are not directly referenced.
|
||||
*/
|
||||
static char *
|
||||
ParameterResolutionSubquery(ParamListInfo params)
|
||||
{
|
||||
StringInfo paramsQuery = makeStringInfo();
|
||||
|
||||
appendStringInfo(paramsQuery, "SELECT");
|
||||
|
||||
for (int paramIndex = 0; paramIndex < params->numParams; paramIndex++)
|
||||
{
|
||||
ParamExternData *param = ¶ms->params[paramIndex];
|
||||
char *typeName = format_type_extended(param->ptype, -1,
|
||||
FORMAT_TYPE_FORCE_QUALIFY);
|
||||
|
||||
appendStringInfo(paramsQuery, "%s $%d::%s",
|
||||
paramIndex > 0 ? "," : "",
|
||||
paramIndex + 1, typeName);
|
||||
}
|
||||
|
||||
return paramsQuery->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SplitString splits the given string by the given delimiter.
|
||||
*
|
||||
|
|
|
@ -3056,5 +3056,49 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
|
|||
Tuple data received from node: 0 bytes
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
|
||||
PREPARE q1(int_wrapper_type) AS WITH a AS (SELECT * FROM tbl WHERE b = $1 AND a = 1 OFFSET 0) SELECT * FROM a;
|
||||
EXPLAIN (COSTS false) EXECUTE q1('(1)');
|
||||
Custom Scan (Citus Adaptive)
|
||||
Task Count: 1
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on tbl_570036 tbl
|
||||
Filter: ((b = '(1)'::multi_explain.int_wrapper_type) AND (a = 1))
|
||||
EXPLAIN :default_analyze_flags EXECUTE q1('(1)');
|
||||
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
|
||||
Task Count: 1
|
||||
Tuple data received from nodes: 0 bytes
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Tuple data received from node: 0 bytes
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
|
||||
Filter: ((b = $1) AND (a = 1))
|
||||
PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RETURNING *) SELECT * FROM a;
|
||||
EXPLAIN (COSTS false) EXECUTE q2('(1)');
|
||||
Custom Scan (Citus Adaptive)
|
||||
Task Count: 1
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> CTE Scan on a
|
||||
CTE a
|
||||
-> Update on tbl_570036 tbl
|
||||
-> Seq Scan on tbl_570036 tbl
|
||||
Filter: (a = 1)
|
||||
EXPLAIN :default_analyze_flags EXECUTE q2('(1)');
|
||||
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
|
||||
Task Count: 1
|
||||
Tuple data received from nodes: 0 bytes
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Tuple data received from node: 0 bytes
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> CTE Scan on a (actual rows=0 loops=1)
|
||||
CTE a
|
||||
-> Update on tbl_570036 tbl (actual rows=0 loops=1)
|
||||
-> Seq Scan on tbl_570036 tbl (actual rows=0 loops=1)
|
||||
Filter: (a = 1)
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA multi_explain CASCADE;
|
||||
|
|
|
@ -1090,5 +1090,13 @@ SELECT create_distributed_table('tbl', 'a');
|
|||
|
||||
EXPLAIN :default_analyze_flags SELECT * FROM tbl;
|
||||
|
||||
PREPARE q1(int_wrapper_type) AS WITH a AS (SELECT * FROM tbl WHERE b = $1 AND a = 1 OFFSET 0) SELECT * FROM a;
|
||||
EXPLAIN (COSTS false) EXECUTE q1('(1)');
|
||||
EXPLAIN :default_analyze_flags EXECUTE q1('(1)');
|
||||
|
||||
PREPARE q2(int_wrapper_type) AS WITH a AS (UPDATE tbl SET b = $1 WHERE a = 1 RETURNING *) SELECT * FROM a;
|
||||
EXPLAIN (COSTS false) EXECUTE q2('(1)');
|
||||
EXPLAIN :default_analyze_flags EXECUTE q2('(1)');
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA multi_explain CASCADE;
|
||||
|
|
Loading…
Reference in New Issue