mirror of https://github.com/citusdata/citus.git
WIP
parent
4c3934272f
commit
69f08ad88e
|
@ -41,7 +41,6 @@ static bool contain_dml_walker(Node *node, void *context);
|
||||||
|
|
||||||
/* the following utility functions are related to Citus' logic */
|
/* the following utility functions are related to Citus' logic */
|
||||||
static bool RecursivelyInlineCteWalker(Node *node, void *context);
|
static bool RecursivelyInlineCteWalker(Node *node, void *context);
|
||||||
static void InlineCTEsInQueryTree(Query *query);
|
|
||||||
static bool QueryTreeContainsInlinableCteWalker(Node *node);
|
static bool QueryTreeContainsInlinableCteWalker(Node *node);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -75,15 +75,6 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
||||||
static bool IsUpdateOrDelete(Query *query);
|
static bool IsUpdateOrDelete(Query *query);
|
||||||
static PlannedStmt * CreateDistributedPlannedStmt(
|
static PlannedStmt * CreateDistributedPlannedStmt(
|
||||||
DistributedPlanningContext *planContext);
|
DistributedPlanningContext *planContext);
|
||||||
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
|
|
||||||
DistributedPlanningContext
|
|
||||||
*planContext);
|
|
||||||
static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
|
|
||||||
Query *originalQuery,
|
|
||||||
Query *query, ParamListInfo
|
|
||||||
boundParams,
|
|
||||||
PlannerRestrictionContext *
|
|
||||||
plannerRestrictionContext);
|
|
||||||
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
|
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
|
||||||
relationId);
|
relationId);
|
||||||
|
|
||||||
|
@ -625,28 +616,6 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
|
||||||
uint64 planId = NextPlanId++;
|
uint64 planId = NextPlanId++;
|
||||||
bool hasUnresolvedParams = false;
|
bool hasUnresolvedParams = false;
|
||||||
|
|
||||||
PlannedStmt *resultPlan = NULL;
|
|
||||||
|
|
||||||
if (QueryTreeContainsInlinableCTE(planContext->originalQuery))
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Inlining CTEs as subqueries in the query can avoid recursively
|
|
||||||
* planning some (or all) of the CTEs. In other words, the inlined
|
|
||||||
* CTEs could become part of query pushdown planning, which is much
|
|
||||||
* more efficient than recursively planning. So, first try distributed
|
|
||||||
* planning on the inlined CTEs in the query tree.
|
|
||||||
*
|
|
||||||
* We also should fallback to distributed planning with non-inlined CTEs
|
|
||||||
* if the distributed planning fails with inlined CTEs, because recursively
|
|
||||||
* planning CTEs can provide full SQL coverage, although it might be slow.
|
|
||||||
*/
|
|
||||||
resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext);
|
|
||||||
if (resultPlan != NULL)
|
|
||||||
{
|
|
||||||
return resultPlan;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
|
if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
|
||||||
planContext->boundParams))
|
planContext->boundParams))
|
||||||
{
|
{
|
||||||
|
@ -702,7 +671,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
|
||||||
distributedPlan->planId = planId;
|
distributedPlan->planId = planId;
|
||||||
|
|
||||||
/* create final plan by combining local plan with distributed plan */
|
/* create final plan by combining local plan with distributed plan */
|
||||||
resultPlan = FinalizePlan(planContext->plan, distributedPlan);
|
PlannedStmt *resultPlan = FinalizePlan(planContext->plan, distributedPlan);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* As explained above, force planning costs to be unrealistically high if
|
* As explained above, force planning costs to be unrealistically high if
|
||||||
|
@ -721,113 +690,6 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
|
|
||||||
* for creating a distributed planned statement. The function is primarily a
|
|
||||||
* wrapper on top of CreateDistributedPlannedStmt(), by first inlining the
|
|
||||||
* CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The
|
|
||||||
* function returns NULL if the planning fails on the query where eligable
|
|
||||||
* CTEs are inlined.
|
|
||||||
*/
|
|
||||||
static PlannedStmt *
|
|
||||||
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
|
|
||||||
DistributedPlanningContext *planContext)
|
|
||||||
{
|
|
||||||
if (!EnableCTEInlining)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* In Postgres 12+, users can adjust whether to inline/not inline CTEs
|
|
||||||
* by [NOT] MATERIALIZED keywords. However, in PG 11, that's not possible.
|
|
||||||
* So, with this we provide a way to prevent CTE inlining on Postgres 11.
|
|
||||||
*
|
|
||||||
* The main use-case for this is not to have divergent test outputs between
|
|
||||||
* PG 11 vs PG 12, so not very much intended for users.
|
|
||||||
*/
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We'll inline the CTEs and try distributed planning, preserve the original
|
|
||||||
* query in case the planning fails and we fallback to recursive planning of
|
|
||||||
* CTEs.
|
|
||||||
*/
|
|
||||||
Query *copyOfOriginalQuery = copyObject(planContext->originalQuery);
|
|
||||||
|
|
||||||
RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery);
|
|
||||||
|
|
||||||
/* after inlining, we shouldn't have any inlinable CTEs */
|
|
||||||
Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery));
|
|
||||||
|
|
||||||
/* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */
|
|
||||||
PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan,
|
|
||||||
copyOfOriginalQuery,
|
|
||||||
planContext->query,
|
|
||||||
planContext->boundParams,
|
|
||||||
planContext->
|
|
||||||
plannerRestrictionContext);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply
|
|
||||||
* calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input
|
|
||||||
* query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4
|
|
||||||
* message with the reason for the failure.
|
|
||||||
*/
|
|
||||||
static PlannedStmt *
|
|
||||||
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
|
|
||||||
Query *originalQuery,
|
|
||||||
Query *query, ParamListInfo boundParams,
|
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
|
||||||
{
|
|
||||||
MemoryContext savedContext = CurrentMemoryContext;
|
|
||||||
PlannedStmt *result = NULL;
|
|
||||||
|
|
||||||
DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext));
|
|
||||||
|
|
||||||
planContext->plan = localPlan;
|
|
||||||
planContext->boundParams = boundParams;
|
|
||||||
planContext->originalQuery = originalQuery;
|
|
||||||
planContext->query = query;
|
|
||||||
planContext->plannerRestrictionContext = plannerRestrictionContext;
|
|
||||||
|
|
||||||
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
result = CreateDistributedPlannedStmt(planContext);
|
|
||||||
}
|
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
MemoryContextSwitchTo(savedContext);
|
|
||||||
ErrorData *edata = CopyErrorData();
|
|
||||||
FlushErrorState();
|
|
||||||
|
|
||||||
/* don't try to intercept PANIC or FATAL, let those breeze past us */
|
|
||||||
if (edata->elevel != ERROR)
|
|
||||||
{
|
|
||||||
PG_RE_THROW();
|
|
||||||
}
|
|
||||||
|
|
||||||
ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("Planning after CTEs inlined failed with "
|
|
||||||
"\nmessage: %s\ndetail: %s\nhint: %s",
|
|
||||||
edata->message ? edata->message : "",
|
|
||||||
edata->detail ? edata->detail : "",
|
|
||||||
edata->hint ? edata->hint : "")));
|
|
||||||
|
|
||||||
/* leave the error handling system */
|
|
||||||
FreeErrorData(edata);
|
|
||||||
|
|
||||||
result = NULL;
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateDistributedPlan generates a distributed plan for a query.
|
* CreateDistributedPlan generates a distributed plan for a query.
|
||||||
* It goes through 3 steps:
|
* It goes through 3 steps:
|
||||||
|
@ -843,7 +705,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
PlannerRestrictionContext *plannerRestrictionContext)
|
||||||
{
|
{
|
||||||
DistributedPlan *distributedPlan = NULL;
|
DistributedPlan *distributedPlan = NULL;
|
||||||
bool hasCtes = originalQuery->cteList != NIL;
|
|
||||||
|
|
||||||
if (IsModifyCommand(originalQuery))
|
if (IsModifyCommand(originalQuery))
|
||||||
{
|
{
|
||||||
|
@ -974,7 +835,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
* the CTEs are referenced then there are no subplans, but we still want
|
* the CTEs are referenced then there are no subplans, but we still want
|
||||||
* to retry the router planner.
|
* to retry the router planner.
|
||||||
*/
|
*/
|
||||||
if (list_length(subPlanList) > 0 || hasCtes)
|
if (list_length(subPlanList) > 0)
|
||||||
{
|
{
|
||||||
Query *newQuery = copyObject(originalQuery);
|
Query *newQuery = copyObject(originalQuery);
|
||||||
bool setPartitionedTablesInherited = false;
|
bool setPartitionedTablesInherited = false;
|
||||||
|
|
|
@ -983,12 +983,6 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi
|
||||||
errorDetail = "Recursive queries are currently unsupported";
|
errorDetail = "Recursive queries are currently unsupported";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subqueryTree->cteList)
|
|
||||||
{
|
|
||||||
preconditionsSatisfied = false;
|
|
||||||
errorDetail = "Common Table Expressions are currently unsupported";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (subqueryTree->hasForUpdate)
|
if (subqueryTree->hasForUpdate)
|
||||||
{
|
{
|
||||||
preconditionsSatisfied = false;
|
preconditionsSatisfied = false;
|
||||||
|
@ -1216,12 +1210,6 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
|
||||||
/* immutable function RTEs are treated as reference tables */
|
/* immutable function RTEs are treated as reference tables */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (rangeTableEntry->rtekind == RTE_CTE)
|
|
||||||
{
|
|
||||||
unsupportedTableCombination = true;
|
|
||||||
errorDetail = "CTEs in subqueries are currently unsupported";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
unsupportedTableCombination = true;
|
unsupportedTableCombination = true;
|
||||||
|
|
|
@ -57,6 +57,7 @@
|
||||||
#include "distributed/citus_nodes.h"
|
#include "distributed/citus_nodes.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
#include "distributed/commands/multi_copy.h"
|
||||||
|
#include "distributed/cte_inline.h"
|
||||||
#include "distributed/distributed_planner.h"
|
#include "distributed/distributed_planner.h"
|
||||||
#include "distributed/errormessage.h"
|
#include "distributed/errormessage.h"
|
||||||
#include "distributed/local_distributed_join_planner.h"
|
#include "distributed/local_distributed_join_planner.h"
|
||||||
|
@ -268,6 +269,8 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
||||||
static DeferredErrorMessage *
|
static DeferredErrorMessage *
|
||||||
RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context)
|
RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context)
|
||||||
{
|
{
|
||||||
|
InlineCTEsInQueryTree(query);
|
||||||
|
|
||||||
DeferredErrorMessage *error = RecursivelyPlanCTEs(query, context);
|
DeferredErrorMessage *error = RecursivelyPlanCTEs(query, context);
|
||||||
if (error != NULL)
|
if (error != NULL)
|
||||||
{
|
{
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
extern bool EnableCTEInlining;
|
extern bool EnableCTEInlining;
|
||||||
|
|
||||||
extern void RecursivelyInlineCtesInQueryTree(Query *query);
|
extern void RecursivelyInlineCtesInQueryTree(Query *query);
|
||||||
|
extern void InlineCTEsInQueryTree(Query *query);
|
||||||
extern bool QueryTreeContainsInlinableCTE(Query *queryTree);
|
extern bool QueryTreeContainsInlinableCTE(Query *queryTree);
|
||||||
|
|
||||||
#endif /* CTE_INLINE_H */
|
#endif /* CTE_INLINE_H */
|
||||||
|
|
Loading…
Reference in New Issue