diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e5f6620ec..e7269fb36 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -143,6 +143,7 @@ #include "distributed/local_executor.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_resowner.h" @@ -188,12 +189,6 @@ typedef struct DistributedExecution List *remoteTaskList; List *localTaskList; - /* - * Corresponding distributed plan returns results, - * either because it is a SELECT or has RETURNING. - */ - bool expectResults; - /* * If a task specific destination is not provided for a task, then use * defaultTupleDest. @@ -490,9 +485,6 @@ typedef struct ShardCommandExecution struct TaskPlacementExecution **placementExecutions; int placementExecutionCount; - /* whether we expect results to come back */ - bool expectResults; - /* * RETURNING results from other shard placements can be ignored * after we got results from the first placements. 
@@ -562,7 +554,6 @@ typedef struct TaskPlacementExecution /* local functions */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, - bool expectResults, ParamListInfo paramListInfo, int targetPoolSize, TupleDestination * @@ -694,6 +685,12 @@ AdaptiveExecutor(CitusScanState *scanState) TupleDestination *defaultTupleDest = CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor); + if (RequestedForExplainAnalyze(scanState)) + { + taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor, + paramListInfo); + } + bool hasDependentJobs = HasDependentJobs(job); if (hasDependentJobs) { @@ -714,7 +711,6 @@ AdaptiveExecutor(CitusScanState *scanState) DistributedExecution *execution = CreateDistributedExecution( distributedPlan->modLevel, taskList, - distributedPlan->expectResults, paramListInfo, targetPoolSize, defaultTupleDest, @@ -1010,9 +1006,8 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) DistributedExecution *execution = CreateDistributedExecution( executionParams->modLevel, remoteTaskList, - executionParams->expectResults, paramListInfo, - executionParams->targetPoolSize, defaultTupleDest, - &executionParams->xactProperties, + paramListInfo, executionParams->targetPoolSize, + defaultTupleDest, &executionParams->xactProperties, executionParams->jobIdList); StartDistributedExecution(execution); @@ -1055,7 +1050,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, - bool expectResults, ParamListInfo paramListInfo, + ParamListInfo paramListInfo, int targetPoolSize, TupleDestination *defaultTupleDest, TransactionProperties *xactProperties, List *jobIdList) @@ -1065,7 +1060,6 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->modLevel = modLevel; execution->tasksToExecute = taskList; - execution->expectResults = expectResults; 
execution->transactionProperties = xactProperties; execution->localTaskList = NIL; @@ -1723,7 +1717,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) { RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; - bool expectResults = execution->expectResults; int32 localGroupId = GetLocalGroupId(); @@ -1764,9 +1757,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) NULL; } - shardCommandExecution->expectResults = - expectResults && !task->partiallyLocalOrRemote; - ShardPlacement *taskPlacement = NULL; foreach_ptr(taskPlacement, task->taskPlacementList) { @@ -3178,13 +3168,25 @@ TransactionStateMachine(WorkerSession *session) TaskPlacementExecution *placementExecution = session->currentTask; ShardCommandExecution *shardCommandExecution = placementExecution->shardCommandExecution; - bool storeRows = shardCommandExecution->expectResults; + Task *task = shardCommandExecution->task; + + /* + * In EXPLAIN ANALYZE we need to store results except for multiple placements, + * regardless of query type. In other cases, doing the same doesn't seem to have + * a drawback. 
+ */ + bool storeRows = true; if (shardCommandExecution->gotResults) { /* already received results from another replica */ storeRows = false; } + else if (task->partiallyLocalOrRemote) + { + /* already received results from local execution */ + storeRows = false; + } bool fetchDone = ReceiveResults(session, storeRows); if (!fetchDone) diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 7d5ffc71b..c98b91609 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -11,6 +11,7 @@ #include "libpq-fe.h" #include "miscadmin.h" +#include "access/htup_details.h" #include "access/xact.h" #include "catalog/namespace.h" #include "catalog/pg_class.h" @@ -41,6 +42,7 @@ #include "distributed/remote_commands.h" #include "distributed/recursive_planning.h" #include "distributed/placement_connection.h" +#include "distributed/tuple_destination.h" #include "distributed/tuplestore.h" #include "distributed/listutils.h" #include "distributed/worker_protocol.h" @@ -73,6 +75,22 @@ bool ExplainAllTasks = false; */ static char *SavedExplainPlan = NULL; +/* struct to save explain flags */ +typedef struct +{ + bool verbose; + bool costs; + bool buffers; + bool timing; + bool summary; + ExplainFormat format; +} ExplainOptions; + + +/* EXPLAIN flags of current distributed explain */ +static ExplainOptions CurrentDistributedQueryExplainOptions = { + 0, 0, 0, 0, 0, EXPLAIN_FORMAT_TEXT +}; /* Result for a single remote EXPLAIN command */ typedef struct RemoteExplainPlan @@ -82,17 +100,33 @@ typedef struct RemoteExplainPlan } RemoteExplainPlan; +/* + * ExplainAnalyzeDestination is internal representation of a TupleDestination + * which collects EXPLAIN ANALYZE output after the main query is run. 
+ */ +typedef struct ExplainAnalyzeDestination +{ + TupleDestination pub; + Task *originalTask; + TupleDestination *originalTaskDestination; + TupleDesc lastSavedExplainAnalyzeTupDesc; +} ExplainAnalyzeDestination; + + /* Explain functions for distributed queries */ static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es); static void ExplainJob(Job *job, ExplainState *es); static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es); static void ExplainTaskList(List *taskList, ExplainState *es); static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es); +static RemoteExplainPlan * GetSavedRemoteExplain(Task *task, ExplainState *es); +static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es); static void ExplainTask(Task *task, int placementIndex, List *explainOutputList, ExplainState *es); static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList, ExplainState *es); static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es); +static const char * ExplainFormatStr(ExplainFormat format); static void ExplainWorkerPlan(PlannedStmt *plannedStmt, DestReceiver *dest, ExplainState *es, const char *queryString, ParamListInfo params, @@ -102,6 +136,15 @@ static bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defa static ExplainFormat ExtractFieldExplainFormat(Datum jsonbDoc, const char *fieldName, ExplainFormat defaultValue); static bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result); +static TupleDestination * CreateExplainAnlyzeDestination(Task *task, + TupleDestination *taskDest); +static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple); +static TupleDesc ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int + queryNumber); +static char * WrapQueryForExplainAnalyze(const char *queryString, TupleDesc 
tupleDesc); +static List * SplitString(const char *str, char delimiter); /* Static Explain functions copied from explain.c */ static void ExplainOneQuery(Query *query, int cursorOptions, @@ -409,18 +452,72 @@ ExplainTaskList(List *taskList, ExplainState *es) /* - * RemoteExplain fetches the remote EXPLAIN output for a single - * task. It tries each shard placement until one succeeds or all - * failed. + * RemoteExplain fetches the remote EXPLAIN output for a single task. */ static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es) +{ + /* + * For EXPLAIN EXECUTE we still use the old method, so task->fetchedExplainAnalyzePlan + * can be NULL for some cases of es->analyze == true. + */ + if (es->analyze && task->fetchedExplainAnalyzePlan) + { + return GetSavedRemoteExplain(task, es); + } + else + { + return FetchRemoteExplainFromWorkers(task, es); + } +} + + +/* + * GetSavedRemoteExplain creates a remote EXPLAIN output from information saved + * in task. + */ +static RemoteExplainPlan * +GetSavedRemoteExplain(Task *task, ExplainState *es) +{ + RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( + sizeof(RemoteExplainPlan)); + + /* + * Similar to postgres' ExplainQuery(), we split by newline only for + * text format. + */ + if (es->format == EXPLAIN_FORMAT_TEXT) + { + remotePlan->explainOutputList = SplitString(task->fetchedExplainAnalyzePlan, + '\n'); + } + else + { + StringInfo explainAnalyzeString = makeStringInfo(); + appendStringInfoString(explainAnalyzeString, task->fetchedExplainAnalyzePlan); + remotePlan->explainOutputList = list_make1(explainAnalyzeString); + } + + remotePlan->placementIndex = task->fetchedExplainAnalyzePlacementIndex; + + return remotePlan; +} + + +/* + * FetchRemoteExplainFromWorkers fetches the remote EXPLAIN output for a single + * task by querying it from worker nodes. It tries each shard placement until + * one succeeds or all failed. 
+ */ +static RemoteExplainPlan * +FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) { List *taskPlacementList = task->taskPlacementList; int placementCount = list_length(taskPlacementList); RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryStringForAllPlacements( task), es); @@ -615,34 +712,7 @@ static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es) { StringInfo explainQuery = makeStringInfo(); - char *formatStr = NULL; - - switch (es->format) - { - case EXPLAIN_FORMAT_XML: - { - formatStr = "XML"; - break; - } - - case EXPLAIN_FORMAT_JSON: - { - formatStr = "JSON"; - break; - } - - case EXPLAIN_FORMAT_YAML: - { - formatStr = "YAML"; - break; - } - - default: - { - formatStr = "TEXT"; - break; - } - } + const char *formatStr = ExplainFormatStr(es->format); appendStringInfo(explainQuery, "EXPLAIN (ANALYZE %s, VERBOSE %s, " @@ -661,6 +731,37 @@ BuildRemoteExplainQuery(char *queryString, ExplainState *es) } +/* + * ExplainFormatStr converts the given explain format to string. + */ +static const char * +ExplainFormatStr(ExplainFormat format) +{ + switch (format) + { + case EXPLAIN_FORMAT_XML: + { + return "XML"; + } + + case EXPLAIN_FORMAT_JSON: + { + return "JSON"; + } + + case EXPLAIN_FORMAT_YAML: + { + return "YAML"; + } + + default: + { + return "TEXT"; + } + } +} + + /* * worker_last_saved_explain_analyze returns the last saved EXPLAIN ANALYZE output of * a worker task query. It returns NULL if nothing has been saved yet. @@ -875,6 +976,289 @@ ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result) } +/* + * CitusExplainOneQuery is the ExplainOneQuery_hook that is called when + * postgres wants to explain a query. 
+ */ +void +CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, + ExplainState *es, const char *queryString, ParamListInfo params, + QueryEnvironment *queryEnv) +{ + /* save the flags of current EXPLAIN command */ + CurrentDistributedQueryExplainOptions.costs = es->costs; + CurrentDistributedQueryExplainOptions.buffers = es->buffers; + CurrentDistributedQueryExplainOptions.verbose = es->verbose; + CurrentDistributedQueryExplainOptions.summary = es->summary; + CurrentDistributedQueryExplainOptions.timing = es->timing; + CurrentDistributedQueryExplainOptions.format = es->format; + + /* rest is copied from ExplainOneQuery() */ + instr_time planstart, + planduration; + + INSTR_TIME_SET_CURRENT(planstart); + + /* plan the query */ + PlannedStmt *plan = pg_plan_query(query, cursorOptions, params); + + INSTR_TIME_SET_CURRENT(planduration); + INSTR_TIME_SUBTRACT(planduration, planstart); + + /* run it (if needed) and produce output */ + ExplainOnePlan(plan, into, es, queryString, params, queryEnv, + &planduration); +} + + +/* + * CreateExplainAnlyzeDestination creates a destination suitable for collecting + * explain analyze output from workers. 
+ */ +static TupleDestination * +CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest) +{ + ExplainAnalyzeDestination *tupleDestination = palloc0( + sizeof(ExplainAnalyzeDestination)); + tupleDestination->originalTask = task; + tupleDestination->originalTaskDestination = taskDest; + +#if PG_VERSION_NUM >= PG_VERSION_12 + TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(1); +#else + TupleDesc lastSavedExplainAnalyzeTupDesc = CreateTemplateTupleDesc(1, false); +#endif + + TupleDescInitEntry(lastSavedExplainAnalyzeTupDesc, 1, "explain analyze", TEXTOID, 0, + 0); + tupleDestination->lastSavedExplainAnalyzeTupDesc = lastSavedExplainAnalyzeTupDesc; + + tupleDestination->pub.putTuple = ExplainAnalyzeDestPutTuple; + tupleDestination->pub.tupleDescForQuery = ExplainAnalyzeDestTupleDescForQuery; + + return (TupleDestination *) tupleDestination; +} + + +/* + * ExplainAnalyzeDestPutTuple implements TupleDestination->putTuple + * for ExplainAnalyzeDestination. + */ +static void +ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, + int placementIndex, int queryNumber, + HeapTuple heapTuple) +{ + ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self; + if (queryNumber == 0) + { + TupleDestination *originalTupDest = tupleDestination->originalTaskDestination; + originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple); + } + else if (queryNumber == 1) + { + bool isNull = false; + TupleDesc tupDesc = tupleDestination->lastSavedExplainAnalyzeTupDesc; + Datum explainAnalyze = heap_getattr(heapTuple, 1, tupDesc, &isNull); + + if (isNull) + { + ereport(WARNING, (errmsg( + "received null explain analyze output from worker"))); + return; + } + + char *fetchedExplainAnalyzePlan = TextDatumGetCString(explainAnalyze); + + tupleDestination->originalTask->fetchedExplainAnalyzePlan = + pstrdup(fetchedExplainAnalyzePlan); + tupleDestination->originalTask->fetchedExplainAnalyzePlacementIndex = + 
placementIndex; + } + else + { + ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"), + errdetail("while receiving tuples for query %d", queryNumber))); + } +} + + +/* + * ExplainAnalyzeDestTupleDescForQuery implements TupleDestination->tupleDescForQuery + * for ExplainAnalyzeDestination. + */ +static TupleDesc +ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber) +{ + ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self; + if (queryNumber == 0) + { + TupleDestination *originalTupDest = tupleDestination->originalTaskDestination; + return originalTupDest->tupleDescForQuery(originalTupDest, 0); + } + else if (queryNumber == 1) + { + return tupleDestination->lastSavedExplainAnalyzeTupDesc; + } + + ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"), + errdetail("while requesting for tuple descriptor of query %d", + queryNumber))); + return NULL; +} + + +/* + * RequestedForExplainAnalyze returns true if we should get the EXPLAIN ANALYZE + * output for the given custom scan node. + */ +bool +RequestedForExplainAnalyze(CitusScanState *node) +{ + return (node->customScanState.ss.ps.state->es_instrument != 0); +} + + +/* + * ExplainAnalyzeTaskList returns a task list suitable for explain analyze. After executing + * these tasks, fetchedExplainAnalyzePlan of originalTaskList should be populated. + */ +List * +ExplainAnalyzeTaskList(List *originalTaskList, + TupleDestination *defaultTupleDest, + TupleDesc tupleDesc, + ParamListInfo params) +{ + List *explainAnalyzeTaskList = NIL; + Task *originalTask = NULL; + + /* + * We cannot use multiple commands in a prepared statement, so use the old + * EXPLAIN ANALYZE method for this case. 
+ */ + if (params != NULL) + { + return originalTaskList; + } + + foreach_ptr(originalTask, originalTaskList) + { + if (originalTask->queryCount != 1) + { + ereport(ERROR, (errmsg("cannot get EXPLAIN ANALYZE of multiple queries"))); + } + + Task *explainAnalyzeTask = copyObject(originalTask); + const char *queryString = TaskQueryStringForAllPlacements(explainAnalyzeTask); + char *wrappedQuery = WrapQueryForExplainAnalyze(queryString, tupleDesc); + char *fetchQuery = "SELECT worker_last_saved_explain_analyze()"; + SetTaskQueryStringList(explainAnalyzeTask, list_make2(wrappedQuery, fetchQuery)); + + TupleDestination *originalTaskDest = originalTask->tupleDest ? + originalTask->tupleDest : + defaultTupleDest; + + explainAnalyzeTask->tupleDest = + CreateExplainAnlyzeDestination(originalTask, originalTaskDest); + + explainAnalyzeTaskList = lappend(explainAnalyzeTaskList, explainAnalyzeTask); + } + + return explainAnalyzeTaskList; +} + + +/* + * WrapQueryForExplainAnalyze wraps a query into a worker_save_query_explain_analyze() + * call so we can fetch its explain analyze after its execution. + */ +static char * +WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc) +{ + StringInfo columnDef = makeStringInfo(); + for (int columnIndex = 0; columnIndex < tupleDesc->natts; columnIndex++) + { + if (columnIndex != 0) + { + appendStringInfoString(columnDef, ", "); + } + + Form_pg_attribute attr = &tupleDesc->attrs[columnIndex]; + char *attrType = format_type_with_typemod(attr->atttypid, attr->atttypmod); + + appendStringInfo(columnDef, "field_%d %s", columnIndex, attrType); + } + + /* + * column definition cannot be empty, so create a dummy column definition for + * queries with no results. 
+ */ + if (tupleDesc->natts == 0) + { + appendStringInfo(columnDef, "dummy_field int"); + } + + StringInfo explainOptions = makeStringInfo(); + appendStringInfo(explainOptions, "{\"verbose\": %s, \"costs\": %s, \"buffers\": %s, " + "\"timing\": %s, \"summary\": %s, \"format\": \"%s\"}", + CurrentDistributedQueryExplainOptions.verbose ? "true" : "false", + CurrentDistributedQueryExplainOptions.costs ? "true" : "false", + CurrentDistributedQueryExplainOptions.buffers ? "true" : "false", + CurrentDistributedQueryExplainOptions.timing ? "true" : "false", + CurrentDistributedQueryExplainOptions.summary ? "true" : "false", + ExplainFormatStr(CurrentDistributedQueryExplainOptions.format)); + + StringInfo wrappedQuery = makeStringInfo(); + appendStringInfo(wrappedQuery, + "SELECT * FROM worker_save_query_explain_analyze(%s, %s) AS (%s)", + quote_literal_cstr(queryString), + quote_literal_cstr(explainOptions->data), + columnDef->data); + + return wrappedQuery->data; +} + + +/* + * SplitString splits the given string by the given delimiter. + * + * Why not use strtok_s()? Its signature and semantics are difficult to understand. + * + * Why not use strchr() (similar to do_text_output_multiline)? Although not banned, + * it isn't safe if by any chance str is not null-terminated. 
+ */ +static List * +SplitString(const char *str, char delimiter) +{ + size_t len = (str == NULL) ? 0 : strnlen(str, PG_INT32_MAX); /* strnlen_s() stops at RSIZE_MAX_STR and would silently truncate longer EXPLAIN output */ + if (len == 0) + { + return NIL; + } + + List *tokenList = NIL; + StringInfo token = makeStringInfo(); + + for (size_t index = 0; index < len; index++) + { + if (str[index] == delimiter) + { + tokenList = lappend(tokenList, token); + token = makeStringInfo(); + } + else + { + appendStringInfoChar(token, str[index]); + } + } + + /* append last token */ + tokenList = lappend(tokenList, token); + + return tokenList; +} + + /* below are private functions copied from explain.c */ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index b5b85ba68..22a5cab39 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -221,7 +221,8 @@ _PG_init(void) * duties. For simplicity insist that all hooks are previously unused. */ if (planner_hook != NULL || ProcessUtility_hook != NULL || - ExecutorStart_hook != NULL || ExecutorRun_hook != NULL) + ExecutorStart_hook != NULL || ExecutorRun_hook != NULL || + ExplainOneQuery_hook != NULL) { ereport(ERROR, (errmsg("Citus has to be loaded first"), errhint("Place citus at the beginning of " @@ -267,6 +268,7 @@ _PG_init(void) set_join_pathlist_hook = multi_join_restriction_hook; ExecutorStart_hook = CitusExecutorStart; ExecutorRun_hook = CitusExecutorRun; + ExplainOneQuery_hook = CitusExplainOneQuery; /* register hook for error messages */ emit_log_hook = multi_log_hook; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 6c5f16883..18ca29582 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -329,6 +329,8 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(parametersInQueryStringResolved); COPY_SCALAR_FIELD(tupleDest); COPY_SCALAR_FIELD(queryCount); +
COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex); + COPY_STRING_FIELD(fetchedExplainAnalyzePlan); } diff --git a/src/include/distributed/multi_explain.h b/src/include/distributed/multi_explain.h index 2fe71f04a..67e7e4641 100644 --- a/src/include/distributed/multi_explain.h +++ b/src/include/distributed/multi_explain.h @@ -11,11 +11,20 @@ #define MULTI_EXPLAIN_H #include "executor/executor.h" +#include "tuple_destination.h" /* Config variables managed via guc.c to explain distributed query plans */ extern bool ExplainDistributedQueries; extern bool ExplainAllTasks; extern void FreeSavedExplainPlan(void); +extern void CitusExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, + ExplainState *es, const char *queryString, ParamListInfo + params, + QueryEnvironment *queryEnv); +extern List * ExplainAnalyzeTaskList(List *originalTaskList, + TupleDestination *defaultTupleDest, TupleDesc + tupleDesc, ParamListInfo params); +extern bool RequestedForExplainAnalyze(CitusScanState *node); #endif /* MULTI_EXPLAIN_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 78f13fd3c..77e1e5e94 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -337,6 +337,13 @@ typedef struct Task * NULL, in which case executor might use a default destination. */ struct TupleDestination *tupleDest; + + /* + * EXPLAIN ANALYZE output fetched from worker. This is saved to be used later + * by RemoteExplain(). 
+ */ + char *fetchedExplainAnalyzePlan; + int fetchedExplainAnalyzePlacementIndex; } Task; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 000eefbf6..bc0014aeb 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -327,10 +327,11 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) DELETE FROM distributed_ta -> Task Node: host=localhost port=xxxxx dbname=regression -> Delete on distributed_table_1470001 distributed_table (actual rows=0 loops=1) - -> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=0 loops=1) + -> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Index Cond: (key = 1) Filter: (age = 20) -(9 rows) + Trigger for constraint second_distributed_table_key_fkey_1470005: calls=1 +(10 rows) -- show that EXPLAIN ANALYZE deleted the row and cascades deletes SELECT * FROM distributed_table WHERE key = 1 AND age = 20 ORDER BY 1,2,3; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 61c532fbf..eaed21e2d 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1377,7 +1377,7 @@ t -- \a\t \set default_opts '''{"costs": false, "timing": false, "summary": false}'''::jsonb -CREATE TABLE explain_analyze_test(a int, b text);; +CREATE TABLE explain_analyze_test(a int, b text); INSERT INTO explain_analyze_test VALUES (1, 'value 1'), (2, 'value 2'), (3, 'value 3'), (4, 'value 4'); -- simple select BEGIN; @@ -1690,7 +1690,7 @@ SELECT worker_last_saved_explain_analyze() IS NULL; -- should be deleted at the end of prepare commit BEGIN; -SELECT * FROM worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=1', '{}') as (a int); +SELECT * FROM 
worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=6 WHERE a=4', '{}') as (a int); a --------------------------------------------------------------------- (0 rows) @@ -1709,3 +1709,367 @@ SELECT worker_last_saved_explain_analyze() IS NULL; (1 row) COMMIT PREPARED 'citus_0_1496350_7_0'; +SELECT * FROM explain_analyze_test ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | value 1 + 2 | value 2 + 3 | value 3 + 6 | value 4 +(4 rows) + +\a\t +-- +-- Test different cases of EXPLAIN ANALYZE +-- +SET citus.shard_count TO 4; +SET client_min_messages TO WARNING; +SELECT create_distributed_table('explain_analyze_test', 'a'); + +\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' +-- router SELECT +EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) + Filter: (a = 1) +-- multi-shard SELECT +EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test; +Aggregate (actual rows=1 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate (actual rows=1 loops=1) + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) +-- router DML +BEGIN; +EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test WHERE a = 1; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Delete on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) + Filter: (a = 1) 
+EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a = 2; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Update on explain_analyze_test_570012 explain_analyze_test (actual rows=0 loops=1) + -> Seq Scan on explain_analyze_test_570012 explain_analyze_test (actual rows=1 loops=1) + Filter: (a = 2) +SELECT * FROM explain_analyze_test ORDER BY a; +2|b +3|value 3 +6|value 4 +ROLLBACK; +-- multi-shard DML +BEGIN; +EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a IN (1, 2); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Update on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) + Filter: (a = ANY ('{1,2}'::integer[])) +EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Delete on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) +SELECT * FROM explain_analyze_test ORDER BY a; +ROLLBACK; +-- single-row insert +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on explain_analyze_test_570009 (actual rows=0 loops=1) + -> Result (actual rows=1 loops=1) +ROLLBACK; +-- multi-row insert +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'), (6, 'value 6'); +Custom Scan 
(Citus Adaptive) (actual rows=0 loops=1) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on explain_analyze_test_570009 citus_table_alias (actual rows=0 loops=1) + -> Result (actual rows=1 loops=1) +ROLLBACK; +-- distributed insert/select +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test SELECT * FROM explain_analyze_test; +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on explain_analyze_test_570009 citus_table_alias (actual rows=0 loops=1) + -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) + Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer)) +ROLLBACK; +DROP TABLE explain_analyze_test; +-- test EXPLAIN ANALYZE works fine with primary keys +CREATE TABLE explain_pk(a int primary key, b int); +SELECT create_distributed_table('explain_pk', 'a'); + +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO explain_pk VALUES (1, 2), (2, 3); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on explain_pk_570013 citus_table_alias (actual rows=0 loops=1) + -> Result (actual rows=1 loops=1) +SELECT * FROM explain_pk ORDER BY 1; +1|2 +2|3 +ROLLBACK; +-- test EXPLAIN ANALYZE with non-text output formats +BEGIN; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) INSERT INTO explain_pk VALUES (1, 2), (2, 3); +[ + { + "Plan": { + "Node Type": "Custom Scan", + "Custom Plan Provider": "Citus Adaptive", + "Parallel Aware": false, + "Actual Rows": 0, + "Actual Loops": 1, + "Distributed Query": { + "Job": { + "Task Count": 2, + "Tasks Shown": "One of 2", + "Tasks": [ + { + "Node": "host=localhost port=xxxxx dbname=regression", + "Remote Plan": [ + [ + { + "Plan": 
{ + "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + "Relation Name": "explain_pk_570013", + "Alias": "citus_table_alias", + "Actual Rows": 0, + "Actual Loops": 1, + "Plans": [ + { + "Node Type": "Result", + "Parent Relationship": "Member", + "Parallel Aware": false, + "Actual Rows": 1, + "Actual Loops": 1 + } + ] + }, + "Triggers": [ + ] + } + ] + + ] + } + ] + } + } + }, + "Triggers": [ + ] + } +] +ROLLBACK; +BEGIN; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) INSERT INTO explain_pk VALUES (1, 2), (2, 3); + + + + Custom Scan + Citus Adaptive + false + 0 + 1 + + + 2 + One of 2 + + + host=localhost port=xxxxx dbname=regression + + + + + ModifyTable + Insert + false + explain_pk_570013 + citus_table_alias + 0 + 1 + + + Result + Member + false + 1 + 1 + + + + + + + + + + + + + + + + + +ROLLBACK; +DROP TABLE explain_pk; +-- test EXPLAIN ANALYZE with CTEs and subqueries +CREATE TABLE dist_table(a int, b int); +SELECT create_distributed_table('dist_table', 'a'); + +CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + +INSERT INTO dist_table SELECT i, i*i FROM generate_series(1, 10) i; +INSERT INTO ref_table SELECT i FROM generate_series(1, 10) i; +EXPLAIN :default_analyze_flags +WITH r AS ( + SELECT random() r, a FROM dist_table +) +SELECT count(distinct a) from r NATURAL JOIN ref_table; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate (actual rows=1 loops=1) + -> Hash Join (actual rows=10 loops=1) + Hash Cond: (ref_table.a = intermediate_result.a) + -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) 
+ -> Hash (actual rows=10 loops=1) + Buckets: 1024 Batches: 1 Memory Usage: 9kB + -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) +EXPLAIN :default_analyze_flags +SELECT count(distinct a) FROM (SELECT random() r, a FROM dist_table) t NATURAL JOIN ref_table; +Aggregate (actual rows=1 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Group (actual rows=4 loops=1) + Group Key: t.a + -> Merge Join (actual rows=4 loops=1) + Merge Cond: (t.a = ref_table.a) + -> Sort (actual rows=4 loops=1) + Sort Key: t.a + Sort Method: quicksort Memory: 25kB + -> Subquery Scan on t (actual rows=4 loops=1) + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) + -> Sort (actual rows=10 loops=1) + Sort Key: ref_table.a + Sort Method: quicksort Memory: 25kB + -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) +EXPLAIN :default_analyze_flags +SELECT count(distinct a) FROM dist_table +WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table); +Aggregate (actual rows=1 loops=1) + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Merge Join (actual rows=4 loops=1) + Merge Cond: (dist_table.a = ref_table.a) + -> Sort (actual rows=4 loops=1) + Sort Key: dist_table.a + Sort Method: quicksort Memory: 25kB + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) + -> Sort (actual rows=10 loops=1) + Sort Key: ref_table.a + Sort Method: quicksort Memory: 25kB + -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> HashAggregate (actual rows=4 loops=1) + Group Key: 
dist_table.a + InitPlan 1 (returns $0) + -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) + -> Result (actual rows=4 loops=1) + One-Time Filter: $0 + -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) +BEGIN; +EXPLAIN :default_analyze_flags +WITH r AS ( + INSERT INTO dist_table SELECT a, a * a FROM dist_table + RETURNING a +), s AS ( + SELECT random(), a * a a2 FROM r +) +SELECT count(distinct a2) FROM s; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) + -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) + Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer)) + -> Distributed Subplan XXX_2 + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Aggregate (actual rows=1 loops=1) + -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) +ROLLBACK; +DROP TABLE ref_table, dist_table; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 9d7301422..224a5b886 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -620,7 +620,7 @@ $$); \set default_opts '''{"costs": false, "timing": false, "summary": false}'''::jsonb -CREATE TABLE explain_analyze_test(a int, b text);; +CREATE TABLE explain_analyze_test(a int, b text); INSERT INTO explain_analyze_test VALUES (1, 'value 1'), (2, 
'value 2'), (3, 'value 3'), (4, 'value 4'); -- simple select @@ -727,8 +727,114 @@ SELECT worker_last_saved_explain_analyze() IS NULL; -- should be deleted at the end of prepare commit BEGIN; -SELECT * FROM worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=1', '{}') as (a int); +SELECT * FROM worker_save_query_explain_analyze('UPDATE explain_analyze_test SET a=6 WHERE a=4', '{}') as (a int); SELECT worker_last_saved_explain_analyze() IS NOT NULL; PREPARE TRANSACTION 'citus_0_1496350_7_0'; SELECT worker_last_saved_explain_analyze() IS NULL; COMMIT PREPARED 'citus_0_1496350_7_0'; + +SELECT * FROM explain_analyze_test ORDER BY a; + +\a\t + +-- +-- Test different cases of EXPLAIN ANALYZE +-- + +SET citus.shard_count TO 4; +SET client_min_messages TO WARNING; +SELECT create_distributed_table('explain_analyze_test', 'a'); + +\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' + +-- router SELECT +EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; + +-- multi-shard SELECT +EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test; + +-- router DML +BEGIN; +EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test WHERE a = 1; +EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a = 2; +SELECT * FROM explain_analyze_test ORDER BY a; +ROLLBACK; + +-- multi-shard DML +BEGIN; +EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'b' WHERE a IN (1, 2); +EXPLAIN :default_analyze_flags DELETE FROM explain_analyze_test; +SELECT * FROM explain_analyze_test ORDER BY a; +ROLLBACK; + +-- single-row insert +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'); +ROLLBACK; + +-- multi-row insert +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO explain_analyze_test VALUES (5, 'value 5'), (6, 'value 6'); +ROLLBACK; + +-- distributed insert/select +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO 
explain_analyze_test SELECT * FROM explain_analyze_test; +ROLLBACK; + +DROP TABLE explain_analyze_test; + +-- test EXPLAIN ANALYZE works fine with primary keys +CREATE TABLE explain_pk(a int primary key, b int); +SELECT create_distributed_table('explain_pk', 'a'); + +BEGIN; +EXPLAIN :default_analyze_flags INSERT INTO explain_pk VALUES (1, 2), (2, 3); +SELECT * FROM explain_pk ORDER BY 1; +ROLLBACK; + +-- test EXPLAIN ANALYZE with non-text output formats +BEGIN; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) INSERT INTO explain_pk VALUES (1, 2), (2, 3); +ROLLBACK; + +BEGIN; +EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) INSERT INTO explain_pk VALUES (1, 2), (2, 3); +ROLLBACK; + +DROP TABLE explain_pk; + +-- test EXPLAIN ANALYZE with CTEs and subqueries +CREATE TABLE dist_table(a int, b int); +SELECT create_distributed_table('dist_table', 'a'); +CREATE TABLE ref_table(a int); +SELECT create_reference_table('ref_table'); + +INSERT INTO dist_table SELECT i, i*i FROM generate_series(1, 10) i; +INSERT INTO ref_table SELECT i FROM generate_series(1, 10) i; + +EXPLAIN :default_analyze_flags +WITH r AS ( + SELECT random() r, a FROM dist_table +) +SELECT count(distinct a) from r NATURAL JOIN ref_table; + +EXPLAIN :default_analyze_flags +SELECT count(distinct a) FROM (SELECT random() r, a FROM dist_table) t NATURAL JOIN ref_table; + +EXPLAIN :default_analyze_flags +SELECT count(distinct a) FROM dist_table +WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table); + +BEGIN; +EXPLAIN :default_analyze_flags +WITH r AS ( + INSERT INTO dist_table SELECT a, a * a FROM dist_table + RETURNING a +), s AS ( + SELECT random(), a * a a2 FROM r +) +SELECT count(distinct a2) FROM s; +ROLLBACK; + +DROP TABLE ref_table, dist_table;