Fix segfault during EXPLAIN EXECUTE

Fix citusdata/citus#886

The way postgres' explain hook is designed means that our hook is never
called during EXPLAIN EXECUTE. So, we special-case EXPLAIN EXECUTE by
catching it in the utility hook.  We then replace the EXECUTE with the
original query and pass it back to Citus.
pull/916/head
Brian Cloutier 2016-10-20 17:20:30 +03:00 committed by Brian Cloutier
parent 61f6baf9e3
commit 1e6d1ef67e
6 changed files with 149 additions and 4 deletions

View File

@ -29,6 +29,7 @@
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "commands/prepare.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h" #include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
@ -39,6 +40,7 @@
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h" /* IWYU pragma: keep */ #include "distributed/multi_utility.h" /* IWYU pragma: keep */
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
@ -55,6 +57,7 @@
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "nodes/primnodes.h" #include "nodes/primnodes.h"
#include "nodes/value.h" #include "nodes/value.h"
#include "parser/analyze.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/lock.h" #include "storage/lock.h"
#include "tcop/dest.h" #include "tcop/dest.h"
@ -131,7 +134,6 @@ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
static bool warnedUserAbout2PC = false; static bool warnedUserAbout2PC = false;
/* /*
* Utility for handling citus specific concerns around utility statements. * Utility for handling citus specific concerns around utility statements.
* *
@ -299,6 +301,48 @@ multi_ProcessUtility(Node *parsetree,
" necessary users and roles."))); " necessary users and roles.")));
} }
/* due to an explain-hook limitation we have to special-case EXPLAIN EXECUTE */
if (IsA(parsetree, ExplainStmt) && IsA(((ExplainStmt *) parsetree)->query, Query))
{
ExplainStmt *explainStmt = (ExplainStmt *) parsetree;
Query *query = (Query *) explainStmt->query;
if (query->commandType == CMD_UTILITY &&
IsA(query->utilityStmt, ExecuteStmt))
{
ExecuteStmt *execstmt = (ExecuteStmt *) query->utilityStmt;
PreparedStatement *entry = FetchPreparedStatement(execstmt->name, true);
CachedPlanSource *plansource = entry->plansource;
Query *originalQuery;
/* copied from ExplainExecuteQuery, will never trigger if you used PREPARE */
if (!plansource->fixed_result)
{
ereport(ERROR, (errmsg("EXPLAIN EXECUTE does not support variable-result"
" cached plans")));
}
originalQuery = parse_analyze(plansource->raw_parse_tree,
plansource->query_string,
plansource->param_types,
plansource->num_params);
if (ExtractFirstDistributedTableId(originalQuery) != InvalidOid)
{
/*
* since pg no longer sees EXECUTE it will use the explain hook we've
* installed
*/
explainStmt->query = (Node *) originalQuery;
standard_ProcessUtility(parsetree, plansource->query_string, context,
params, dest, completionTag);
return;
}
/* if this is a normal query fall through to the usual executor */
}
}
if (commandMustRunAsOwner) if (commandMustRunAsOwner)
{ {
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);

View File

@ -69,7 +69,6 @@ typedef struct WalkerState
bool badCoalesce; bool badCoalesce;
} WalkerState; } WalkerState;
/* planner functions forward declarations */ /* planner functions forward declarations */
static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext * RelationRestrictionContext *
@ -98,7 +97,6 @@ static List * QueryRestrictList(Query *query);
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod); static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
static ShardInterval * FastShardPruning(Oid distributedTableId, static ShardInterval * FastShardPruning(Oid distributedTableId,
Const *partionColumnValue); Const *partionColumnValue);
static Oid ExtractFirstDistributedTableId(Query *query);
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static Task * RouterSelectTask(Query *originalQuery, static Task * RouterSelectTask(Query *originalQuery,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
@ -1754,7 +1752,7 @@ QueryRestrictList(Query *query)
* for the first distributed table in that query. If the function cannot find a * for the first distributed table in that query. If the function cannot find a
* distributed table, it returns InvalidOid. * distributed table, it returns InvalidOid.
*/ */
static Oid Oid
ExtractFirstDistributedTableId(Query *query) ExtractFirstDistributedTableId(Query *query)
{ {
List *rangeTableList = NIL; List *rangeTableList = NIL;

View File

@ -37,5 +37,6 @@ extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *insertRte, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte); RangeTblEntry *subqueryRte);
extern bool InsertSelectQuery(Query *query); extern bool InsertSelectQuery(Query *query);
extern Oid ExtractFirstDistributedTableId(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -670,3 +670,47 @@ Distributed Query into pg_merge_job_570037
Master Query Master Query
-> Aggregate -> Aggregate
-> Seq Scan on pg_merge_job_570037 -> Seq Scan on pg_merge_job_570037
-- ensure EXPLAIN EXECUTE doesn't crash
PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Distributed Query into pg_merge_job_570038
Executor: Task-Tracker
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570038
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Distributed Query into pg_merge_job_570039
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Bitmap Heap Scan on lineitem_290000 lineitem (cost=4.30..13.44 rows=3 width=18)
Recheck Cond: (l_orderkey = 5)
-> Bitmap Index Scan on lineitem_pkey_290000 (cost=0.00..4.30 rows=3 width=0)
Index Cond: (l_orderkey = 5)
PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Distributed Query into pg_merge_job_570040
Executor: Real-Time
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570040

View File

@ -641,3 +641,47 @@ Distributed Query into pg_merge_job_570037
Master Query Master Query
-> Aggregate -> Aggregate
-> Seq Scan on pg_merge_job_570037 -> Seq Scan on pg_merge_job_570037
-- ensure EXPLAIN EXECUTE doesn't crash
PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Distributed Query into pg_merge_job_570038
Executor: Task-Tracker
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570038
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Distributed Query into pg_merge_job_570039
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Bitmap Heap Scan on lineitem_290000 lineitem (cost=4.30..13.44 rows=3 width=18)
Recheck Cond: (l_orderkey = 5)
-> Bitmap Index Scan on lineitem_pkey_290000 (cost=0.00..4.30 rows=3 width=0)
Index Cond: (l_orderkey = 5)
PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Distributed Query into pg_merge_job_570040
Executor: Real-Time
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570040

View File

@ -200,3 +200,17 @@ EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_clone;
-- ensure distributed plans don't break -- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
-- ensure EXPLAIN EXECUTE doesn't crash
PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;