diff --git a/src/backend/distributed/planner/cte_inline.c b/src/backend/distributed/planner/cte_inline.c index 22b17c3d5..d85784563 100644 --- a/src/backend/distributed/planner/cte_inline.c +++ b/src/backend/distributed/planner/cte_inline.c @@ -51,11 +51,6 @@ typedef struct inline_cte_walker_context List *aliascolnames; /* citus addition to Postgres' inline_cte_walker_context */ } inline_cte_walker_context; -/* copy & paste from Postgres source, moved into a function for readability */ -static bool PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType); - -/* the following utility functions are copy & paste from PostgreSQL code */ -static void inline_cte(Query *mainQuery, CommonTableExpr *cte); static bool inline_cte_walker(Node *node, inline_cte_walker_context *context); static bool contain_dml(Node *node); static bool contain_dml_walker(Node *node, void *context); @@ -80,6 +75,11 @@ bool EnableCTEInlining = true; void RecursivelyInlineCtesInQueryTree(Query *query) { + if (!EnableCTEInlining) + { + return; + } + InlineCTEsInQueryTree(query); query_tree_walker(query, RecursivelyInlineCteWalker, NULL, 0); @@ -205,7 +205,7 @@ QueryTreeContainsInlinableCteWalker(Node *node) * PostgreSQLCTEInlineCondition returns true if the CTE is considered * safe to inline by Postgres. 
*/ -static bool +bool PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType) { /* @@ -261,7 +261,7 @@ PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType) /* * inline_cte: convert RTE_CTE references to given CTE into RTE_SUBQUERYs */ -static void +void inline_cte(Query *mainQuery, CommonTableExpr *cte) { struct inline_cte_walker_context context; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index a0c27fc66..5426bacba 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -78,15 +78,6 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); -static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, - DistributedPlanningContext - *planContext); -static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, - Query *originalQuery, - Query *query, ParamListInfo - boundParams, - PlannerRestrictionContext * - plannerRestrictionContext); static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); @@ -615,28 +606,6 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) uint64 planId = NextPlanId++; bool hasUnresolvedParams = false; - PlannedStmt *resultPlan = NULL; - - if (QueryTreeContainsInlinableCTE(planContext->originalQuery)) - { - /* - * Inlining CTEs as subqueries in the query can avoid recursively - * planning some (or all) of the CTEs. In other words, the inlined - * CTEs could become part of query pushdown planning, which is much - * more efficient than recursively planning. So, first try distributed - * planning on the inlined CTEs in the query tree. 
- * - * We also should fallback to distributed planning with non-inlined CTEs - * if the distributed planning fails with inlined CTEs, because recursively - * planning CTEs can provide full SQL coverage, although it might be slow. - */ - resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext); - if (resultPlan != NULL) - { - return resultPlan; - } - } - if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery, planContext->boundParams)) { @@ -645,9 +614,9 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) DistributedPlan *distributedPlan = CreateDistributedPlan(planId, planContext->originalQuery, planContext->query, - planContext->boundParams, - hasUnresolvedParams, - planContext->plannerRestrictionContext); + planContext->boundParams, + hasUnresolvedParams, + planContext->plannerRestrictionContext); /* * If no plan was generated, prepare a generic error to be emitted. @@ -692,7 +661,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) distributedPlan->planId = planId; /* create final plan by combining local plan with distributed plan */ - resultPlan = FinalizePlan(planContext->plan, distributedPlan); + PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan); /* * As explained above, force planning costs to be unrealistically high if @@ -711,139 +680,6 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) } -/* - * InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required - * for creating a distributed planned statement. The function is primarily a - * wrapper on top of CreateDistributedPlannedStmt(), by first inlining the - * CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The - * function returns NULL if the planning fails on the query where eligable - * CTEs are inlined. 
- */ -static PlannedStmt * -InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, - DistributedPlanningContext *planContext) -{ - if (!EnableCTEInlining) - { - /* - * In Postgres 12+, users can adjust whether to inline/not inline CTEs - * by [NOT] MATERIALIZED keywords. However, in PG 11, that's not possible. - * So, with this we provide a way to prevent CTE inlining on Postgres 11. - * - * The main use-case for this is not to have divergent test outputs between - * PG 11 vs PG 12, so not very much intended for users. - */ - return NULL; - } - - /* - * We'll inline the CTEs and try distributed planning, preserve the original - * query in case the planning fails and we fallback to recursive planning of - * CTEs. - */ - Query *copyOfOriginalQuery = copyObject(planContext->originalQuery); - - RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery); - - /* after inlining, we shouldn't have any inlinable CTEs */ - Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery)); - -#if PG_VERSION_NUM < PG_VERSION_12 - Query *query = planContext->query; - - /* - * We had to implement this hack because on Postgres11 and below, the originalQuery - * and the query would have significant differences in terms of CTEs where CTEs - * would not be inlined on the query (as standard_planner() wouldn't inline CTEs - * on PG 11 and below). - * - * Instead, we prefer to pass the inlined query to the distributed planning. We rely - * on the fact that the query includes subqueries, and it'd definitely go through - * query pushdown planning. During query pushdown planning, the only relevant query - * tree is the original query. 
- */ - planContext->query = copyObject(copyOfOriginalQuery); -#endif - - - /* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */ - PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan, - copyOfOriginalQuery, - planContext->query, - planContext->boundParams, - planContext-> - plannerRestrictionContext); - -#if PG_VERSION_NUM < PG_VERSION_12 - - /* - * Set back the original query, in case the planning failed and we need to go - * into distributed planning again. - */ - planContext->query = query; -#endif - - return result; -} - - -/* - * TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply - * calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input - * query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4 - * message with the reason for the failure. - */ -static PlannedStmt * -TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, - Query *originalQuery, - Query *query, ParamListInfo boundParams, - PlannerRestrictionContext *plannerRestrictionContext) -{ - MemoryContext savedContext = CurrentMemoryContext; - PlannedStmt *result = NULL; - - DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext)); - - planContext->plan = localPlan; - planContext->boundParams = boundParams; - planContext->originalQuery = originalQuery; - planContext->query = query; - planContext->plannerRestrictionContext = plannerRestrictionContext; - - - PG_TRY(); - { - result = CreateDistributedPlannedStmt(planContext); - } - PG_CATCH(); - { - MemoryContextSwitchTo(savedContext); - ErrorData *edata = CopyErrorData(); - FlushErrorState(); - - /* don't try to intercept PANIC or FATAL, let those breeze past us */ - if (edata->elevel != ERROR) - { - PG_RE_THROW(); - } - - ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Planning after CTEs inlined failed with " - "\nmessage: %s\ndetail: %s\nhint: %s", - 
edata->message ? edata->message : "", - edata->detail ? edata->detail : "", - edata->hint ? edata->hint : ""))); - - /* leave the error handling system */ - FreeErrorData(edata); - - result = NULL; - } - PG_END_TRY(); - - return result; -} - /* * CreateDistributedPlan generates a distributed plan for a query. @@ -860,7 +696,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi PlannerRestrictionContext *plannerRestrictionContext) { DistributedPlan *distributedPlan = NULL; - bool hasCtes = originalQuery->cteList != NIL; if (IsModifyCommand(originalQuery)) { @@ -991,6 +826,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi * the CTEs are referenced then there are no subplans, but we still want * to retry the router planner. */ + bool hasCtes = originalQuery->cteList != NIL; if (list_length(subPlanList) > 0 || hasCtes) { Query *newQuery = copyObject(originalQuery); @@ -1022,8 +858,9 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi *query = *newQuery; /* recurse into CreateDistributedPlan with subqueries/CTEs replaced */ - distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false, - plannerRestrictionContext); + distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, + false, + plannerRestrictionContext); /* distributedPlan cannot be null since hasUnresolvedParams argument was false */ Assert(distributedPlan != NULL); diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index f2da9ef90..d8f156b46 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -383,9 +383,9 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamL Query *selectQuery = BuildSelectForInsertSelect(originalQuery); originalQuery->cteList = NIL; DistributedPlan *distPlan = 
CreateDistributedPlan(planId, selectQuery, - copyObject(selectQuery), - boundParams, hasUnresolvedParams, - plannerRestrictionContext); + copyObject(selectQuery), + boundParams, hasUnresolvedParams, + plannerRestrictionContext); /* * We don't expect distPlan to be NULL here because hasUnresolvedParams is diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 8d0289d7c..bfadcf575 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -80,7 +80,6 @@ static Oid NodeTryGetRteRelid(Node *node); static bool FullCompositeFieldList(List *compositeFieldList); static bool HasUnsupportedJoinWalker(Node *node, void *context); static bool ErrorHintRequired(const char *errorHint, Query *queryTree); -static bool HasTablesample(Query *queryTree); static bool HasComplexRangeTableType(Query *queryTree); static bool IsReadIntermediateResultFunction(Node *node); static bool IsReadIntermediateResultArrayFunction(Node *node); @@ -956,7 +955,7 @@ DeferErrorIfQueryNotSupported(Query *queryTree) /* HasTablesample returns tree if the query contains tablesample */ -static bool +bool HasTablesample(Query *queryTree) { List *rangeTableList = queryTree->rtable; diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 0cb935a13..765dda3e5 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -971,8 +971,8 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi if (subqueryTree->cteList) { - preconditionsSatisfied = false; - errorDetail = "Common Table Expressions are currently unsupported"; + //preconditionsSatisfied = false; + //errorDetail = "Common Table Expressions are currently unsupported"; } if (subqueryTree->hasForUpdate) @@ -1145,9 +1145,9 @@ 
DeferErrorIfUnsupportedTableCombination(Query *queryTree) } else if (rangeTableEntry->rtekind == RTE_CTE) { - unsupportedTableCombination = true; - errorDetail = "CTEs in subqueries are currently unsupported"; - break; + //unsupportedTableCombination = true; + //errorDetail = "CTEs in subqueries are currently unsupported"; + //break; } else if (rangeTableEntry->rtekind == RTE_VALUES) { diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index f8cc45cc7..55286f9ca 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -57,6 +57,7 @@ #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" #include "distributed/commands/multi_copy.h" +#include "distributed/cte_inline.h" #include "distributed/distributed_planner.h" #include "distributed/errormessage.h" #include "distributed/local_distributed_join_planner.h" @@ -327,7 +328,7 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context if (ShouldRecursivelyPlanAllSubqueriesInWhere(query)) { /* replace all subqueries in the WHERE clause */ - RecursivelyPlanAllSubqueries((Node *) query->jointree->quals, context); + RecursivelyPlanAllSubqueries((Node *) (query)->jointree->quals, context); } if (query->havingQual != NULL) @@ -712,6 +713,14 @@ RecursivelyPlanAllSubqueries(Node *node, RecursivePlanningContext *planningConte return expression_tree_walker(node, RecursivelyPlanAllSubqueries, planningContext); } +/* + * IsGroupingFunc returns whether node is a GroupingFunc. 
+ */
+static bool
+IsGroupingFunc(Node *node)
+{
+	return IsA(node, GroupingFunc);
+}
 
 /*
  * RecursivelyPlanCTEs plans all CTEs in the query by recursively calling the planner
@@ -733,20 +742,15 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
 		return NULL;
 	}
 
-	if (query->hasRecursive)
-	{
-		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
-							 "recursive CTEs are not supported in distributed "
-							 "queries",
-							 NULL, NULL);
-	}
 
 	/* get all RTE_CTEs that point to CTEs from cteList */
 	CteReferenceListWalker((Node *) query, &context);
 
-	foreach(cteCell, query->cteList)
+	List *copyOfCteList = list_copy(query->cteList);
+	foreach(cteCell, copyOfCteList)
 	{
 		CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
+		char *cteName = cte->ctename;
 		Query *subquery = (Query *) cte->ctequery;
 		uint64 planId = planningContext->planId;
@@ -754,13 +758,7 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
 		ListCell *rteCell = NULL;
 		int replacedCtesCount = 0;
 
-		if (ContainsReferencesToOuterQuery(subquery))
-		{
-			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
-								 "CTEs that refer to other subqueries are not "
-								 "supported in multi-shard queries",
-								 NULL, NULL);
-		}
+
 
 		if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT)
 		{
@@ -772,6 +770,80 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
 			continue;
 		}
 
+		/*
+		 * First, make sure that Postgres is OK to inline the CTE. Later, check for
+		 * distributed query planning constraints that might prevent inlining.
+		 */
+		if (EnableCTEInlining &&
+			PostgreSQLCTEInlineCondition(cte, query->commandType) &&
+			planningContext->allDistributionKeysInQueryAreEqual &&
+			query->groupingSets == NIL && !query->hasForUpdate &&
+			!FindNodeMatchingCheckFunction((Node *) query, IsGroupingFunc) &&
+			!HasTablesample(query))
+		{
+			/*
+			 * Dry-run the inlining on a deep copy of the query. The CTE to inline
+			 * must be the copy's own list member (looked up by name); a separate
+			 * copyObject(cte) would never match in list_delete_ptr() below.
+			 */
+			Query *copyQuery = copyObject(query);
+			CommonTableExpr *copyCte = NULL;
+			ListCell *copyCteCell = NULL;
+
+			foreach(copyCteCell, copyQuery->cteList)
+			{
+				CommonTableExpr *candidateCte = lfirst(copyCteCell);
+
+				if (strcmp(candidateCte->ctename, cte->ctename) == 0)
+				{
+					copyCte = candidateCte;
+					break;
+				}
+			}
+			Assert(copyCte != NULL);
+
+			/* do the hard work of cte inlining */
+			inline_cte(copyQuery, copyCte);
+
+			/* clean-up the necessary fields for distributed planning */
+			copyCte->cterefcount = 0;
+			copyQuery->cteList = list_delete_ptr(copyQuery->cteList, copyCte);
+
+			DeferredErrorMessage *pushdownError =
+				DeferErrorIfUnsupportedSubqueryPushdown(copyQuery,
+														planningContext->
+														plannerRestrictionContext);
+			if (pushdownError == NULL)
+			{
+				elog(DEBUG1, "CTE %s is going to be inlined via "
+					 "distributed planning", cte->ctename);
+
+				inline_cte(query, cte);
+
+				/* clean-up the necessary fields for distributed planning */
+				cte->cterefcount = 0;
+				query->cteList = list_delete_ptr(query->cteList, cte);
+
+				continue;
+			}
+		}
+
+		if (ContainsReferencesToOuterQuery(subquery))
+		{
+			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+								 "CTEs that refer to other subqueries are not "
+								 "supported in multi-shard queries",
+								 NULL, NULL);
+		}
+
+		if (query->hasRecursive)
+		{
+			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+								 "recursive CTEs are not supported in distributed "
+								 "queries",
+								 NULL, NULL);
+		}
+
 		uint32 subPlanId = list_length(planningContext->subPlanList) + 1;
 
 		if (IsLoggableLevel(DEBUG1))
@@ -971,7 +1043,7 @@ AllDistributionKeysInSubqueryAreEqual(Query *subquery,
 	/* we don't support distribution eq.
checks for CTEs yet */ if (subquery->cteList != NIL) { - return false; + /*return false; */ } PlannerRestrictionContext *filteredRestrictionContext = diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index cf1c5a59d..1d9e350ea 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -152,7 +152,6 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn rangeTableArrayLength, Relids queryRteIdentities); static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo); -static Relids QueryRteIdentities(Query *queryTree); /* @@ -169,7 +168,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery, /* we don't support distribution key equality checks for CTEs yet */ if (originalQuery->cteList != NIL) { - return false; + //return false; } /* we don't support distribution key equality checks for local tables */ @@ -2091,7 +2090,7 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int * QueryRteIdentities gets a queryTree, find get all the rte identities assigned by * us. 
*/ -static Relids +Relids QueryRteIdentities(Query *queryTree) { List *rangeTableList = NULL; diff --git a/src/include/distributed/cte_inline.h b/src/include/distributed/cte_inline.h index 09cac7bdb..beb1bb1f9 100644 --- a/src/include/distributed/cte_inline.h +++ b/src/include/distributed/cte_inline.h @@ -17,5 +17,10 @@ extern bool EnableCTEInlining; extern void RecursivelyInlineCtesInQueryTree(Query *query); extern bool QueryTreeContainsInlinableCTE(Query *queryTree); +/* copy & paste from Postgres source, moved into a function for readability */ +extern bool PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType); + +/* the following utility functions are copy & paste from PostgreSQL code */ +extern void inline_cte(Query *mainQuery, CommonTableExpr *cte); #endif /* CTE_INLINE_H */ diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index f8d1811a2..c07a522ec 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -229,6 +229,7 @@ extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree, Query *originalQu extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query * subqueryTree); extern MultiNode * MultiNodeTree(Query *queryTree); +extern bool HasTablesample(Query *queryTree); #endif /* MULTI_LOGICAL_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 40cbaf447..b8ebd20ac 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -65,8 +65,6 @@ extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *orig plannerRestrictionContext); extern DeferredErrorMessage * ErrorIfOnConflictNotSupported(Query *queryTree); extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex); -extern RelationRestrictionContext * 
CopyRelationRestrictionContext( - RelationRestrictionContext *oldContext); extern Oid ExtractFirstCitusTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index bfa650c0e..cf8d0fafd 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -16,6 +16,7 @@ #include "distributed/metadata_cache.h" #define SINGLE_RTE_INDEX 1 +extern Relids QueryRteIdentities(Query *queryTree); extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * @@ -55,4 +56,6 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext( RelationRestrictionContext *relationRestrictionContext, Relids queryRteIdentities); +extern RelationRestrictionContext * +CopyRelationRestrictionContext(RelationRestrictionContext *relationRestrictionContext); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */