From 7b74eca22d8b96607677cffc570ea4454789c541 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Fri, 24 Jul 2020 16:42:02 -0700 Subject: [PATCH] Support EXPLAIN EXECUTE ANALYZE. --- .../distributed/executor/adaptive_executor.c | 83 ++++- .../distributed/executor/multi_executor.c | 29 ++ .../distributed/planner/deparse_shard_query.c | 19 + .../distributed/planner/multi_explain.c | 85 +++-- src/include/distributed/deparse_shard_query.h | 1 + src/include/distributed/multi_executor.h | 1 + src/test/regress/expected/multi_explain.out | 325 +++++++++++++++++- src/test/regress/sql/multi_explain.sql | 117 +++++++ 8 files changed, 613 insertions(+), 47 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1cfa03ae0..f9fa78d40 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -618,6 +618,8 @@ static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *ses static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, WorkerSession *session); +static bool SendNextQuery(TaskPlacementExecution *placementExecution, + WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session); static void HandleMultiConnectionSuccess(WorkerSession *session); static bool HasAnyConnectionFailure(WorkerPool *workerPool); @@ -709,6 +711,11 @@ AdaptiveExecutor(CitusScanState *scanState) if (RequestedForExplainAnalyze(scanState)) { + /* + * We use multiple queries per task in EXPLAIN ANALYZE which need to + * be part of the same transaction. + */ + UseCoordinatedTransaction(); taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor, paramListInfo); } @@ -3281,6 +3288,32 @@ TransactionStateMachine(WorkerSession *session) break; } + /* if this is a multi-query task, send the next query */ + if (placementExecution->queryIndex < task->queryCount) + { + bool querySent = SendNextQuery(placementExecution, session); + if (!querySent) + { + /* no need to continue, connection is lost */ + Assert(session->connection->connectionState == + MULTI_CONNECTION_LOST); + + return; + } + + /* + * At this point the query might be just in pgconn buffers. We + * need to wait until it becomes writeable to actually send + * the query. + */ + UpdateConnectionWaitFlags(session, + WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE); + + transaction->transactionState = REMOTE_TRANS_SENT_COMMAND; + + break; + } + shardCommandExecution->gotResults = true; transaction->transactionState = REMOTE_TRANS_CLEARING_RESULTS; break; @@ -3454,7 +3487,7 @@ PopUnassignedPlacementExecution(WorkerPool *workerPool) /* - * StartPlacementExecutionOnSession gets a TaskPlacementExecition and + * StartPlacementExecutionOnSession gets a TaskPlacementExecution and * WorkerSession, the task's query is sent to the worker via the session. * * The function does some bookkeeping such as associating the placement @@ -3470,17 +3503,13 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, { WorkerPool *workerPool = session->workerPool; DistributedExecution *execution = workerPool->distributedExecution; - ParamListInfo paramListInfo = execution->paramListInfo; MultiConnection *connection = session->connection; ShardCommandExecution *shardCommandExecution = placementExecution->shardCommandExecution; - bool binaryResults = shardCommandExecution->binaryResults; Task *task = shardCommandExecution->task; ShardPlacement *taskPlacement = placementExecution->shardPlacement; List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); - int querySent = 0; - char *queryString = TaskQueryString(task); if (execution->transactionProperties->useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) @@ -3492,11 +3521,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, AssignPlacementListToConnection(placementAccessList, connection); } - - /* one more command is sent over the session */ - session->commandsSent++; - - if (session->commandsSent == 1) + if (session->commandsSent == 0) { /* first time we send a command, consider the connection used (not unused) */ workerPool->unusedConnectionCount--; @@ -3507,6 +3532,38 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, session->currentTask = placementExecution; placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING; + bool querySent = SendNextQuery(placementExecution, session); + if (querySent) + { + session->commandsSent++; + } + + return querySent; +} + + +/* + * SendNextQuery sends the next query for placementExecution on the given + * session. + */ +static bool +SendNextQuery(TaskPlacementExecution *placementExecution, + WorkerSession *session) +{ + WorkerPool *workerPool = session->workerPool; + DistributedExecution *execution = workerPool->distributedExecution; + MultiConnection *connection = session->connection; + ShardCommandExecution *shardCommandExecution = + placementExecution->shardCommandExecution; + bool binaryResults = shardCommandExecution->binaryResults; + Task *task = shardCommandExecution->task; + ParamListInfo paramListInfo = execution->paramListInfo; + int querySent = 0; + uint32 queryIndex = placementExecution->queryIndex; + + Assert(queryIndex < task->queryCount); + char *queryString = TaskQueryStringAtIndex(task, queryIndex); + if (paramListInfo != NULL && !task->parametersInQueryStringResolved) { int parameterCount = paramListInfo->numParams; @@ -3659,6 +3716,12 @@ ReceiveResults(WorkerSession *session, bool storeRows) } uint32 queryIndex = placementExecution->queryIndex; + if (queryIndex >= task->queryCount) + { + ereport(ERROR, (errmsg("unexpected query index while processing" + " query results"))); + } + TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest, queryIndex); if (tupleDescriptor == NULL) { diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 3c3f0edc0..5bff8e949 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -57,6 +57,12 @@ int MultiShardConnectionType = PARALLEL_CONNECTION; bool WritableStandbyCoordinator = false; +/* + * Pointer to bound parameters of the current ongoing call to ExecutorRun. + * If executor is not running, then this value is meaningless. + */ +ParamListInfo executorBoundParams = NULL; + /* sort the returning to get consistent outputs, used only for testing */ bool SortReturning = false; @@ -129,6 +135,14 @@ CitusExecutorRun(QueryDesc *queryDesc, { DestReceiver *dest = queryDesc->dest; + ParamListInfo savedBoundParams = executorBoundParams; + + /* + * Save a pointer to query params so UDFs can access them by calling + * ExecutorBoundParams(). + */ + executorBoundParams = queryDesc->params; + /* * We do some potentially time consuming operations our self now before we hand of * control to postgres' executor. To make sure that time spent is accurately measured @@ -209,6 +223,7 @@ CitusExecutorRun(QueryDesc *queryDesc, queryDesc->totaltime = totalTime; } + executorBoundParams = savedBoundParams; ExecutorLevel--; if (ExecutorLevel == 0 && PlannerLevel == 0) @@ -228,6 +243,7 @@ CitusExecutorRun(QueryDesc *queryDesc, queryDesc->totaltime = totalTime; } + executorBoundParams = savedBoundParams; ExecutorLevel--; PG_RE_THROW(); @@ -691,3 +707,16 @@ AlterTableConstraintCheck(QueryDesc *queryDesc) return true; } + + +/* + * ExecutorBoundParams returns the bound parameters of the current ongoing call + * to ExecutorRun. This is meant to be used by UDFs which need to access bound + * parameters. + */ +ParamListInfo +ExecutorBoundParams(void) +{ + Assert(ExecutorLevel > 0); + return executorBoundParams; +} diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index d96108471..60a83a186 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -513,6 +513,25 @@ GetTaskQueryType(Task *task) } +/* + * TaskQueryStringAtIndex returns query at given index among the possibly + * multiple queries that a task can have. + */ +char * +TaskQueryStringAtIndex(Task *task, int index) +{ + Assert(index < task->queryCount); + + int taskQueryType = GetTaskQueryType(task); + if (taskQueryType == TASK_QUERY_TEXT_LIST) + { + return list_nth(task->taskQuery.data.queryStringList, index); + } + + return TaskQueryString(task); +} + + /* * TaskQueryString generates task query string text if missing. * diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 414531c56..4aefebd6b 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -116,12 +116,16 @@ typedef struct ExplainAnalyzeDestination /* Explain functions for distributed queries */ static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es); -static void ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es); +static void ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es, + ParamListInfo params); static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es); -static void ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es); -static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es); +static void ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es, + ParamListInfo params); +static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es, ParamListInfo + params); static RemoteExplainPlan * GetSavedRemoteExplain(Task *task, ExplainState *es); -static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es); +static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es, + ParamListInfo params); static void ExplainTask(CitusScanState *scanState, Task *task, int placementIndex, List *explainOutputList, ExplainState *es); @@ -175,6 +179,8 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es { CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; + EState *executorState = ScanStateGetExecutorState(scanState); + ParamListInfo params = executorState->es_param_list_info; if (!ExplainDistributedQueries) { @@ -191,7 +197,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es ExplainSubPlans(distributedPlan, es); } - ExplainJob(scanState, distributedPlan->workerJob, es); + ExplainJob(scanState, distributedPlan->workerJob, es, params); ExplainCloseGroup("Distributed Query", "Distributed Query", true, es); } @@ -377,7 +383,8 @@ ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es) * or all tasks if citus.explain_all_tasks is on. */ static void -ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es) +ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es, + ParamListInfo params) { List *dependentJobList = job->dependentJobList; int dependentJobCount = list_length(dependentJobList); @@ -426,7 +433,7 @@ ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es) { ExplainOpenGroup("Tasks", "Tasks", false, es); - ExplainTaskList(scanState, taskList, es); + ExplainTaskList(scanState, taskList, es, params); ExplainCloseGroup("Tasks", "Tasks", false, es); } @@ -526,7 +533,8 @@ ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es) * or all tasks if citus.explain_all_tasks is on. */ static void -ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es) +ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es, + ParamListInfo params) { ListCell *taskCell = NULL; ListCell *remoteExplainCell = NULL; @@ -539,7 +547,7 @@ ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es) { Task *task = (Task *) lfirst(taskCell); - RemoteExplainPlan *remoteExplain = RemoteExplain(task, es); + RemoteExplainPlan *remoteExplain = RemoteExplain(task, es, params); remoteExplainList = lappend(remoteExplainList, remoteExplain); if (!ExplainAllTasks) @@ -564,7 +572,7 @@ ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es) * RemoteExplain fetches the remote EXPLAIN output for a single task. */ static RemoteExplainPlan * -RemoteExplain(Task *task, ExplainState *es) +RemoteExplain(Task *task, ExplainState *es, ParamListInfo params) { /* * For EXPLAIN EXECUTE we still use the old method, so task->fetchedExplainAnalyzePlan @@ -576,7 +584,7 @@ RemoteExplain(Task *task, ExplainState *es) } else { - return FetchRemoteExplainFromWorkers(task, es); + return FetchRemoteExplainFromWorkers(task, es, params); } } @@ -619,7 +627,7 @@ GetSavedRemoteExplain(Task *task, ExplainState *es) * one succeeds or all failed. */ static RemoteExplainPlan * -FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) +FetchRemoteExplainFromWorkers(Task *task, ExplainState *es, ParamListInfo params) { List *taskPlacementList = task->taskPlacementList; int placementCount = list_length(taskPlacementList); @@ -638,7 +646,6 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) for (int placementIndex = 0; placementIndex < placementCount; placementIndex++) { ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex); - PGresult *queryResult = NULL; int connectionFlags = 0; remotePlan->placementIndex = placementIndex; @@ -668,14 +675,28 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es) ExecuteCriticalRemoteCommand(connection, "SAVEPOINT citus_explain_savepoint"); /* run explain query */ - int executeResult = ExecuteOptionalRemoteCommand(connection, explainQuery->data, - &queryResult); - if (executeResult != 0) - { - PQclear(queryResult); - ForgetResults(connection); + int numParams = params ? params->numParams : 0; + Oid *paramTypes = NULL; + const char **paramValues = NULL; + PGresult *queryResult = NULL; - continue; + if (params) + { + ExtractParametersFromParamList(params, ¶mTypes, ¶mValues, false); + } + + int sendStatus = SendRemoteCommandParams(connection, explainQuery->data, + numParams, paramTypes, paramValues, + false); + if (sendStatus != 0) + { + queryResult = GetRemoteCommandResult(connection, false); + if (!IsResponseOK(queryResult)) + { + PQclear(queryResult); + ForgetResults(connection); + continue; + } } /* read explain query results */ @@ -965,7 +986,18 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) RawStmt *parseTree = linitial(parseTreeList); - List *queryList = pg_analyze_and_rewrite(parseTree, queryString, NULL, 0, NULL); + ParamListInfo boundParams = ExecutorBoundParams(); + int numParams = boundParams ? boundParams->numParams : 0; + Oid *paramTypes = NULL; + const char **paramValues = NULL; + if (boundParams != NULL) + { + ExtractParametersFromParamList(boundParams, ¶mTypes, ¶mValues, false); + } + + List *queryList = pg_analyze_and_rewrite(parseTree, queryString, paramTypes, + numParams, NULL); + if (list_length(queryList) != 1) { ereport(ERROR, (errmsg("cannot EXPLAIN ANALYZE a query rewritten " @@ -988,7 +1020,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) INSTR_TIME_SUBTRACT(planDuration, planStart); /* do the actual EXPLAIN ANALYZE */ - ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, NULL, NULL, + ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL, &planDuration, &executionDurationMillisec); ExplainEndOutput(es); @@ -1339,15 +1371,6 @@ ExplainAnalyzeTaskList(List *originalTaskList, List *explainAnalyzeTaskList = NIL; Task *originalTask = NULL; - /* - * We cannot use multiple commands in a prepared statement, so use the old - * EXPLAIN ANALYZE method for this case. - */ - if (params != NULL) - { - return originalTaskList; - } - foreach_ptr(originalTask, originalTaskList) { if (originalTask->queryCount != 1) diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 9ab4cdbff..f98c0d996 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -27,6 +27,7 @@ extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query); extern void SetTaskQueryString(Task *task, char *queryString); extern void SetTaskQueryStringList(Task *task, List *queryStringList); extern char * TaskQueryString(Task *task); +extern char * TaskQueryStringAtIndex(Task *task, int index); extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList); extern int GetTaskQueryType(Task *task); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 000859616..0054c958a 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -146,6 +146,7 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues, bool useOriginalCustomTypeOids); +extern ParamListInfo ExecutorBoundParams(void); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 85fc6dbb7..3d88dbc08 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1283,23 +1283,23 @@ Custom Scan (Citus Adaptive) (actual rows=3 loops=1) PREPARE multi_shard_query_param(int) AS UPDATE lineitem SET l_quantity = $1; BEGIN; EXPLAIN EXECUTE multi_shard_query_param(5); -WARNING: there is no parameter $1 -WARNING: there is no parameter $1 Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Task Count: 2 Tasks Shown: One of 2 -> Task - Error: Could not get remote plan. + Node: host=localhost port=xxxxx dbname=regression + -> Update on lineitem_290000 lineitem (cost=0.00..176.00 rows=6000 width=140) + -> Seq Scan on lineitem_290000 lineitem (cost=0.00..176.00 rows=6000 width=140) ROLLBACK; BEGIN; EXPLAIN (ANALYZE ON, COSTS OFF, TIMING OFF, SUMMARY OFF) EXECUTE multi_shard_query_param(5); -WARNING: there is no parameter $1 -WARNING: there is no parameter $1 Custom Scan (Citus Adaptive) (actual rows=0 loops=1) Task Count: 2 Tasks Shown: One of 2 -> Task - Error: Could not get remote plan. + Node: host=localhost port=xxxxx dbname=regression + -> Update on lineitem_290000 lineitem (actual rows=0 loops=1) + -> Seq Scan on lineitem_290000 lineitem (actual rows=6000 loops=1) ROLLBACK; \set VERBOSITY DEFAULT -- test explain in a transaction with alter table to test we use right connections @@ -1811,6 +1811,52 @@ SELECT execution_duration BETWEEN 30 AND 200 FROM worker_last_saved_explain_anal (1 row) END; +-- +-- verify we handle parametrized queries properly +-- +CREATE TABLE t(a int); +INSERT INTO t VALUES (1), (2), (3); +-- simple case +PREPARE save_explain AS +SELECT $1, * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int); +EXECUTE save_explain(1); + ?column? | a +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +deallocate save_explain; +-- Call a UDF first to make sure that we handle stacks of executorBoundParams properly. +-- +-- The prepared statement will first call f() which will force new executor run with new +-- set of parameters. Then it will call worker_save_query_explain_analyze with a +-- parametrized query. If we don't have the correct set of parameters here, it will fail. +CREATE FUNCTION f() RETURNS INT +AS $$ +PREPARE pp1 AS SELECT $1 WHERE $2 = $3; +EXECUTE pp1(4, 5, 5); +deallocate pp1; +SELECT 1$$ LANGUAGE sql volatile; +PREPARE save_explain AS + SELECT $1, CASE WHEN i < 2 THEN + f() = 1 + ELSE + EXISTS(SELECT * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int) + WHERE a = 1) + END + FROM generate_series(1, 4) i; +EXECUTE save_explain(1); + ?column? | exists +--------------------------------------------------------------------- + 1 | t + 1 | t + 1 | t + 1 | t +(4 rows) + +deallocate save_explain; +DROP FUNCTION f(); +DROP TABLE t; SELECT * FROM explain_analyze_test ORDER BY a; a | b --------------------------------------------------------------------- @@ -1829,6 +1875,7 @@ SET client_min_messages TO WARNING; SELECT create_distributed_table('explain_analyze_test', 'a'); \set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' +\set default_explain_flags '(ANALYZE off, COSTS off, TIMING off, SUMMARY off)' -- router SELECT EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) @@ -2339,6 +2386,20 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Aggregate (actual rows=1 loops=1) -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) ROLLBACK; +-- https://github.com/citusdata/citus/issues/4074 +prepare ref_select(int) AS select * from ref_table where 1 = $1; +explain :default_analyze_flags execute ref_select(1); +Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + Task Count: 1 + Tuple data received from nodes: 11 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 11 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Result (actual rows=10 loops=1) + One-Time Filter: (1 = $1) + -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) +deallocate ref_select; DROP TABLE ref_table, dist_table; -- test EXPLAIN ANALYZE with different replication factors SET citus.shard_count = 2; @@ -2600,3 +2661,255 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Filter: (a = 1) Rows Removed by Filter: 3 DROP TABLE dist_table_rep1, dist_table_rep2; +-- https://github.com/citusdata/citus/issues/2009 +CREATE TABLE simple (id integer, name text); +SELECT create_distributed_table('simple', 'id'); + +PREPARE simple_router AS SELECT *, $1 FROM simple WHERE id = 1; +EXPLAIN :default_explain_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple + Filter: (id = 1) +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tuple data received from nodes: 0 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (id = 1) +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tuple data received from nodes: 0 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (id = 1) +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tuple data received from nodes: 0 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (id = 1) +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tuple data received from nodes: 0 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (id = 1) +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tuple data received from nodes: 0 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (id = 1) +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tuple data received from nodes: 0 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (id = 1) +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tuple data received from nodes: 0 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (id = 1) +deallocate simple_router; +-- prepared multi-row insert +PREPARE insert_query AS INSERT INTO simple VALUES ($1, 2), (2, $2); +EXPLAIN :default_explain_flags EXECUTE insert_query(3, 4); +Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on simple_570026 citus_table_alias + -> Result +EXPLAIN :default_analyze_flags EXECUTE insert_query(3, 4); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on simple_570026 citus_table_alias (actual rows=0 loops=1) + -> Result (actual rows=1 loops=1) +deallocate insert_query; +-- prepared updates +PREPARE update_query AS UPDATE simple SET name=$1 WHERE name=$2; +EXPLAIN :default_explain_flags EXECUTE update_query('x', 'y'); +Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Update on simple_570026 simple + -> Seq Scan on simple_570026 simple + Filter: (name = 'y'::text) +EXPLAIN :default_analyze_flags EXECUTE update_query('x', 'y'); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Update on simple_570026 simple (actual rows=0 loops=1) + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: (name = $2) + Rows Removed by Filter: 1 +deallocate update_query; +-- prepared deletes +PREPARE delete_query AS DELETE FROM simple WHERE name=$1 OR name=$2; +EXPLAIN EXECUTE delete_query('x', 'y'); +Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Delete on simple_570026 simple (cost=0.00..29.05 rows=13 width=6) + -> Seq Scan on simple_570026 simple (cost=0.00..29.05 rows=13 width=6) + Filter: ((name = 'x'::text) OR (name = 'y'::text)) +EXPLAIN :default_analyze_flags EXECUTE delete_query('x', 'y'); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Delete on simple_570026 simple (actual rows=0 loops=1) + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: ((name = $1) OR (name = $2)) + Rows Removed by Filter: 1 +deallocate delete_query; +-- prepared distributed insert/select +-- we don't support EXPLAIN for prepared insert/selects of other types. +PREPARE distributed_insert_select AS INSERT INTO simple SELECT * FROM simple WHERE name IN ($1, $2); +EXPLAIN :default_explain_flags EXECUTE distributed_insert_select('x', 'y'); +Custom Scan (Citus Adaptive) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on simple_570026 citus_table_alias + -> Seq Scan on simple_570026 simple + Filter: ((name = ANY ('{x,y}'::text[])) AND (worker_hash(id) >= '-2147483648'::integer) AND (worker_hash(id) <= '-1'::integer)) +EXPLAIN :default_analyze_flags EXECUTE distributed_insert_select('x', 'y'); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on simple_570026 citus_table_alias (actual rows=0 loops=1) + -> Seq Scan on simple_570026 simple (actual rows=0 loops=1) + Filter: ((name = ANY (ARRAY[$1, $2])) AND (worker_hash(id) >= '-2147483648'::integer) AND (worker_hash(id) <= '-1'::integer)) + Rows Removed by Filter: 1 +deallocate distributed_insert_select; +-- prepared cte +BEGIN; +PREPARE cte_query AS +WITH keys AS ( + SELECT count(*) FROM + (SELECT DISTINCT l_orderkey, GREATEST(random(), 2) FROM lineitem_hash_part WHERE l_quantity > $1) t +), +series AS ( + SELECT s FROM generate_series(1, $2) s +), +delete_result AS ( + DELETE FROM lineitem_hash_part WHERE l_quantity < $3 RETURNING * +) +SELECT s FROM series; +EXPLAIN :default_explain_flags EXECUTE cte_query(2, 10, -1); +Custom Scan (Citus Adaptive) + -> Distributed Subplan XXX_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Delete on lineitem_hash_part_360041 lineitem_hash_part + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part + Filter: (l_quantity < '-1'::numeric) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on generate_series s +EXPLAIN :default_analyze_flags EXECUTE cte_query(2, 10, -1); +Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + -> Distributed Subplan XXX_1 + Intermediate Data Size: 0 bytes + Result destination: Send to 0 nodes + -> Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 4 + Tuple data received from nodes: 0 bytes + Tasks Shown: One of 4 + -> Task + Tuple data received from node: 0 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Delete on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1) + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1) + Filter: (l_quantity < '-1'::numeric) + Rows Removed by Filter: 2885 + Task Count: 1 + Tuple data received from nodes: 11 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 11 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Function Scan on generate_series s (actual rows=10 loops=1) +ROLLBACK; +-- https://github.com/citusdata/citus/issues/2009#issuecomment-653036502 +CREATE TABLE users_table_2 (user_id int primary key, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_reference_table('users_table_2'); + +PREPARE p4 (int, int) AS insert into users_table_2 ( value_1, user_id) select value_1, user_id + $2 FROM users_table_2 ON CONFLICT (user_id) DO UPDATE SET value_2 = EXCLUDED.value_1 + $1; +EXPLAIN :default_explain_flags execute p4(20,20); +Custom Scan (Citus Adaptive) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on users_table_2_570028 citus_table_alias + Conflict Resolution: UPDATE + Conflict Arbiter Indexes: users_table_2_pkey_570028 + -> Seq Scan on users_table_2_570028 users_table_2 +EXPLAIN :default_analyze_flags execute p4(20,20); +Custom Scan (Citus Adaptive) (actual rows=0 loops=1) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Insert on users_table_2_570028 citus_table_alias (actual rows=0 loops=1) + Conflict Resolution: UPDATE + Conflict Arbiter Indexes: users_table_2_pkey_570028 + Tuples Inserted: 0 + Conflicting Tuples: 0 + -> Seq Scan on users_table_2_570028 users_table_2 (actual rows=0 loops=1) +DROP TABLE simple, users_table_2; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 191903646..00259bef2 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -751,6 +751,46 @@ SELECT count(*) FROM worker_save_query_explain_analyze('SELECT pg_sleep(0.05)', SELECT execution_duration BETWEEN 30 AND 200 FROM worker_last_saved_explain_analyze(); END; +-- +-- verify we handle parametrized queries properly +-- + +CREATE TABLE t(a int); +INSERT INTO t VALUES (1), (2), (3); + +-- simple case +PREPARE save_explain AS +SELECT $1, * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int); +EXECUTE save_explain(1); +deallocate save_explain; + + +-- Call a UDF first to make sure that we handle stacks of executorBoundParams properly. +-- +-- The prepared statement will first call f() which will force new executor run with new +-- set of parameters. Then it will call worker_save_query_explain_analyze with a +-- parametrized query. If we don't have the correct set of parameters here, it will fail. +CREATE FUNCTION f() RETURNS INT +AS $$ +PREPARE pp1 AS SELECT $1 WHERE $2 = $3; +EXECUTE pp1(4, 5, 5); +deallocate pp1; +SELECT 1$$ LANGUAGE sql volatile; + +PREPARE save_explain AS + SELECT $1, CASE WHEN i < 2 THEN + f() = 1 + ELSE + EXISTS(SELECT * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int) + WHERE a = 1) + END + FROM generate_series(1, 4) i; +EXECUTE save_explain(1); + +deallocate save_explain; +DROP FUNCTION f(); +DROP TABLE t; + SELECT * FROM explain_analyze_test ORDER BY a; \a\t @@ -764,6 +804,7 @@ SET client_min_messages TO WARNING; SELECT create_distributed_table('explain_analyze_test', 'a'); \set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)' +\set default_explain_flags '(ANALYZE off, COSTS off, TIMING off, SUMMARY off)' -- router SELECT EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; @@ -871,6 +912,11 @@ WITH r AS ( SELECT count(distinct a2) FROM s; ROLLBACK; +-- https://github.com/citusdata/citus/issues/4074 +prepare ref_select(int) AS select * from ref_table where 1 = $1; +explain :default_analyze_flags execute ref_select(1); +deallocate ref_select; + DROP TABLE ref_table, dist_table; -- test EXPLAIN ANALYZE with different replication factors @@ -916,3 +962,74 @@ EXPLAIN :default_analyze_flags EXECUTE p3; EXPLAIN :default_analyze_flags EXECUTE p3; DROP TABLE dist_table_rep1, dist_table_rep2; + +-- https://github.com/citusdata/citus/issues/2009 +CREATE TABLE simple (id integer, name text); +SELECT create_distributed_table('simple', 'id'); +PREPARE simple_router AS SELECT *, $1 FROM simple WHERE id = 1; + +EXPLAIN :default_explain_flags EXECUTE simple_router(1); +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); +EXPLAIN :default_analyze_flags EXECUTE simple_router(1); + +deallocate simple_router; + +-- prepared multi-row insert +PREPARE insert_query AS INSERT INTO simple VALUES ($1, 2), (2, $2); +EXPLAIN :default_explain_flags EXECUTE insert_query(3, 4); +EXPLAIN :default_analyze_flags EXECUTE insert_query(3, 4); +deallocate insert_query; + +-- prepared updates +PREPARE update_query AS UPDATE simple SET name=$1 WHERE name=$2; +EXPLAIN :default_explain_flags EXECUTE update_query('x', 'y'); +EXPLAIN :default_analyze_flags EXECUTE update_query('x', 'y'); +deallocate update_query; + +-- prepared deletes +PREPARE delete_query AS DELETE FROM simple WHERE name=$1 OR name=$2; +EXPLAIN EXECUTE delete_query('x', 'y'); +EXPLAIN :default_analyze_flags EXECUTE delete_query('x', 'y'); +deallocate delete_query; + +-- prepared distributed insert/select +-- we don't support EXPLAIN for prepared insert/selects of other types. +PREPARE distributed_insert_select AS INSERT INTO simple SELECT * FROM simple WHERE name IN ($1, $2); +EXPLAIN :default_explain_flags EXECUTE distributed_insert_select('x', 'y'); +EXPLAIN :default_analyze_flags EXECUTE distributed_insert_select('x', 'y'); +deallocate distributed_insert_select; + +-- prepared cte +BEGIN; +PREPARE cte_query AS +WITH keys AS ( + SELECT count(*) FROM + (SELECT DISTINCT l_orderkey, GREATEST(random(), 2) FROM lineitem_hash_part WHERE l_quantity > $1) t +), +series AS ( + SELECT s FROM generate_series(1, $2) s +), +delete_result AS ( + DELETE FROM lineitem_hash_part WHERE l_quantity < $3 RETURNING * +) +SELECT s FROM series; + +EXPLAIN :default_explain_flags EXECUTE cte_query(2, 10, -1); +EXPLAIN :default_analyze_flags EXECUTE cte_query(2, 10, -1); + +ROLLBACK; + +-- https://github.com/citusdata/citus/issues/2009#issuecomment-653036502 +CREATE TABLE users_table_2 (user_id int primary key, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint); +SELECT create_reference_table('users_table_2'); + +PREPARE p4 (int, int) AS insert into users_table_2 ( value_1, user_id) select value_1, user_id + $2 FROM users_table_2 ON CONFLICT (user_id) DO UPDATE SET value_2 = EXCLUDED.value_1 + $1; +EXPLAIN :default_explain_flags execute p4(20,20); +EXPLAIN :default_analyze_flags execute p4(20,20); + +DROP TABLE simple, users_table_2;