Support EXPLAIN EXECUTE ANALYZE.

pull/4068/head
Hadi Moshayedi 2020-07-24 16:42:02 -07:00
parent e500779ddd
commit 7b74eca22d
8 changed files with 613 additions and 47 deletions

View File

@ -618,6 +618,8 @@ static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *ses
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
WorkerSession *session);
static bool SendNextQuery(TaskPlacementExecution *placementExecution,
WorkerSession *session);
static void ConnectionStateMachine(WorkerSession *session);
static void HandleMultiConnectionSuccess(WorkerSession *session);
static bool HasAnyConnectionFailure(WorkerPool *workerPool);
@ -709,6 +711,11 @@ AdaptiveExecutor(CitusScanState *scanState)
if (RequestedForExplainAnalyze(scanState))
{
/*
* We use multiple queries per task in EXPLAIN ANALYZE which need to
* be part of the same transaction.
*/
UseCoordinatedTransaction();
taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor,
paramListInfo);
}
@ -3281,6 +3288,32 @@ TransactionStateMachine(WorkerSession *session)
break;
}
/* if this is a multi-query task, send the next query */
if (placementExecution->queryIndex < task->queryCount)
{
bool querySent = SendNextQuery(placementExecution, session);
if (!querySent)
{
/* no need to continue, connection is lost */
Assert(session->connection->connectionState ==
MULTI_CONNECTION_LOST);
return;
}
/*
* At this point the query might be just in pgconn buffers. We
* need to wait until it becomes writeable to actually send
* the query.
*/
UpdateConnectionWaitFlags(session,
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE);
transaction->transactionState = REMOTE_TRANS_SENT_COMMAND;
break;
}
shardCommandExecution->gotResults = true;
transaction->transactionState = REMOTE_TRANS_CLEARING_RESULTS;
break;
@ -3454,7 +3487,7 @@ PopUnassignedPlacementExecution(WorkerPool *workerPool)
/*
* StartPlacementExecutionOnSession gets a TaskPlacementExecition and
* StartPlacementExecutionOnSession gets a TaskPlacementExecution and
* WorkerSession, the task's query is sent to the worker via the session.
*
* The function does some bookkeeping such as associating the placement
@ -3470,17 +3503,13 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
{
WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution;
ParamListInfo paramListInfo = execution->paramListInfo;
MultiConnection *connection = session->connection;
ShardCommandExecution *shardCommandExecution =
placementExecution->shardCommandExecution;
bool binaryResults = shardCommandExecution->binaryResults;
Task *task = shardCommandExecution->task;
ShardPlacement *taskPlacement = placementExecution->shardPlacement;
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
int querySent = 0;
char *queryString = TaskQueryString(task);
if (execution->transactionProperties->useRemoteTransactionBlocks !=
TRANSACTION_BLOCKS_DISALLOWED)
@ -3492,11 +3521,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
AssignPlacementListToConnection(placementAccessList, connection);
}
/* one more command is sent over the session */
session->commandsSent++;
if (session->commandsSent == 1)
if (session->commandsSent == 0)
{
/* first time we send a command, consider the connection used (not unused) */
workerPool->unusedConnectionCount--;
@ -3507,6 +3532,38 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
session->currentTask = placementExecution;
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
bool querySent = SendNextQuery(placementExecution, session);
if (querySent)
{
session->commandsSent++;
}
return querySent;
}
/*
* SendNextQuery sends the next query for placementExecution on the given
* session.
*/
static bool
SendNextQuery(TaskPlacementExecution *placementExecution,
WorkerSession *session)
{
WorkerPool *workerPool = session->workerPool;
DistributedExecution *execution = workerPool->distributedExecution;
MultiConnection *connection = session->connection;
ShardCommandExecution *shardCommandExecution =
placementExecution->shardCommandExecution;
bool binaryResults = shardCommandExecution->binaryResults;
Task *task = shardCommandExecution->task;
ParamListInfo paramListInfo = execution->paramListInfo;
int querySent = 0;
uint32 queryIndex = placementExecution->queryIndex;
Assert(queryIndex < task->queryCount);
char *queryString = TaskQueryStringAtIndex(task, queryIndex);
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
{
int parameterCount = paramListInfo->numParams;
@ -3659,6 +3716,12 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
uint32 queryIndex = placementExecution->queryIndex;
if (queryIndex >= task->queryCount)
{
ereport(ERROR, (errmsg("unexpected query index while processing"
" query results")));
}
TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest, queryIndex);
if (tupleDescriptor == NULL)
{

View File

@ -57,6 +57,12 @@
int MultiShardConnectionType = PARALLEL_CONNECTION;
bool WritableStandbyCoordinator = false;
/*
* Pointer to bound parameters of the current ongoing call to ExecutorRun.
* If executor is not running, then this value is meaningless.
*/
ParamListInfo executorBoundParams = NULL;
/* sort the returning to get consistent outputs, used only for testing */
bool SortReturning = false;
@ -129,6 +135,14 @@ CitusExecutorRun(QueryDesc *queryDesc,
{
DestReceiver *dest = queryDesc->dest;
ParamListInfo savedBoundParams = executorBoundParams;
/*
* Save a pointer to query params so UDFs can access them by calling
* ExecutorBoundParams().
*/
executorBoundParams = queryDesc->params;
/*
* We do some potentially time consuming operations our self now before we hand of
* control to postgres' executor. To make sure that time spent is accurately measured
@ -209,6 +223,7 @@ CitusExecutorRun(QueryDesc *queryDesc,
queryDesc->totaltime = totalTime;
}
executorBoundParams = savedBoundParams;
ExecutorLevel--;
if (ExecutorLevel == 0 && PlannerLevel == 0)
@ -228,6 +243,7 @@ CitusExecutorRun(QueryDesc *queryDesc,
queryDesc->totaltime = totalTime;
}
executorBoundParams = savedBoundParams;
ExecutorLevel--;
PG_RE_THROW();
@ -691,3 +707,16 @@ AlterTableConstraintCheck(QueryDesc *queryDesc)
return true;
}
/*
* ExecutorBoundParams returns the bound parameters of the current ongoing call
* to ExecutorRun. This is meant to be used by UDFs which need to access bound
* parameters.
*/
ParamListInfo
ExecutorBoundParams(void)
{
Assert(ExecutorLevel > 0);
return executorBoundParams;
}

View File

@ -513,6 +513,25 @@ GetTaskQueryType(Task *task)
}
/*
* TaskQueryStringAtIndex returns query at given index among the possibly
* multiple queries that a task can have.
*/
char *
TaskQueryStringAtIndex(Task *task, int index)
{
Assert(index < task->queryCount);
int taskQueryType = GetTaskQueryType(task);
if (taskQueryType == TASK_QUERY_TEXT_LIST)
{
return list_nth(task->taskQuery.data.queryStringList, index);
}
return TaskQueryString(task);
}
/*
* TaskQueryString generates task query string text if missing.
*

View File

@ -116,12 +116,16 @@ typedef struct ExplainAnalyzeDestination
/* Explain functions for distributed queries */
static void ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es);
static void ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es);
static void ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es,
ParamListInfo params);
static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es);
static void ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es);
static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es);
static void ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es,
ParamListInfo params);
static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es, ParamListInfo
params);
static RemoteExplainPlan * GetSavedRemoteExplain(Task *task, ExplainState *es);
static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es);
static RemoteExplainPlan * FetchRemoteExplainFromWorkers(Task *task, ExplainState *es,
ParamListInfo params);
static void ExplainTask(CitusScanState *scanState, Task *task, int placementIndex,
List *explainOutputList,
ExplainState *es);
@ -175,6 +179,8 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo params = executorState->es_param_list_info;
if (!ExplainDistributedQueries)
{
@ -191,7 +197,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
ExplainSubPlans(distributedPlan, es);
}
ExplainJob(scanState, distributedPlan->workerJob, es);
ExplainJob(scanState, distributedPlan->workerJob, es, params);
ExplainCloseGroup("Distributed Query", "Distributed Query", true, es);
}
@ -377,7 +383,8 @@ ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es)
* or all tasks if citus.explain_all_tasks is on.
*/
static void
ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es)
ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es,
ParamListInfo params)
{
List *dependentJobList = job->dependentJobList;
int dependentJobCount = list_length(dependentJobList);
@ -426,7 +433,7 @@ ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es)
{
ExplainOpenGroup("Tasks", "Tasks", false, es);
ExplainTaskList(scanState, taskList, es);
ExplainTaskList(scanState, taskList, es, params);
ExplainCloseGroup("Tasks", "Tasks", false, es);
}
@ -526,7 +533,8 @@ ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es)
* or all tasks if citus.explain_all_tasks is on.
*/
static void
ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es)
ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es,
ParamListInfo params)
{
ListCell *taskCell = NULL;
ListCell *remoteExplainCell = NULL;
@ -539,7 +547,7 @@ ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es)
{
Task *task = (Task *) lfirst(taskCell);
RemoteExplainPlan *remoteExplain = RemoteExplain(task, es);
RemoteExplainPlan *remoteExplain = RemoteExplain(task, es, params);
remoteExplainList = lappend(remoteExplainList, remoteExplain);
if (!ExplainAllTasks)
@ -564,7 +572,7 @@ ExplainTaskList(CitusScanState *scanState, List *taskList, ExplainState *es)
* RemoteExplain fetches the remote EXPLAIN output for a single task.
*/
static RemoteExplainPlan *
RemoteExplain(Task *task, ExplainState *es)
RemoteExplain(Task *task, ExplainState *es, ParamListInfo params)
{
/*
* For EXPLAIN EXECUTE we still use the old method, so task->fetchedExplainAnalyzePlan
@ -576,7 +584,7 @@ RemoteExplain(Task *task, ExplainState *es)
}
else
{
return FetchRemoteExplainFromWorkers(task, es);
return FetchRemoteExplainFromWorkers(task, es, params);
}
}
@ -619,7 +627,7 @@ GetSavedRemoteExplain(Task *task, ExplainState *es)
* one succeeds or all failed.
*/
static RemoteExplainPlan *
FetchRemoteExplainFromWorkers(Task *task, ExplainState *es)
FetchRemoteExplainFromWorkers(Task *task, ExplainState *es, ParamListInfo params)
{
List *taskPlacementList = task->taskPlacementList;
int placementCount = list_length(taskPlacementList);
@ -638,7 +646,6 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es)
for (int placementIndex = 0; placementIndex < placementCount; placementIndex++)
{
ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
PGresult *queryResult = NULL;
int connectionFlags = 0;
remotePlan->placementIndex = placementIndex;
@ -668,14 +675,28 @@ FetchRemoteExplainFromWorkers(Task *task, ExplainState *es)
ExecuteCriticalRemoteCommand(connection, "SAVEPOINT citus_explain_savepoint");
/* run explain query */
int executeResult = ExecuteOptionalRemoteCommand(connection, explainQuery->data,
&queryResult);
if (executeResult != 0)
{
PQclear(queryResult);
ForgetResults(connection);
int numParams = params ? params->numParams : 0;
Oid *paramTypes = NULL;
const char **paramValues = NULL;
PGresult *queryResult = NULL;
continue;
if (params)
{
ExtractParametersFromParamList(params, &paramTypes, &paramValues, false);
}
int sendStatus = SendRemoteCommandParams(connection, explainQuery->data,
numParams, paramTypes, paramValues,
false);
if (sendStatus != 0)
{
queryResult = GetRemoteCommandResult(connection, false);
if (!IsResponseOK(queryResult))
{
PQclear(queryResult);
ForgetResults(connection);
continue;
}
}
/* read explain query results */
@ -965,7 +986,18 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
RawStmt *parseTree = linitial(parseTreeList);
List *queryList = pg_analyze_and_rewrite(parseTree, queryString, NULL, 0, NULL);
ParamListInfo boundParams = ExecutorBoundParams();
int numParams = boundParams ? boundParams->numParams : 0;
Oid *paramTypes = NULL;
const char **paramValues = NULL;
if (boundParams != NULL)
{
ExtractParametersFromParamList(boundParams, &paramTypes, &paramValues, false);
}
List *queryList = pg_analyze_and_rewrite(parseTree, queryString, paramTypes,
numParams, NULL);
if (list_length(queryList) != 1)
{
ereport(ERROR, (errmsg("cannot EXPLAIN ANALYZE a query rewritten "
@ -988,7 +1020,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
INSTR_TIME_SUBTRACT(planDuration, planStart);
/* do the actual EXPLAIN ANALYZE */
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, NULL, NULL,
ExplainWorkerPlan(plan, tupleStoreDest, es, queryString, boundParams, NULL,
&planDuration, &executionDurationMillisec);
ExplainEndOutput(es);
@ -1339,15 +1371,6 @@ ExplainAnalyzeTaskList(List *originalTaskList,
List *explainAnalyzeTaskList = NIL;
Task *originalTask = NULL;
/*
* We cannot use multiple commands in a prepared statement, so use the old
* EXPLAIN ANALYZE method for this case.
*/
if (params != NULL)
{
return originalTaskList;
}
foreach_ptr(originalTask, originalTaskList)
{
if (originalTask->queryCount != 1)

View File

@ -27,6 +27,7 @@ extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern char * TaskQueryString(Task *task);
extern char * TaskQueryStringAtIndex(Task *task, int index);
extern bool UpdateRelationsToLocalShardTables(Node *node, List *relationShardList);
extern int GetTaskQueryType(Task *task);

View File

@ -146,6 +146,7 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids);
extern ParamListInfo ExecutorBoundParams(void);
#endif /* MULTI_EXECUTOR_H */

View File

@ -1283,23 +1283,23 @@ Custom Scan (Citus Adaptive) (actual rows=3 loops=1)
PREPARE multi_shard_query_param(int) AS UPDATE lineitem SET l_quantity = $1;
BEGIN;
EXPLAIN EXECUTE multi_shard_query_param(5);
WARNING: there is no parameter $1
WARNING: there is no parameter $1
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
Error: Could not get remote plan.
Node: host=localhost port=xxxxx dbname=regression
-> Update on lineitem_290000 lineitem (cost=0.00..176.00 rows=6000 width=140)
-> Seq Scan on lineitem_290000 lineitem (cost=0.00..176.00 rows=6000 width=140)
ROLLBACK;
BEGIN;
EXPLAIN (ANALYZE ON, COSTS OFF, TIMING OFF, SUMMARY OFF) EXECUTE multi_shard_query_param(5);
WARNING: there is no parameter $1
WARNING: there is no parameter $1
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Error: Could not get remote plan.
Node: host=localhost port=xxxxx dbname=regression
-> Update on lineitem_290000 lineitem (actual rows=0 loops=1)
-> Seq Scan on lineitem_290000 lineitem (actual rows=6000 loops=1)
ROLLBACK;
\set VERBOSITY DEFAULT
-- test explain in a transaction with alter table to test we use right connections
@ -1811,6 +1811,52 @@ SELECT execution_duration BETWEEN 30 AND 200 FROM worker_last_saved_explain_anal
(1 row)
END;
--
-- verify we handle parametrized queries properly
--
CREATE TABLE t(a int);
INSERT INTO t VALUES (1), (2), (3);
-- simple case
PREPARE save_explain AS
SELECT $1, * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int);
EXECUTE save_explain(1);
?column? | a
---------------------------------------------------------------------
1 | 1
(1 row)
deallocate save_explain;
-- Call a UDF first to make sure that we handle stacks of executorBoundParams properly.
--
-- The prepared statement will first call f() which will force new executor run with new
-- set of parameters. Then it will call worker_save_query_explain_analyze with a
-- parametrized query. If we don't have the correct set of parameters here, it will fail.
CREATE FUNCTION f() RETURNS INT
AS $$
PREPARE pp1 AS SELECT $1 WHERE $2 = $3;
EXECUTE pp1(4, 5, 5);
deallocate pp1;
SELECT 1$$ LANGUAGE sql volatile;
PREPARE save_explain AS
SELECT $1, CASE WHEN i < 2 THEN
f() = 1
ELSE
EXISTS(SELECT * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int)
WHERE a = 1)
END
FROM generate_series(1, 4) i;
EXECUTE save_explain(1);
?column? | exists
---------------------------------------------------------------------
1 | t
1 | t
1 | t
1 | t
(4 rows)
deallocate save_explain;
DROP FUNCTION f();
DROP TABLE t;
SELECT * FROM explain_analyze_test ORDER BY a;
a | b
---------------------------------------------------------------------
@ -1829,6 +1875,7 @@ SET client_min_messages TO WARNING;
SELECT create_distributed_table('explain_analyze_test', 'a');
\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)'
\set default_explain_flags '(ANALYZE off, COSTS off, TIMING off, SUMMARY off)'
-- router SELECT
EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
@ -2339,6 +2386,20 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Aggregate (actual rows=1 loops=1)
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
ROLLBACK;
-- https://github.com/citusdata/citus/issues/4074
prepare ref_select(int) AS select * from ref_table where 1 = $1;
explain :default_analyze_flags execute ref_select(1);
Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 1
Tuple data received from nodes: 11 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 11 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Result (actual rows=10 loops=1)
One-Time Filter: (1 = $1)
-> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1)
deallocate ref_select;
DROP TABLE ref_table, dist_table;
-- test EXPLAIN ANALYZE with different replication factors
SET citus.shard_count = 2;
@ -2600,3 +2661,255 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Filter: (a = 1)
Rows Removed by Filter: 3
DROP TABLE dist_table_rep1, dist_table_rep2;
-- https://github.com/citusdata/citus/issues/2009
CREATE TABLE simple (id integer, name text);
SELECT create_distributed_table('simple', 'id');
PREPARE simple_router AS SELECT *, $1 FROM simple WHERE id = 1;
EXPLAIN :default_explain_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple
Filter: (id = 1)
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tuple data received from nodes: 0 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (id = 1)
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tuple data received from nodes: 0 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (id = 1)
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tuple data received from nodes: 0 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (id = 1)
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tuple data received from nodes: 0 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (id = 1)
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tuple data received from nodes: 0 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (id = 1)
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tuple data received from nodes: 0 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (id = 1)
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tuple data received from nodes: 0 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (id = 1)
deallocate simple_router;
-- prepared multi-row insert
PREPARE insert_query AS INSERT INTO simple VALUES ($1, 2), (2, $2);
EXPLAIN :default_explain_flags EXECUTE insert_query(3, 4);
Custom Scan (Citus Adaptive)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on simple_570026 citus_table_alias
-> Result
EXPLAIN :default_analyze_flags EXECUTE insert_query(3, 4);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on simple_570026 citus_table_alias (actual rows=0 loops=1)
-> Result (actual rows=1 loops=1)
deallocate insert_query;
-- prepared updates
PREPARE update_query AS UPDATE simple SET name=$1 WHERE name=$2;
EXPLAIN :default_explain_flags EXECUTE update_query('x', 'y');
Custom Scan (Citus Adaptive)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Update on simple_570026 simple
-> Seq Scan on simple_570026 simple
Filter: (name = 'y'::text)
EXPLAIN :default_analyze_flags EXECUTE update_query('x', 'y');
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Update on simple_570026 simple (actual rows=0 loops=1)
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: (name = $2)
Rows Removed by Filter: 1
deallocate update_query;
-- prepared deletes
PREPARE delete_query AS DELETE FROM simple WHERE name=$1 OR name=$2;
EXPLAIN EXECUTE delete_query('x', 'y');
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Delete on simple_570026 simple (cost=0.00..29.05 rows=13 width=6)
-> Seq Scan on simple_570026 simple (cost=0.00..29.05 rows=13 width=6)
Filter: ((name = 'x'::text) OR (name = 'y'::text))
EXPLAIN :default_analyze_flags EXECUTE delete_query('x', 'y');
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Delete on simple_570026 simple (actual rows=0 loops=1)
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: ((name = $1) OR (name = $2))
Rows Removed by Filter: 1
deallocate delete_query;
-- prepared distributed insert/select
-- we don't support EXPLAIN for prepared insert/selects of other types.
PREPARE distributed_insert_select AS INSERT INTO simple SELECT * FROM simple WHERE name IN ($1, $2);
EXPLAIN :default_explain_flags EXECUTE distributed_insert_select('x', 'y');
Custom Scan (Citus Adaptive)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on simple_570026 citus_table_alias
-> Seq Scan on simple_570026 simple
Filter: ((name = ANY ('{x,y}'::text[])) AND (worker_hash(id) >= '-2147483648'::integer) AND (worker_hash(id) <= '-1'::integer))
EXPLAIN :default_analyze_flags EXECUTE distributed_insert_select('x', 'y');
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on simple_570026 citus_table_alias (actual rows=0 loops=1)
-> Seq Scan on simple_570026 simple (actual rows=0 loops=1)
Filter: ((name = ANY (ARRAY[$1, $2])) AND (worker_hash(id) >= '-2147483648'::integer) AND (worker_hash(id) <= '-1'::integer))
Rows Removed by Filter: 1
deallocate distributed_insert_select;
-- prepared cte
BEGIN;
PREPARE cte_query AS
WITH keys AS (
SELECT count(*) FROM
(SELECT DISTINCT l_orderkey, GREATEST(random(), 2) FROM lineitem_hash_part WHERE l_quantity > $1) t
),
series AS (
SELECT s FROM generate_series(1, $2) s
),
delete_result AS (
DELETE FROM lineitem_hash_part WHERE l_quantity < $3 RETURNING *
)
SELECT s FROM series;
EXPLAIN :default_explain_flags EXECUTE cte_query(2, 10, -1);
Custom Scan (Citus Adaptive)
-> Distributed Subplan XXX_1
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Delete on lineitem_hash_part_360041 lineitem_hash_part
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
Filter: (l_quantity < '-1'::numeric)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on generate_series s
EXPLAIN :default_analyze_flags EXECUTE cte_query(2, 10, -1);
Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
-> Distributed Subplan XXX_1
Intermediate Data Size: 0 bytes
Result destination: Send to 0 nodes
-> Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 4
Tuple data received from nodes: 0 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 0 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Delete on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1)
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part (actual rows=0 loops=1)
Filter: (l_quantity < '-1'::numeric)
Rows Removed by Filter: 2885
Task Count: 1
Tuple data received from nodes: 11 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 11 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on generate_series s (actual rows=10 loops=1)
ROLLBACK;
-- https://github.com/citusdata/citus/issues/2009#issuecomment-653036502
CREATE TABLE users_table_2 (user_id int primary key, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
SELECT create_reference_table('users_table_2');
PREPARE p4 (int, int) AS insert into users_table_2 ( value_1, user_id) select value_1, user_id + $2 FROM users_table_2 ON CONFLICT (user_id) DO UPDATE SET value_2 = EXCLUDED.value_1 + $1;
EXPLAIN :default_explain_flags execute p4(20,20);
Custom Scan (Citus Adaptive)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on users_table_2_570028 citus_table_alias
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: users_table_2_pkey_570028
-> Seq Scan on users_table_2_570028 users_table_2
EXPLAIN :default_analyze_flags execute p4(20,20);
Custom Scan (Citus Adaptive) (actual rows=0 loops=1)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Insert on users_table_2_570028 citus_table_alias (actual rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: users_table_2_pkey_570028
Tuples Inserted: 0
Conflicting Tuples: 0
-> Seq Scan on users_table_2_570028 users_table_2 (actual rows=0 loops=1)
DROP TABLE simple, users_table_2;

View File

@ -751,6 +751,46 @@ SELECT count(*) FROM worker_save_query_explain_analyze('SELECT pg_sleep(0.05)',
SELECT execution_duration BETWEEN 30 AND 200 FROM worker_last_saved_explain_analyze();
END;
--
-- verify we handle parametrized queries properly
--
CREATE TABLE t(a int);
INSERT INTO t VALUES (1), (2), (3);
-- simple case
PREPARE save_explain AS
SELECT $1, * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int);
EXECUTE save_explain(1);
deallocate save_explain;
-- Call a UDF first to make sure that we handle stacks of executorBoundParams properly.
--
-- The prepared statement will first call f() which will force new executor run with new
-- set of parameters. Then it will call worker_save_query_explain_analyze with a
-- parametrized query. If we don't have the correct set of parameters here, it will fail.
CREATE FUNCTION f() RETURNS INT
AS $$
PREPARE pp1 AS SELECT $1 WHERE $2 = $3;
EXECUTE pp1(4, 5, 5);
deallocate pp1;
SELECT 1$$ LANGUAGE sql volatile;
PREPARE save_explain AS
SELECT $1, CASE WHEN i < 2 THEN
f() = 1
ELSE
EXISTS(SELECT * FROM worker_save_query_explain_analyze('SELECT $1::int', :default_opts) as (a int)
WHERE a = 1)
END
FROM generate_series(1, 4) i;
EXECUTE save_explain(1);
deallocate save_explain;
DROP FUNCTION f();
DROP TABLE t;
SELECT * FROM explain_analyze_test ORDER BY a;
\a\t
@ -764,6 +804,7 @@ SET client_min_messages TO WARNING;
SELECT create_distributed_table('explain_analyze_test', 'a');
\set default_analyze_flags '(ANALYZE on, COSTS off, TIMING off, SUMMARY off)'
\set default_explain_flags '(ANALYZE off, COSTS off, TIMING off, SUMMARY off)'
-- router SELECT
EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1;
@ -871,6 +912,11 @@ WITH r AS (
SELECT count(distinct a2) FROM s;
ROLLBACK;
-- https://github.com/citusdata/citus/issues/4074
prepare ref_select(int) AS select * from ref_table where 1 = $1;
explain :default_analyze_flags execute ref_select(1);
deallocate ref_select;
DROP TABLE ref_table, dist_table;
-- test EXPLAIN ANALYZE with different replication factors
@ -916,3 +962,74 @@ EXPLAIN :default_analyze_flags EXECUTE p3;
EXPLAIN :default_analyze_flags EXECUTE p3;
DROP TABLE dist_table_rep1, dist_table_rep2;
-- https://github.com/citusdata/citus/issues/2009
CREATE TABLE simple (id integer, name text);
SELECT create_distributed_table('simple', 'id');
PREPARE simple_router AS SELECT *, $1 FROM simple WHERE id = 1;
EXPLAIN :default_explain_flags EXECUTE simple_router(1);
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
EXPLAIN :default_analyze_flags EXECUTE simple_router(1);
deallocate simple_router;
-- prepared multi-row insert
PREPARE insert_query AS INSERT INTO simple VALUES ($1, 2), (2, $2);
EXPLAIN :default_explain_flags EXECUTE insert_query(3, 4);
EXPLAIN :default_analyze_flags EXECUTE insert_query(3, 4);
deallocate insert_query;
-- prepared updates
PREPARE update_query AS UPDATE simple SET name=$1 WHERE name=$2;
EXPLAIN :default_explain_flags EXECUTE update_query('x', 'y');
EXPLAIN :default_analyze_flags EXECUTE update_query('x', 'y');
deallocate update_query;
-- prepared deletes
PREPARE delete_query AS DELETE FROM simple WHERE name=$1 OR name=$2;
EXPLAIN EXECUTE delete_query('x', 'y');
EXPLAIN :default_analyze_flags EXECUTE delete_query('x', 'y');
deallocate delete_query;
-- prepared distributed insert/select
-- we don't support EXPLAIN for prepared insert/selects of other types.
PREPARE distributed_insert_select AS INSERT INTO simple SELECT * FROM simple WHERE name IN ($1, $2);
EXPLAIN :default_explain_flags EXECUTE distributed_insert_select('x', 'y');
EXPLAIN :default_analyze_flags EXECUTE distributed_insert_select('x', 'y');
deallocate distributed_insert_select;
-- prepared cte
BEGIN;
PREPARE cte_query AS
WITH keys AS (
SELECT count(*) FROM
(SELECT DISTINCT l_orderkey, GREATEST(random(), 2) FROM lineitem_hash_part WHERE l_quantity > $1) t
),
series AS (
SELECT s FROM generate_series(1, $2) s
),
delete_result AS (
DELETE FROM lineitem_hash_part WHERE l_quantity < $3 RETURNING *
)
SELECT s FROM series;
EXPLAIN :default_explain_flags EXECUTE cte_query(2, 10, -1);
EXPLAIN :default_analyze_flags EXECUTE cte_query(2, 10, -1);
ROLLBACK;
-- https://github.com/citusdata/citus/issues/2009#issuecomment-653036502
CREATE TABLE users_table_2 (user_id int primary key, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
SELECT create_reference_table('users_table_2');
PREPARE p4 (int, int) AS insert into users_table_2 ( value_1, user_id) select value_1, user_id + $2 FROM users_table_2 ON CONFLICT (user_id) DO UPDATE SET value_2 = EXCLUDED.value_1 + $1;
EXPLAIN :default_explain_flags execute p4(20,20);
EXPLAIN :default_analyze_flags execute p4(20,20);
DROP TABLE simple, users_table_2;