diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 4aef0b89c..f97266741 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -19,6 +19,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" +#include "distributed/cte_inline.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" #include "distributed/intermediate_result_pruning.h" @@ -70,6 +71,15 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); +static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, + DistributedPlanningContext + *planContext); +static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, + Query *originalQuery, + Query *query, ParamListInfo + boundParams, + PlannerRestrictionContext * + plannerRestrictionContext); static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo boundParams, bool hasUnresolvedParams, @@ -606,6 +616,28 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) JoinRestrictionContext *joinRestrictionContext = planContext->plannerRestrictionContext->joinRestrictionContext; + PlannedStmt *resultPlan = NULL; + + if (QueryTreeContainsInlinableCTE(planContext->originalQuery)) + { + /* + * Inlining CTEs as subqueries in the query can avoid recursively + * planning some (or all) of the CTEs. In other words, the inlined + * CTEs could become part of query pushdown planning, which is much + * more efficient than recursively planning. So, first try distributed + * planning on the inlined CTEs in the query tree. 
+ * + * We should also fall back to distributed planning with non-inlined CTEs + * if the distributed planning fails with inlined CTEs, because recursively + * planning CTEs can provide full SQL coverage, although it might be slow. + */ + resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext); + if (resultPlan != NULL) + { + return resultPlan; + } + } + if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery, planContext->boundParams)) { @@ -664,7 +696,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) distributedPlan->planId = planId; /* create final plan by combining local plan with distributed plan */ - PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan); + resultPlan = FinalizePlan(planContext->plan, distributedPlan); /* * As explained above, force planning costs to be unrealistically high if @@ -683,6 +715,100 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) } +/* + * InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required + * for creating a distributed planned statement. The function is primarily a + * wrapper on top of CreateDistributedPlannedStmt(), by first inlining the + * CTEs and calling CreateDistributedPlannedStmt() in a PG_TRY() block. The + * function returns NULL if the planning fails on the query where eligible + * CTEs are inlined. NOTE(review): planId is not referenced in the body. + */ +static PlannedStmt * +InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, + DistributedPlanningContext *planContext) +{ + /* + * We'll inline the CTEs and try distributed planning, preserve the original + * query in case the planning fails and we fall back to recursive planning of + * CTEs. 
+ */ + Query *copyOfOriginalQuery = copyObject(planContext->originalQuery); + + RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery); + + /* after inlining, we shouldn't have any inlinable CTEs */ + Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery)); + + /* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */ + PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan, + copyOfOriginalQuery, + planContext->query, + planContext->boundParams, + planContext-> + plannerRestrictionContext); + + return result; +} + + +/* + * TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply + * calling it in a PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input + * query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4 + * message with the reason for the failure. + */ +static PlannedStmt * +TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, + Query *originalQuery, + Query *query, ParamListInfo boundParams, + PlannerRestrictionContext *plannerRestrictionContext) +{ + MemoryContext savedContext = CurrentMemoryContext; + PlannedStmt *result = NULL; + + DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext)); + + planContext->plan = localPlan; + planContext->boundParams = boundParams; + planContext->originalQuery = originalQuery; + planContext->query = query; + planContext->plannerRestrictionContext = plannerRestrictionContext; + + + PG_TRY(); + { + result = CreateDistributedPlannedStmt(planContext); + } + PG_CATCH(); + { + MemoryContextSwitchTo(savedContext); + ErrorData *edata = CopyErrorData(); + FlushErrorState(); + + /* don't try to intercept PANIC or FATAL, let those breeze past us; NOTE(review): FlushErrorState() already ran above -- confirm PG_RE_THROW() still has an error to re-raise here */ + if (edata->elevel != ERROR) + { + PG_RE_THROW(); + } + + ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Planning after CTEs inlined failed with " + "\nmessage: %s\ndetail: %s\nhint: %s", + edata->message ? 
edata->message : "", + edata->detail ? edata->detail : "", + edata->hint ? edata->hint : ""))); + + /* leave the error handling system */ + FreeErrorData(edata); + + result = NULL; + } + PG_END_TRY(); + + return result; +} + + /* * CreateDistributedPlan generates a distributed plan for a query. * It goes through 3 steps: