Call CTE inlining in Citus planner

The idea is simple: Inline CTEs(if any), try distributed planning.
If the planning yields a successful distributed plan, simply return
it.

If the planning fails, fallback to distributed planning on the query
tree where CTEs are not inlined. In that case, if the planning failed
just because of the CTE inlining, via recursive planning, the same
query would yield a successful plan.

A very basic set of examples:

WITH cte_1 AS (SELECT * FROM test_table)
SELECT
	*, row_number() OVER ()
FROM
	cte_1;

or

WITH a AS (SELECT * FROM test_table),
b AS (SELECT * FROM test_table)
SELECT * FROM  a JOIN b ON (a.value> b.value);
pull/3161/head
Onder Kalaci 2020-01-07 08:51:53 +01:00 committed by Jelte Fennema
parent 01a5800ee8
commit 05d600dd8f
1 changed files with 127 additions and 1 deletions

View File

@ -19,6 +19,7 @@
#include "catalog/pg_type.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/cte_inline.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_result_pruning.h"
@ -70,6 +71,15 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext
*planContext);
static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo
boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
Query *query, ParamListInfo boundParams,
bool hasUnresolvedParams,
@ -606,6 +616,28 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
JoinRestrictionContext *joinRestrictionContext =
planContext->plannerRestrictionContext->joinRestrictionContext;
PlannedStmt *resultPlan = NULL;
if (QueryTreeContainsInlinableCTE(planContext->originalQuery))
{
/*
* Inlining CTEs as subqueries in the query can avoid recursively
* planning some (or all) of the CTEs. In other words, the inlined
* CTEs could become part of query pushdown planning, which is much
* more efficient than recursively planning. So, first try distributed
* planning on the inlined CTEs in the query tree.
*
* We also should fallback to distributed planning with non-inlined CTEs
* if the distributed planning fails with inlined CTEs, because recursively
* planning CTEs can provide full SQL coverage, although it might be slow.
*/
resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext);
if (resultPlan != NULL)
{
return resultPlan;
}
}
if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
planContext->boundParams))
{
@ -664,7 +696,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
distributedPlan->planId = planId;
/* create final plan by combining local plan with distributed plan */
PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan);
resultPlan = FinalizePlan(planContext->plan, distributedPlan);
/*
* As explained above, force planning costs to be unrealistically high if
@ -683,6 +715,100 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
}
/*
* InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
* for creating a distributed planned statement. The function is primarily a
* wrapper on top of CreateDistributedPlannedStmt(), by first inlining the
* CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The
* function returns NULL if the planning fails on the query where eligable
* CTEs are inlined.
*/
static PlannedStmt *
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext *planContext)
{
/*
* We'll inline the CTEs and try distributed planning, preserve the original
* query in case the planning fails and we fallback to recursive planning of
* CTEs.
*/
Query *copyOfOriginalQuery = copyObject(planContext->originalQuery);
RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery);
/* after inlining, we shouldn't have any inlinable CTEs */
Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery));
/* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */
PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan,
copyOfOriginalQuery,
planContext->query,
planContext->boundParams,
planContext->
plannerRestrictionContext);
return result;
}
/*
* TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply
* calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input
* query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4
* message with the reason for the failure.
*/
static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
MemoryContext savedContext = CurrentMemoryContext;
PlannedStmt *result = NULL;
DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext));
planContext->plan = localPlan;
planContext->boundParams = boundParams;
planContext->originalQuery = originalQuery;
planContext->query = query;
planContext->plannerRestrictionContext = plannerRestrictionContext;
PG_TRY();
{
result = CreateDistributedPlannedStmt(planContext);
}
PG_CATCH();
{
MemoryContextSwitchTo(savedContext);
ErrorData *edata = CopyErrorData();
FlushErrorState();
/* don't try to intercept PANIC or FATAL, let those breeze past us */
if (edata->elevel != ERROR)
{
PG_RE_THROW();
}
ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Planning after CTEs inlined failed with "
"\nmessage: %s\ndetail: %s\nhint: %s",
edata->message ? edata->message : "",
edata->detail ? edata->detail : "",
edata->hint ? edata->hint : "")));
/* leave the error handling system */
FreeErrorData(edata);
result = NULL;
}
PG_END_TRY();
return result;
}
/*
* CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps: