cte_inline_improve
Onder Kalaci 2020-12-15 22:05:30 +03:00
parent 26284bf2a1
commit 788737819a
11 changed files with 108 additions and 210 deletions

View File

@ -51,11 +51,6 @@ typedef struct inline_cte_walker_context
List *aliascolnames; /* citus addition to Postgres' inline_cte_walker_context */
} inline_cte_walker_context;
/* copy & paste from Postgres source, moved into a function for readability */
static bool PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType);
/* the following utility functions are copy & paste from PostgreSQL code */
static void inline_cte(Query *mainQuery, CommonTableExpr *cte);
static bool inline_cte_walker(Node *node, inline_cte_walker_context *context);
static bool contain_dml(Node *node);
static bool contain_dml_walker(Node *node, void *context);
@ -80,6 +75,11 @@ bool EnableCTEInlining = true;
void
RecursivelyInlineCtesInQueryTree(Query *query)
{
if (!EnableCTEInlining)
{
return;
}
InlineCTEsInQueryTree(query);
query_tree_walker(query, RecursivelyInlineCteWalker, NULL, 0);
@ -205,7 +205,7 @@ QueryTreeContainsInlinableCteWalker(Node *node)
* PostgreSQLCTEInlineCondition returns true if the CTE is considered
* safe to inline by Postgres.
*/
static bool
bool
PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType)
{
/*
@ -261,7 +261,7 @@ PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType)
/*
* inline_cte: convert RTE_CTE references to given CTE into RTE_SUBQUERYs
*/
static void
void
inline_cte(Query *mainQuery, CommonTableExpr *cte)
{
struct inline_cte_walker_context context;

View File

@ -78,15 +78,6 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext
*planContext);
static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo
boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);
@ -615,28 +606,6 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
uint64 planId = NextPlanId++;
bool hasUnresolvedParams = false;
PlannedStmt *resultPlan = NULL;
if (QueryTreeContainsInlinableCTE(planContext->originalQuery))
{
/*
* Inlining CTEs as subqueries in the query can avoid recursively
* planning some (or all) of the CTEs. In other words, the inlined
* CTEs could become part of query pushdown planning, which is much
* more efficient than recursively planning. So, first try distributed
* planning on the inlined CTEs in the query tree.
*
* We also should fallback to distributed planning with non-inlined CTEs
* if the distributed planning fails with inlined CTEs, because recursively
* planning CTEs can provide full SQL coverage, although it might be slow.
*/
resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext);
if (resultPlan != NULL)
{
return resultPlan;
}
}
if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
planContext->boundParams))
{
@ -645,9 +614,9 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
DistributedPlan *distributedPlan =
CreateDistributedPlan(planId, planContext->originalQuery, planContext->query,
planContext->boundParams,
hasUnresolvedParams,
planContext->plannerRestrictionContext);
planContext->boundParams,
hasUnresolvedParams,
planContext->plannerRestrictionContext);
/*
* If no plan was generated, prepare a generic error to be emitted.
@ -692,7 +661,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
distributedPlan->planId = planId;
/* create final plan by combining local plan with distributed plan */
resultPlan = FinalizePlan(planContext->plan, distributedPlan);
PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan);
/*
* As explained above, force planning costs to be unrealistically high if
@ -711,139 +680,6 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
}
/*
* InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
* for creating a distributed planned statement. The function is primarily a
* wrapper on top of CreateDistributedPlannedStmt(), by first inlining the
* CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The
* function returns NULL if the planning fails on the query where eligable
* CTEs are inlined.
*/
static PlannedStmt *
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext *planContext)
{
if (!EnableCTEInlining)
{
/*
* In Postgres 12+, users can adjust whether to inline/not inline CTEs
* by [NOT] MATERIALIZED keywords. However, in PG 11, that's not possible.
* So, with this we provide a way to prevent CTE inlining on Postgres 11.
*
* The main use-case for this is not to have divergent test outputs between
* PG 11 vs PG 12, so not very much intended for users.
*/
return NULL;
}
/*
* We'll inline the CTEs and try distributed planning, preserve the original
* query in case the planning fails and we fallback to recursive planning of
* CTEs.
*/
Query *copyOfOriginalQuery = copyObject(planContext->originalQuery);
RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery);
/* after inlining, we shouldn't have any inlinable CTEs */
Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery));
#if PG_VERSION_NUM < PG_VERSION_12
Query *query = planContext->query;
/*
* We had to implement this hack because on Postgres11 and below, the originalQuery
* and the query would have significant differences in terms of CTEs where CTEs
* would not be inlined on the query (as standard_planner() wouldn't inline CTEs
* on PG 11 and below).
*
* Instead, we prefer to pass the inlined query to the distributed planning. We rely
* on the fact that the query includes subqueries, and it'd definitely go through
* query pushdown planning. During query pushdown planning, the only relevant query
* tree is the original query.
*/
planContext->query = copyObject(copyOfOriginalQuery);
#endif
/* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */
PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan,
copyOfOriginalQuery,
planContext->query,
planContext->boundParams,
planContext->
plannerRestrictionContext);
#if PG_VERSION_NUM < PG_VERSION_12
/*
* Set back the original query, in case the planning failed and we need to go
* into distributed planning again.
*/
planContext->query = query;
#endif
return result;
}
/*
* TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply
* calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input
* query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4
* message with the reason for the failure.
*/
static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
MemoryContext savedContext = CurrentMemoryContext;
PlannedStmt *result = NULL;
DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext));
planContext->plan = localPlan;
planContext->boundParams = boundParams;
planContext->originalQuery = originalQuery;
planContext->query = query;
planContext->plannerRestrictionContext = plannerRestrictionContext;
PG_TRY();
{
result = CreateDistributedPlannedStmt(planContext);
}
PG_CATCH();
{
MemoryContextSwitchTo(savedContext);
ErrorData *edata = CopyErrorData();
FlushErrorState();
/* don't try to intercept PANIC or FATAL, let those breeze past us */
if (edata->elevel != ERROR)
{
PG_RE_THROW();
}
ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Planning after CTEs inlined failed with "
"\nmessage: %s\ndetail: %s\nhint: %s",
edata->message ? edata->message : "",
edata->detail ? edata->detail : "",
edata->hint ? edata->hint : "")));
/* leave the error handling system */
FreeErrorData(edata);
result = NULL;
}
PG_END_TRY();
return result;
}
/*
* CreateDistributedPlan generates a distributed plan for a query.
@ -860,7 +696,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = NULL;
bool hasCtes = originalQuery->cteList != NIL;
if (IsModifyCommand(originalQuery))
{
@ -991,6 +826,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
* the CTEs are referenced then there are no subplans, but we still want
* to retry the router planner.
*/
bool hasCtes = originalQuery->cteList != NIL;
if (list_length(subPlanList) > 0 || hasCtes)
{
Query *newQuery = copyObject(originalQuery);
@ -1022,8 +858,9 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
*query = *newQuery;
/* recurse into CreateDistributedPlan with subqueries/CTEs replaced */
distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false,
plannerRestrictionContext);
distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL,
false,
plannerRestrictionContext);
/* distributedPlan cannot be null since hasUnresolvedParams argument was false */
Assert(distributedPlan != NULL);

View File

@ -383,9 +383,9 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamL
Query *selectQuery = BuildSelectForInsertSelect(originalQuery);
originalQuery->cteList = NIL;
DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
copyObject(selectQuery),
boundParams, hasUnresolvedParams,
plannerRestrictionContext);
copyObject(selectQuery),
boundParams, hasUnresolvedParams,
plannerRestrictionContext);
/*
* We don't expect distPlan to be NULL here because hasUnresolvedParams is

View File

@ -80,7 +80,6 @@ static Oid NodeTryGetRteRelid(Node *node);
static bool FullCompositeFieldList(List *compositeFieldList);
static bool HasUnsupportedJoinWalker(Node *node, void *context);
static bool ErrorHintRequired(const char *errorHint, Query *queryTree);
static bool HasTablesample(Query *queryTree);
static bool HasComplexRangeTableType(Query *queryTree);
static bool IsReadIntermediateResultFunction(Node *node);
static bool IsReadIntermediateResultArrayFunction(Node *node);
@ -956,7 +955,7 @@ DeferErrorIfQueryNotSupported(Query *queryTree)
/* HasTablesample returns tree if the query contains tablesample */
static bool
bool
HasTablesample(Query *queryTree)
{
List *rangeTableList = queryTree->rtable;

View File

@ -971,8 +971,8 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi
if (subqueryTree->cteList)
{
preconditionsSatisfied = false;
errorDetail = "Common Table Expressions are currently unsupported";
//preconditionsSatisfied = false;
//errorDetail = "Common Table Expressions are currently unsupported";
}
if (subqueryTree->hasForUpdate)
@ -1145,9 +1145,9 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
}
else if (rangeTableEntry->rtekind == RTE_CTE)
{
unsupportedTableCombination = true;
errorDetail = "CTEs in subqueries are currently unsupported";
break;
//unsupportedTableCombination = true;
//errorDetail = "CTEs in subqueries are currently unsupported";
//break;
}
else if (rangeTableEntry->rtekind == RTE_VALUES)
{

View File

@ -57,6 +57,7 @@
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/cte_inline.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/local_distributed_join_planner.h"
@ -327,7 +328,7 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
if (ShouldRecursivelyPlanAllSubqueriesInWhere(query))
{
/* replace all subqueries in the WHERE clause */
RecursivelyPlanAllSubqueries((Node *) query->jointree->quals, context);
RecursivelyPlanAllSubqueries((Node *) (query)->jointree->quals, context);
}
if (query->havingQual != NULL)
@ -712,6 +713,14 @@ RecursivelyPlanAllSubqueries(Node *node, RecursivePlanningContext *planningConte
return expression_tree_walker(node, RecursivelyPlanAllSubqueries, planningContext);
}
/*
* IsGroupingFunc returns whether node is a GroupingFunc.
*/
static bool
IsGroupingFunc(Node *node)
{
return IsA(node, GroupingFunc);
}
/*
* RecursivelyPlanCTEs plans all CTEs in the query by recursively calling the planner
@ -733,20 +742,15 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
return NULL;
}
if (query->hasRecursive)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"recursive CTEs are not supported in distributed "
"queries",
NULL, NULL);
}
/* get all RTE_CTEs that point to CTEs from cteList */
CteReferenceListWalker((Node *) query, &context);
foreach(cteCell, query->cteList)
List *copyOfCteList = list_copy(query->cteList);
foreach(cteCell, copyOfCteList)
{
CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell);
char *cteName = cte->ctename;
Query *subquery = (Query *) cte->ctequery;
uint64 planId = planningContext->planId;
@ -754,13 +758,7 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
ListCell *rteCell = NULL;
int replacedCtesCount = 0;
if (ContainsReferencesToOuterQuery(subquery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"CTEs that refer to other subqueries are not "
"supported in multi-shard queries",
NULL, NULL);
}
if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT)
{
@ -772,6 +770,64 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
continue;
}
/*
* First, make sure that Postgres is OK to inline the CTE. Later, check for
* distributed query planning constraints that might prevent inlining.
*/
if (EnableCTEInlining &&
PostgreSQLCTEInlineCondition(cte, query->commandType) &&
planningContext->allDistributionKeysInQueryAreEqual &&
query->groupingSets == NIL && !query->hasForUpdate &&
!FindNodeMatchingCheckFunction((Node *) query, IsGroupingFunc) &&
!HasTablesample(query))
{
Query *copyQuery = copyObject(query);
CommonTableExpr *copyCte = copyObject(cte);
/* do the hard work of cte inlining */
inline_cte(copyQuery, copyCte);
/* clean-up the necessary fields for distributed planning */
copyCte->cterefcount = 0;
copyQuery->cteList = list_delete_ptr(copyQuery->cteList, copyCte);
DeferredErrorMessage *df = DeferErrorIfUnsupportedSubqueryPushdown(copyQuery, planningContext->plannerRestrictionContext);
if (df == NULL)
{
elog(DEBUG1, "CTE %s is going to be inlined via "
"distributed planning", cte->ctename);
inline_cte(query, cte);
/* clean-up the necessary fields for distributed planning */
cte->cterefcount = 0;
query->cteList = list_delete_ptr(query->cteList, cte);
continue;
}
//else
// elog(DEBUG1, "Reason: %s:%s", df->message, df->detail);
}
if (ContainsReferencesToOuterQuery(subquery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"CTEs that refer to other subqueries are not "
"supported in multi-shard queries",
NULL, NULL);
}
if (query->hasRecursive)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"recursive CTEs are not supported in distributed "
"queries",
NULL, NULL);
}
uint32 subPlanId = list_length(planningContext->subPlanList) + 1;
if (IsLoggableLevel(DEBUG1))
@ -971,7 +1027,7 @@ AllDistributionKeysInSubqueryAreEqual(Query *subquery,
/* we don't support distribution eq. checks for CTEs yet */
if (subquery->cteList != NIL)
{
return false;
/*return false; */
}
PlannerRestrictionContext *filteredRestrictionContext =

View File

@ -152,7 +152,6 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn
rangeTableArrayLength, Relids
queryRteIdentities);
static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo);
static Relids QueryRteIdentities(Query *queryTree);
/*
@ -169,7 +168,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery,
/* we don't support distribution key equality checks for CTEs yet */
if (originalQuery->cteList != NIL)
{
return false;
//return false;
}
/* we don't support distribution key equality checks for local tables */
@ -2091,7 +2090,7 @@ RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
* QueryRteIdentities gets a queryTree, find get all the rte identities assigned by
* us.
*/
static Relids
Relids
QueryRteIdentities(Query *queryTree)
{
List *rangeTableList = NULL;

View File

@ -17,5 +17,10 @@ extern bool EnableCTEInlining;
extern void RecursivelyInlineCtesInQueryTree(Query *query);
extern bool QueryTreeContainsInlinableCTE(Query *queryTree);
/* copy & paste from Postgres source, moved into a function for readability */
extern bool PostgreSQLCTEInlineCondition(CommonTableExpr *cte, CmdType cmdType);
/* the following utility functions are copy & paste from PostgreSQL code */
extern void inline_cte(Query *mainQuery, CommonTableExpr *cte);
#endif /* CTE_INLINE_H */

View File

@ -229,6 +229,7 @@ extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree, Query *originalQu
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query *
subqueryTree);
extern MultiNode * MultiNodeTree(Query *queryTree);
extern bool HasTablesample(Query *queryTree);
#endif /* MULTI_LOGICAL_PLANNER_H */

View File

@ -65,8 +65,6 @@ extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *orig
plannerRestrictionContext);
extern DeferredErrorMessage * ErrorIfOnConflictNotSupported(Query *queryTree);
extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex);
extern RelationRestrictionContext * CopyRelationRestrictionContext(
RelationRestrictionContext *oldContext);
extern Oid ExtractFirstCitusTableId(Query *query);
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);

View File

@ -16,6 +16,7 @@
#include "distributed/metadata_cache.h"
#define SINGLE_RTE_INDEX 1
extern Relids QueryRteIdentities(Query *queryTree);
extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery,
PlannerRestrictionContext *
@ -55,4 +56,6 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext(
RelationRestrictionContext *relationRestrictionContext,
Relids
queryRteIdentities);
extern RelationRestrictionContext *
CopyRelationRestrictionContext(RelationRestrictionContext *relationRestrictionContext);
#endif /* RELATION_RESTRICTION_EQUIVALENCE_H */