Do the EXPLAIN ANALYZE at the same time as execution, so we avoid executing the query twice.

We wrap worker tasks in worker_save_query_explain_analyze() so we can fetch
their EXPLAIN output later with a call to worker_last_saved_explain_analyze().
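
As a rough sketch of the mechanism (table name, columns, and options below
are illustrative), each worker task now runs two statements instead of one:

    -- run the query on the worker and save its EXPLAIN ANALYZE output
    SELECT * FROM worker_save_query_explain_analyze(
        'SELECT a, b FROM some_table_102008', '{"costs": false}') AS (a int, b text);
    -- fetch the saved output within the same task
    SELECT worker_last_saved_explain_analyze();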

Fixes #3519
Fixes #2347
Fixes #2613
Fixes #621
pull/3890/head
Hadi Moshayedi 2020-06-09 10:29:59 -07:00
parent 8551affc1e
commit bb96ef5047
9 changed files with 936 additions and 59 deletions


@@ -143,6 +143,7 @@
#include "distributed/local_executor.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_resowner.h"
@@ -188,12 +189,6 @@ typedef struct DistributedExecution
List *remoteTaskList;
List *localTaskList;
/*
* Corresponding distributed plan returns results,
* either because it is a SELECT or has RETURNING.
*/
bool expectResults;
/*
* If a task specific destination is not provided for a task, then use
* defaultTupleDest.
@@ -490,9 +485,6 @@ typedef struct ShardCommandExecution
struct TaskPlacementExecution **placementExecutions;
int placementExecutionCount;
/* whether we expect results to come back */
bool expectResults;
/*
* RETURNING results from other shard placements can be ignored
* after we got results from the first placements.
@@ -562,7 +554,6 @@ typedef struct TaskPlacementExecution
/* local functions */
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
List *taskList,
bool expectResults,
ParamListInfo paramListInfo,
int targetPoolSize,
TupleDestination *
@@ -694,6 +685,12 @@ AdaptiveExecutor(CitusScanState *scanState)
TupleDestination *defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
if (RequestedForExplainAnalyze(scanState))
{
taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor,
paramListInfo);
}
bool hasDependentJobs = HasDependentJobs(job);
if (hasDependentJobs)
{
@@ -714,7 +711,6 @@ AdaptiveExecutor(CitusScanState *scanState)
DistributedExecution *execution = CreateDistributedExecution(
distributedPlan->modLevel,
taskList,
distributedPlan->expectResults,
paramListInfo,
targetPoolSize,
defaultTupleDest,
@@ -1010,9 +1006,8 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
DistributedExecution *execution =
CreateDistributedExecution(
executionParams->modLevel, remoteTaskList,
executionParams->expectResults, paramListInfo,
executionParams->targetPoolSize, defaultTupleDest,
&executionParams->xactProperties,
paramListInfo, executionParams->targetPoolSize,
defaultTupleDest, &executionParams->xactProperties,
executionParams->jobIdList);
StartDistributedExecution(execution);
@@ -1055,7 +1050,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
*/
static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
bool expectResults, ParamListInfo paramListInfo,
ParamListInfo paramListInfo,
int targetPoolSize, TupleDestination *defaultTupleDest,
TransactionProperties *xactProperties,
List *jobIdList)
@@ -1065,7 +1060,6 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->modLevel = modLevel;
execution->tasksToExecute = taskList;
execution->expectResults = expectResults;
execution->transactionProperties = xactProperties;
execution->localTaskList = NIL;
@@ -1723,7 +1717,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
{
RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->tasksToExecute;
bool expectResults = execution->expectResults;
int32 localGroupId = GetLocalGroupId();
@@ -1764,9 +1757,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
NULL;
}
shardCommandExecution->expectResults =
expectResults && !task->partiallyLocalOrRemote;
ShardPlacement *taskPlacement = NULL;
foreach_ptr(taskPlacement, task->taskPlacementList)
{
@@ -3178,13 +3168,25 @@ TransactionStateMachine(WorkerSession *session)
TaskPlacementExecution *placementExecution = session->currentTask;
ShardCommandExecution *shardCommandExecution =
placementExecution->shardCommandExecution;
bool storeRows = shardCommandExecution->expectResults;
Task *task = shardCommandExecution->task;
/*
* In EXPLAIN ANALYZE we need to store results regardless of the query type,
* unless we have already received them from another placement. In other
* cases, always storing rows doesn't seem to have a drawback.
*/
bool storeRows = true;
if (shardCommandExecution->gotResults)
{
/* already received results from another replica */
storeRows = false;
}
else if (task->partiallyLocalOrRemote)
{
/* already received results from local execution */
storeRows = false;
}
bool fetchDone = ReceiveResults(session, storeRows);
if (!fetchDone)


@@ -11,6 +11,7 @@
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
@@ -41,6 +42,7 @@
#include "distributed/remote_commands.h"
#include "distributed/recursive_planning.h"
#include "distributed/placement_connection.h"
#include "distributed/tuple_destination.h"
#include "distributed/tuplestore.h"
#include "distributed/listutils.h"
#include "distributed/worker_protocol.h"
@@ -73,6 +75,22 @@ bool ExplainAllTasks = false;
*/
static char *SavedExplainPlan = NULL;
/* struct to save explain flags */
typedef struct
{
bool verbose;
bool costs;
bool buffers;
bool timing;
bool summary;
ExplainFormat format;
} ExplainOptions;
/* EXPLAIN flags of current distributed explain */
static ExplainOptions CurrentDistributedQueryExplainOptions = {
0, 0, 0, 0, 0, EXPLAIN_FORMAT_TEXT
};
/* Result for a single remote EXPLAIN command */
typedef struct RemoteExplainPlan
@@ -82,17 +100,33 @@ typedef struct RemoteExplainPlan
} RemoteExplainPlan;
/*
* ExplainAnalyzeDestination is the internal representation of a TupleDestination
* that collects EXPLAIN ANALYZE output after the main query is run.
*/
typedef struct ExplainAnalyzeDestination
{
TupleDestination pub;
Task *originalTask;
TupleDestination *originalTaskDestination;
TupleDesc lastSavedExplainAnalyzeTupDesc;
} ExplainAnalyzeDestination;
/* Explain functions for distributed queries */
static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es);
static void ExplainJob(Job *job, ExplainState *es);
static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es);
static void ExplainTaskList(List *taskList, ExplainState *es);
static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es);
static RemoteExplainPlan * GetSavedRemoteExplain(Task *task, ExplainState *es);
static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es);
static void ExplainTask(Task *task, int placementIndex, List *explainOutputList,
ExplainState *es);
static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
ExplainState *es);
static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es);
static const char * ExplainFormatStr(ExplainFormat format);
static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest,
ExplainState *es,
const char *queryString, ParamListInfo params,
@@ -102,6 +136,15 @@ static bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defa
static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName,
ExplainFormat defaultValue);
static bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result);
static TupleDestination * CreateExplainAnalyzeDestination(Task *task,
TupleDestination *taskDest);
static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple);
static TupleDesc ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int
queryNumber);
static char * WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc);
static List * SplitString(const char *str, char delimiter);
/* Static Explain functions copied from explain.c */
static void ExplainOneQuery(Query *query, int cursorOptions,
@@ -409,18 +452,72 @@ ExplainTaskList(List *taskList, ExplainState *es)
/*
* RemoteExplain fetches the remote EXPLAIN output for a single
* task. It tries each shard placement until one succeeds or all
* failed.
* RemoteExplain fetches the remote EXPLAIN output for a single task.
*/
static RemoteExplainPlan *
RemoteExplain(Task *task, ExplainState *es)
{
/*
* For EXPLAIN EXECUTE we still use the old method, so task->fetchedExplainAnalyzePlan
* can be NULL even in some cases where es->analyze is true.
*/
if (es->analyze && task->fetchedExplainAnalyzePlan)
{
return GetSavedRemoteExplain(task, es);
}
else
{
return FetchRemoteExplainFromWorkers(task, es);
}
}
/*
* GetSavedRemoteExplain creates the remote EXPLAIN output from the information
* saved in the task.
*/
static RemoteExplainPlan *
GetSavedRemoteExplain(Task *task, ExplainState *es)
{
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
sizeof(RemoteExplainPlan));
/*
* Similar to postgres' ExplainQuery(), we split by newline only for
* text format.
*/
if (es->format == EXPLAIN_FORMAT_TEXT)
{
remotePlan->explainOutputList = SplitString(task->fetchedExplainAnalyzePlan,
'\n');
}
else
{
StringInfo explainAnalyzeString = makeStringInfo();
appendStringInfoString(explainAnalyzeString, task->fetchedExplainAnalyzePlan);
remotePlan->explainOutputList = list_make1(explainAnalyzeString);
}
remotePlan->placementIndex = task->fetchedExplainAnalyzePlacementIndex;
return remotePlan;
}
/*
* FetchRemoteExplainFromWorkers fetches the remote EXPLAIN output for a single
* task by querying it from worker nodes. It tries each shard placement until
* one succeeds or all of them fail.
*/
static RemoteExplainPlan *
FetchRemoteExplainFromWorkers(Task *task, ExplainState *es)
{
List *taskPlacementList = task->taskPlacementList;
int placementCount = list_length(taskPlacementList);
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
sizeof(RemoteExplainPlan));
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements(
task),
es);
@@ -615,34 +712,7 @@ static StringInfo
BuildRemoteExplainQuery(char *queryString, ExplainState *es)
{
StringInfo explainQuery = makeStringInfo();
char *formatStr = NULL;
switch (es->format)
{
case EXPLAIN_FORMAT_XML:
{
formatStr = "XML";
break;
}
case EXPLAIN_FORMAT_JSON:
{
formatStr = "JSON";
break;
}
case EXPLAIN_FORMAT_YAML:
{
formatStr = "YAML";
break;
}
default:
{
formatStr = "TEXT";
break;
}
}
const char *formatStr = ExplainFormatStr(es->format);
appendStringInfo(explainQuery,
"EXPLAIN (ANALYZE %s, VERBOSE %s, "
@@ -661,6 +731,37 @@ BuildRemoteExplainQuery(char *queryString, ExplainState *es)
}
/*
* ExplainFormatStr converts the given explain format to its string representation.
*/
static const char *
ExplainFormatStr(ExplainFormat format)
{
switch (format)
{
case EXPLAIN_FORMAT_XML:
{
return "XML";
}
case EXPLAIN_FORMAT_JSON:
{
return "JSON";
}
case EXPLAIN_FORMAT_YAML:
{
return "YAML";
}
default:
{
return "TEXT";
}
}
}
/*
* worker_last_saved_explain_analyze returns the last saved EXPLAIN ANALYZE output of
* a worker task query. It returns NULL if nothing has been saved yet.
@@ -875,6 +976,289 @@ ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result)
}
/*
* CitusExplainOneQuery is the ExplainOneQuery_hook that is called when
* postgres wants to explain a query.
*/
void
CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
ExplainState *es, const char *queryString, ParamListInfo params,
QueryEnvironment *queryEnv)
{
/* save the flags of current EXPLAIN command */
CurrentDistributedQueryExplainOptions.costs = es->costs;
CurrentDistributedQueryExplainOptions.buffers = es->buffers;
CurrentDistributedQueryExplainOptions.verbose = es->verbose;
CurrentDistributedQueryExplainOptions.summary = es->summary;
CurrentDistributedQueryExplainOptions.timing = es->timing;
CurrentDistributedQueryExplainOptions.format = es->format;
/* rest is copied from ExplainOneQuery() */
instr_time planstart,
planduration;
INSTR_TIME_SET_CURRENT(planstart);
/* plan the query */
PlannedStmt *plan = pg_plan_query(query, cursorOptions, params);
INSTR_TIME_SET_CURRENT(planduration);
INSTR_TIME_SUBTRACT(planduration, planstart);
/* run it (if needed) and produce output */
ExplainOnePlan(plan, into, es, queryString, params, queryEnv,
&planduration);
}
/*
* CreateExplainAnalyzeDestination creates a destination suitable for collecting
* explain analyze output from workers.
*/
static TupleDestination *
CreateExplainAnalyzeDestination(Task *task, TupleDestination *taskDest)
{
ExplainAnalyzeDestination *tupleDestination = palloc0(
sizeof(ExplainAnalyzeDestination));
tupleDestination->originalTask = task;
tupleDestination->originalTaskDestination = taskDest;
#if PG_VERSION_NUM >= PG_VERSION_12
TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(1);
#else
TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(1, false);
#endif
TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 1, "explain analyze", TEXTOID, 0,
0);
tupleDestination->lastSavedExplainAnalyzeTupDesc = lastSavedExplainAnalyzeTupDesc;
tupleDestination->pub.putTuple = ExplainAnalyzeDestPutTuple;
tupleDestination->pub.tupleDescForQuery = ExplainAnalyzeDestTupleDescForQuery;
return (TupleDestination *) tupleDestination;
}
/*
* ExplainAnalyzeDestPutTuple implements TupleDestination->putTuple
* for ExplainAnalyzeDestination.
*/
static void
ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple)
{
ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self;
if (queryNumber == 0)
{
TupleDestination *originalTupDest = tupleDestination->originalTaskDestination;
originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple);
}
else if (queryNumber == 1)
{
bool isNull = false;
TupleDesc tupDesc = tupleDestination->lastSavedExplainAnalyzeTupDesc;
Datum explainAnalyze = heap_getattr(heapTuple, 1, tupDesc, &isNull);
if (isNull)
{
ereport(WARNING, (errmsg(
"received null explain analyze output from worker")));
return;
}
char *fetchedExplainAnalyzePlan = TextDatumGetCString(explainAnalyze);
tupleDestination->originalTask->fetchedExplainAnalyzePlan =
pstrdup(fetchedExplainAnalyzePlan);
tupleDestination->originalTask->fetchedExplainAnalyzePlacementIndex =
placementIndex;
}
else
{
ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"),
errdetail("while receiving tuples for query %d", queryNumber)));
}
}
/*
* ExplainAnalyzeDestTupleDescForQuery implements TupleDestination->tupleDescForQuery
* for ExplainAnalyzeDestination.
*/
static TupleDesc
ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber)
{
ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self;
if (queryNumber == 0)
{
TupleDestination *originalTupDest = tupleDestination->originalTaskDestination;
return originalTupDest->tupleDescForQuery(originalTupDest, 0);
}
else if (queryNumber == 1)
{
return tupleDestination->lastSavedExplainAnalyzeTupDesc;
}
ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"),
errdetail("while requesting for tuple descriptor of query %d",
queryNumber)));
return NULL;
}
/*
* RequestedForExplainAnalyze returns true if we should get the EXPLAIN ANALYZE
* output for the given custom scan node.
*/
bool
RequestedForExplainAnalyze(CitusScanState *node)
{
return (node->customScanState.ss.ps.state->es_instrument != 0);
}
/*
* ExplainAnalyzeTaskList returns a task list suitable for EXPLAIN ANALYZE. After
* executing these tasks, the fetchedExplainAnalyzePlan field of each task in
* originalTaskList should be populated.
*/
List *
ExplainAnalyzeTaskList(List *originalTaskList,
TupleDestination *defaultTupleDest,
TupleDesc tupleDesc,
ParamListInfo params)
{
List *explainAnalyzeTaskList = NIL;
Task *originalTask = NULL;
/*
* We cannot use multiple commands in a prepared statement, so use the old
* EXPLAIN ANALYZE method for this case.
*/
if (params != NULL)
{
return originalTaskList;
}
foreach_ptr(originalTask, originalTaskList)
{
if (originalTask->queryCount != 1)
{
ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries")));
}
Task *explainAnalyzeTask = copyObject(originalTask);
const char *queryString = TaskQueryStringForAllPlacements(explainAnalyzeTask);
char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc);
char *fetchQuery = "SELECT worker_last_saved_explain_analyze()";
SetTaskQueryStringList(explainAnalyzeTask, list_make2(wrappedQuery, fetchQuery));
TupleDestination *originalTaskDest = originalTask->tupleDest ?
originalTask->tupleDest :
defaultTupleDest;
explainAnalyzeTask->tupleDest =
CreateExplainAnalyzeDestination(originalTask, originalTaskDest);
explainAnalyzeTaskList = lappend(explainAnalyzeTaskList, explainAnalyzeTask);
}
return explainAnalyzeTaskList;
}
/*
* WrapQueryForExplainAnalyze wraps a query into a worker_save_query_explain_analyze()
* call so we can fetch its EXPLAIN ANALYZE output after it is executed.
*/
static char *
WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc)
{
StringInfo columnDef = makeStringInfo();
for (int columnIndex = 0; columnIndex < tupleDesc->natts; columnIndex++)
{
if (columnIndex != 0)
{
appendStringInfoString(columnDef, ", ");
}
Form_pg_attribute attr = &tupleDesc->attrs[columnIndex];
char *attrType = format_type_with_typemod(attr->atttypid, attr->atttypmod);
appendStringInfo(columnDef, "field_%d %s", columnIndex, attrType);
}
/*
* The column definition list cannot be empty, so create a dummy column
* definition for queries that return no results.
*/
if (tupleDesc->natts == 0)
{
appendStringInfo(columnDef, "dummy_field int");
}
StringInfo explainOptions = makeStringInfo();
appendStringInfo(explainOptions, "{\"verbose\": %s, \"costs\": %s, \"buffers\": %s, "
"\"timing\": %s, \"summary\": %s, \"format\": \"%s\"}",
CurrentDistributedQueryExplainOptions.verbose ? "true" : "false",
CurrentDistributedQueryExplainOptions.costs ? "true" : "false",
CurrentDistributedQueryExplainOptions.buffers ? "true" : "false",
CurrentDistributedQueryExplainOptions.timing ? "true" : "false",
CurrentDistributedQueryExplainOptions.summary ? "true" : "false",
ExplainFormatStr(CurrentDistributedQueryExplainOptions.format));
StringInfo wrappedQuery = makeStringInfo();
appendStringInfo(wrappedQuery,
"SELECT * FROM worker_save_query_explain_analyze(%s, %s) AS (%s)",
quote_literal_cstr(queryString),
quote_literal_cstr(explainOptions->data),
columnDef->data);
return wrappedQuery->data;
}
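
/*
 * For illustration (table name and column types below are hypothetical):
 * wrapping the query 'SELECT a, b FROM t_102008', whose result columns are
 * (int, text), with the flags of a plain EXPLAIN ANALYZE would produce
 * roughly:
 *
 *   SELECT * FROM worker_save_query_explain_analyze(
 *       'SELECT a, b FROM t_102008',
 *       '{"verbose": false, "costs": true, "buffers": false, "timing": true,
 *         "summary": true, "format": "TEXT"}')
 *   AS (field_0 integer, field_1 text);
 */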
/*
* SplitString splits the given string by the given delimiter.
*
* Why not use strtok_s()? Its signature and semantics are difficult to understand.
*
* Why not use strchr() (similar to do_text_output_multiline)? Although not banned,
* it isn't safe if by any chance str is not null-terminated.
*/
static List *
SplitString(const char *str, char delimiter)
{
size_t len = strnlen_s(str, RSIZE_MAX_STR);
if (len == 0)
{
return NIL;
}
List *tokenList = NIL;
StringInfo token = makeStringInfo();
for (size_t index = 0; index < len; index++)
{
if (str[index] == delimiter)
{
tokenList = lappend(tokenList, token);
token = makeStringInfo();
}
else
{
appendStringInfoChar(token, str[index]);
}
}
/* append last token */
tokenList = lappend(tokenList, token);
return tokenList;
}
/* below are private functions copied from explain.c */


@@ -221,7 +221,8 @@ _PG_init(void)
* duties. For simplicity insist that all hooks are previously unused.
*/
if (planner_hook != NULL || ProcessUtility_hook != NULL ||
ExecutorStart_hook != NULL || ExecutorRun_hook != NULL)
ExecutorStart_hook != NULL || ExecutorRun_hook != NULL ||
ExplainOneQuery_hook != NULL)
{
ereport(ERROR, (errmsg("Citus has to be loaded first"),
errhint("Place citus at the beginning of "
@@ -267,6 +268,7 @@ _PG_init(void)
set_join_pathlist_hook = multi_join_restriction_hook;
ExecutorStart_hook = CitusExecutorStart;
ExecutorRun_hook = CitusExecutorRun;
ExplainOneQuery_hook = CitusExplainOneQuery;
/* register hook for error messages */
emit_log_hook = multi_log_hook;


@@ -329,6 +329,8 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(parametersInQueryStringResolved);
COPY_SCALAR_FIELD(tupleDest);
COPY_SCALAR_FIELD(queryCount);
COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex);
COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
}


@@ -11,11 +11,20 @@
#define MULTI_EXPLAIN_H
#include "executor/executor.h"
#include "tuple_destination.h"
/* Config variables managed via guc.c to explain distributed query plans */
extern bool ExplainDistributedQueries;
extern bool ExplainAllTasks;
extern void FreeSavedExplainPlan(void);
extern void CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into,
ExplainState *es, const char *queryString, ParamListInfo
params,
QueryEnvironment *queryEnv);
extern List * ExplainAnalyzeTaskList(List *originalTaskList,
TupleDestination *defaultTupleDest, TupleDesc
tupleDesc, ParamListInfo params);
extern bool RequestedForExplainAnalyze(CitusScanState *node);
#endif /* MULTI_EXPLAIN_H */


@@ -337,6 +337,13 @@ typedef struct Task
* NULL, in which case executor might use a default destination.
*/
struct TupleDestination *tupleDest;
/*
* EXPLAIN ANALYZE output fetched from worker. This is saved to be used later
* by RemoteExplain().
*/
char *fetchedExplainAnalyzePlan;
int fetchedExplainAnalyzePlacementIndex;
} Task;


@@ -327,10 +327,11 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) DELETE FROM distributed_ta
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Delete on distributed_table_1470001 distributed_table (actual rows=0 loops=1)
-> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=0 loops=1)
-> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=1 loops=1)
Index Cond: (key = 1)
Filter: (age = 20)
(9 rows)
Trigger for constraint second_distributed_table_key_fkey_1470005: calls=1
(10 rows)
-- show that EXPLAIN ANALYZE deleted the row and cascades deletes
SELECT * FROM distributed_table WHERE key = 1 AND age = 20 ORDER BY 1,2,3;


@@ -1377,7 +1377,7 @@ t
--
\a\t
\set default_opts '''{"costs": false, "timing": false, "summary": false}'''::jsonb
CREATE TABLE explain_analyze_test(a int, b text);;
CREATE TABLE explain_analyze_test(a int, b text);
INSERT INTO explain_analyze_test VALUES (1, 'value 1'), (2, 'value 2'), (3, 'value 3'), (4, 'value 4');
-- simple select
BEGIN;
@@ -1690,7 +1690,7 @@ SELECT worker_last_saved_explain_analyze() IS NULL;
-- should be deleted at the end of prepare commit
BEGIN;
SELECT * FROM worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=1', '{}') as (a int);
SELECT * FROM worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=6 WHERE a=4', '{}') as (a int);
a
---------------------------------------------------------------------
(0 rows)
@@ -1709,3 +1709,367 @@ SELECT worker_last_saved_explain_analyze() IS NULL;
(1 row)
COMMIT PREPARED 'citus_0_1496350_7_0';
SELECT * FROM explain_analyze_test ORDER BY a;
a | b
---------------------------------------------------------------------
1 | value 1
2 | value 2
3 | value 3
6 | value 4
(4 rows)
\a\t
--
-- Test different cases of EXPLAIN ANALYZE
--
SET citus.shard_count TO 4;
SET client_min_messages TO WARNING;
SELECT create_distributed_table('explain_analyze_test', 'a');
\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)'
-- router SELECT
EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
Filter: (a = 1)
-- multi-shard SELECT
EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test;
Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=4 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
-- router DML
BEGIN;
EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test WHERE a = 1;
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Delete on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1)
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
Filter: (a = 1)
EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a = 2;
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Update on explain_analyze_test_570012 explain_analyze_test (actual rows=0 loops=1)
-> Seq Scan on explain_analyze_test_570012 explain_analyze_test (actual rows=1 loops=1)
Filter: (a = 2)
SELECT * FROM explain_analyze_test ORDER BY a;
2|b
3|value 3
6|value 4
ROLLBACK;
-- multi-shard DML
BEGIN;
EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a IN (1, 2);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Update on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1)
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
Filter: (a = ANY ('{1,2}'::integer[]))
EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test;
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Delete on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1)
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
SELECT * FROM explain_analyze_test ORDER BY a;
ROLLBACK;
-- single-row insert
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5');
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on explain_analyze_test_570009 (actual rows=0 loops=1)
-> Result (actual rows=1 loops=1)
ROLLBACK;
-- multi-row insert
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'), (6, 'value 6');
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on explain_analyze_test_570009 citus_table_alias (actual rows=0 loops=1)
-> Result (actual rows=1 loops=1)
ROLLBACK;
-- distributed insert/select
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test SELECT * FROM explain_analyze_test;
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on explain_analyze_test_570009 citus_table_alias (actual rows=0 loops=1)
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer))
ROLLBACK;
DROP TABLE explain_analyze_test;
-- test EXPLAIN ANALYZE works fine with primary keys
CREATE TABLE explain_pk(a int primary key, b int);
SELECT create_distributed_table('explain_pk', 'a');
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_pk VALUES (1, 2), (2, 3);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on explain_pk_570013 citus_table_alias (actual rows=0 loops=1)
-> Result (actual rows=1 loops=1)
SELECT * FROM explain_pk ORDER BY 1;
1|2
2|3
ROLLBACK;
-- test EXPLAIN ANALYZE with non-text output formats
BEGIN;
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) INSERT INTO explain_pk VALUES (1, 2), (2, 3);
[
{
"Plan": {
"Node Type": "Custom Scan",
"Custom Plan Provider": "Citus Adaptive",
"Parallel Aware": false,
"Actual Rows": 0,
"Actual Loops": 1,
"Distributed Query": {
"Job": {
"Task Count": 2,
"Tasks Shown": "One of 2",
"Tasks": [
{
"Node": "host=localhost port=xxxxx dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "ModifyTable",
"Operation": "Insert",
"Parallel Aware": false,
"Relation Name": "explain_pk_570013",
"Alias": "citus_table_alias",
"Actual Rows": 0,
"Actual Loops": 1,
"Plans": [
{
"Node Type": "Result",
"Parent Relationship": "Member",
"Parallel Aware": false,
"Actual Rows": 1,
"Actual Loops": 1
}
]
},
"Triggers": [
]
}
]
]
}
]
}
}
},
"Triggers": [
]
}
]
ROLLBACK;
BEGIN;
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) INSERT INTO explain_pk VALUES (1, 2), (2, 3);
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Custom-Plan-Provider>Citus Adaptive</Custom-Plan-Provider>
<Parallel-Aware>false</Parallel-Aware>
<Actual-Rows>0</Actual-Rows>
<Actual-Loops>1</Actual-Loops>
<Distributed-Query>
<Job>
<Task-Count>2</Task-Count>
<Tasks-Shown>One of 2</Tasks-Shown>
<Tasks>
<Task>
<Node>host=localhost port=xxxxx dbname=regression</Node>
<Remote-Plan>
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>ModifyTable</Node-Type>
<Operation>Insert</Operation>
<Parallel-Aware>false</Parallel-Aware>
<Relation-Name>explain_pk_570013</Relation-Name>
<Alias>citus_table_alias</Alias>
<Actual-Rows>0</Actual-Rows>
<Actual-Loops>1</Actual-Loops>
<Plans>
<Plan>
<Node-Type>Result</Node-Type>
<Parent-Relationship>Member</Parent-Relationship>
<Parallel-Aware>false</Parallel-Aware>
<Actual-Rows>1</Actual-Rows>
<Actual-Loops>1</Actual-Loops>
</Plan>
</Plans>
</Plan>
<Triggers>
</Triggers>
</Query>
</explain>
</Remote-Plan>
</Task>
</Tasks>
</Job>
</Distributed-Query>
</Plan>
<Triggers>
</Triggers>
</Query>
</explain>
ROLLBACK;
DROP TABLE explain_pk;
-- test EXPLAIN ANALYZE with CTEs and subqueries
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');
INSERT INTO dist_table SELECT i, i*i FROM generate_series(1, 10) i;
INSERT INTO ref_table SELECT i FROM generate_series(1, 10) i;
EXPLAIN :default_analyze_flags
WITH r AS (
SELECT random() r, a FROM dist_table
)
SELECT count(distinct a) from r NATURAL JOIN ref_table;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Hash Join (actual rows=10 loops=1)
Hash Cond: (ref_table.a = intermediate_result.a)
-> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1)
-> Hash (actual rows=10 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 9kB
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
EXPLAIN :default_analyze_flags
SELECT count(distinct a) FROM (SELECT random() r, a FROM dist_table) t NATURAL JOIN ref_table;
Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Group (actual rows=4 loops=1)
Group Key: t.a
-> Merge Join (actual rows=4 loops=1)
Merge Cond: (t.a = ref_table.a)
-> Sort (actual rows=4 loops=1)
Sort Key: t.a
Sort Method: quicksort Memory: 25kB
-> Subquery Scan on t (actual rows=4 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
-> Sort (actual rows=10 loops=1)
Sort Key: ref_table.a
Sort Method: quicksort Memory: 25kB
-> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1)
EXPLAIN :default_analyze_flags
SELECT count(distinct a) FROM dist_table
WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table);
Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Merge Join (actual rows=4 loops=1)
Merge Cond: (dist_table.a = ref_table.a)
-> Sort (actual rows=4 loops=1)
Sort Key: dist_table.a
Sort Method: quicksort Memory: 25kB
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
-> Sort (actual rows=10 loops=1)
Sort Key: ref_table.a
Sort Method: quicksort Memory: 25kB
-> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate (actual rows=4 loops=1)
Group Key: dist_table.a
InitPlan 1 (returns $0)
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
-> Result (actual rows=4 loops=1)
One-Time Filter: $0
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
BEGIN;
EXPLAIN :default_analyze_flags
WITH r AS (
INSERT INTO dist_table SELECT a, a * a FROM dist_table
RETURNING a
), s AS (
SELECT random(), a * a a2 FROM r
)
SELECT count(distinct a2) FROM s;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer))
-> Distributed Subplan XXX_2
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
ROLLBACK;
DROP TABLE ref_table, dist_table;


@@ -620,7 +620,7 @@ $$);
\set default_opts '''{"costs": false, "timing": false, "summary": false}'''::jsonb
CREATE TABLE explain_analyze_test(a int, b text);;
CREATE TABLE explain_analyze_test(a int, b text);
INSERT INTO explain_analyze_test VALUES (1, 'value 1'), (2, 'value 2'), (3, 'value 3'), (4, 'value 4');
-- simple select
@@ -727,8 +727,114 @@ SELECT worker_last_saved_explain_analyze() IS NULL;
-- should be deleted at the end of prepare commit
BEGIN;
SELECT * FROM worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=1', '{}') as (a int);
SELECT * FROM worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=6 WHERE a=4', '{}') as (a int);
SELECT worker_last_saved_explain_analyze() IS NOT NULL;
PREPARE TRANSACTION 'citus_0_1496350_7_0';
SELECT worker_last_saved_explain_analyze() IS NULL;
COMMIT PREPARED 'citus_0_1496350_7_0';
SELECT * FROM explain_analyze_test ORDER BY a;
\a\t
--
-- Test different cases of EXPLAIN ANALYZE
--
SET citus.shard_count TO 4;
SET client_min_messages TO WARNING;
SELECT create_distributed_table('explain_analyze_test', 'a');
\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)'
-- router SELECT
EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1;
-- multi-shard SELECT
EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test;
-- router DML
BEGIN;
EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test WHERE a = 1;
EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a = 2;
SELECT * FROM explain_analyze_test ORDER BY a;
ROLLBACK;
-- multi-shard DML
BEGIN;
EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a IN (1, 2);
EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test;
SELECT * FROM explain_analyze_test ORDER BY a;
ROLLBACK;
-- single-row insert
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5');
ROLLBACK;
-- multi-row insert
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'), (6, 'value 6');
ROLLBACK;
-- distributed insert/select
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test SELECT * FROM explain_analyze_test;
ROLLBACK;
DROP TABLE explain_analyze_test;
-- test EXPLAIN ANALYZE works fine with primary keys
CREATE TABLE explain_pk(a int primary key, b int);
SELECT create_distributed_table('explain_pk', 'a');
BEGIN;
EXPLAIN :default_analyze_flags INSERT INTO explain_pk VALUES (1, 2), (2, 3);
SELECT * FROM explain_pk ORDER BY 1;
ROLLBACK;
-- test EXPLAIN ANALYZE with non-text output formats
BEGIN;
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) INSERT INTO explain_pk VALUES (1, 2), (2, 3);
ROLLBACK;
BEGIN;
EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) INSERT INTO explain_pk VALUES (1, 2), (2, 3);
ROLLBACK;
DROP TABLE explain_pk;
-- test EXPLAIN ANALYZE with CTEs and subqueries
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');
INSERT INTO dist_table SELECT i, i*i FROM generate_series(1, 10) i;
INSERT INTO ref_table SELECT i FROM generate_series(1, 10) i;
EXPLAIN :default_analyze_flags
WITH r AS (
SELECT random() r, a FROM dist_table
)
SELECT count(distinct a) from r NATURAL JOIN ref_table;
EXPLAIN :default_analyze_flags
SELECT count(distinct a) FROM (SELECT random() r, a FROM dist_table) t NATURAL JOIN ref_table;
EXPLAIN :default_analyze_flags
SELECT count(distinct a) FROM dist_table
WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table);
BEGIN;
EXPLAIN :default_analyze_flags
WITH r AS (
INSERT INTO dist_table SELECT a, a * a FROM dist_table
RETURNING a
), s AS (
SELECT random(), a * a a2 FROM r
)
SELECT count(distinct a2) FROM s;
ROLLBACK;
DROP TABLE ref_table, dist_table;