pull/6876/head
aykutbozkurt 2023-04-24 17:50:42 +03:00
parent c660d01ae3
commit d682452424
1 changed files with 18 additions and 65 deletions

View File

@ -92,15 +92,8 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable); bool *maybeHasForeignDistributedTable);
static PlannedStmt * CreateDistributedPlannedStmt( static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext); DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, static PlannedStmt * TryInlineCtesAndCreateDistributedPlannedStmt(
DistributedPlanningContext DistributedPlanningContext *planContext);
*planContext);
static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo
boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId); relationId);
@ -711,7 +704,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
* if the distributed planning fails with inlined CTEs, because recursively * if the distributed planning fails with inlined CTEs, because recursively
* planning CTEs can provide full SQL coverage, although it might be slow. * planning CTEs can provide full SQL coverage, although it might be slow.
*/ */
resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext); resultPlan = TryInlineCtesAndCreateDistributedPlannedStmt(planContext);
if (resultPlan != NULL) if (resultPlan != NULL)
{ {
return resultPlan; return resultPlan;
@ -796,7 +789,7 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
/* /*
* InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required * TryInlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
* for creating a distributed planned statement. The function is primarily a * for creating a distributed planned statement. The function is primarily a
* wrapper on top of CreateDistributedPlannedStmt(), by first inlining the * wrapper on top of CreateDistributedPlannedStmt(), by first inlining the
* CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The * CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The
@ -804,68 +797,28 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
* CTEs are inlined. * CTEs are inlined.
*/ */
static PlannedStmt * static PlannedStmt *
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, TryInlineCtesAndCreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
DistributedPlanningContext *planContext)
{
/*
* We'll inline the CTEs and try distributed planning, preserve the original
* query ,the modified query, and planner context in case the planning fails
* and we fallback to recursive planning of CTEs.
*/
Query *copyOfOriginalQuery = copyObject(planContext->originalQuery);
RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery);
/* after inlining, we shouldn't have any inlinable CTEs */
Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery));
/* recompute modified query and planner context after we inlined the query */
PlannerRestrictionContext *plannerContextForInlinedQuery =
CreateAndPushPlannerRestrictionContext();
Query *copyOfInlinedOriginalQuery = copyObject(copyOfOriginalQuery);
standard_planner(copyOfInlinedOriginalQuery, NULL, 0, planContext->boundParams);
Query *modifiedInlinedQuery = copyOfInlinedOriginalQuery;
/* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */
PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan,
copyOfOriginalQuery,
modifiedInlinedQuery,
planContext->boundParams,
plannerContextForInlinedQuery);
PopPlannerRestrictionContext();
return result;
}
/*
* TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply
* calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input
* query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4
* message with the reason for the failure.
*/
static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext)
{ {
MemoryContext savedContext = CurrentMemoryContext; MemoryContext savedContext = CurrentMemoryContext;
PlannedStmt *result = NULL; PlannedStmt *result = NULL;
DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext)); /*
* We'll inline the CTEs and try distributed planning, preserve the original
planContext->plan = localPlan; * query, the modified query, and planner context in case the planning fails
planContext->boundParams = boundParams; * and we fallback to recursive planning of CTEs.
planContext->originalQuery = originalQuery; */
planContext->query = query; Query *inlinedQuery = copyObject(planContext->originalQuery);
planContext->plannerRestrictionContext = plannerRestrictionContext; RecursivelyInlineCtesInQueryTree(inlinedQuery);
/* after inlining, we shouldn't have any inlinable CTEs */
Assert(!QueryTreeContainsInlinableCTE(inlinedQuery));
PG_TRY(); PG_TRY();
{ {
result = CreateDistributedPlannedStmt(planContext); /* replan after we inlined the query */
result = planner(inlinedQuery,
NULL, 0,
planContext->boundParams);
} }
PG_CATCH(); PG_CATCH();
{ {