mirror of https://github.com/citusdata/citus.git
Use planContext in createDistributedPlan
parent
7c76a5ab95
commit
70a332ffb6
|
@ -70,11 +70,9 @@ static bool ListContainsDistributedTableRTE(List *rangeTableList);
|
||||||
static bool IsUpdateOrDelete(Query *query);
|
static bool IsUpdateOrDelete(Query *query);
|
||||||
static PlannedStmt * CreateDistributedPlannedStmt(
|
static PlannedStmt * CreateDistributedPlannedStmt(
|
||||||
DistributedPlanningContext *planContext);
|
DistributedPlanningContext *planContext);
|
||||||
static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
|
static DistributedPlan * CreateDistributedPlan(uint64 planId,
|
||||||
Query *query, ParamListInfo boundParams,
|
DistributedPlanningContext *planContext,
|
||||||
bool hasUnresolvedParams,
|
bool hasUnresolvedParams);
|
||||||
PlannerRestrictionContext *
|
|
||||||
plannerRestrictionContext);
|
|
||||||
static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery);
|
static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery);
|
||||||
static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery);
|
static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery);
|
||||||
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
|
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
|
||||||
|
@ -690,21 +688,20 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
|
||||||
* 3. Logical planner
|
* 3. Logical planner
|
||||||
*/
|
*/
|
||||||
static DistributedPlan *
|
static DistributedPlan *
|
||||||
CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo
|
CreateDistributedPlan(uint64 planId, DistributedPlanningContext *planContext, bool
|
||||||
boundParams, bool hasUnresolvedParams,
|
hasUnresolvedParams)
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
|
||||||
{
|
{
|
||||||
DistributedPlan *distributedPlan = NULL;
|
DistributedPlan *distributedPlan = NULL;
|
||||||
bool hasCtes = originalQuery->cteList != NIL;
|
bool hasCtes = planContext->originalQuery->cteList != NIL;
|
||||||
|
|
||||||
if (IsModifyCommand(originalQuery))
|
if (IsModifyCommand(planContext->originalQuery))
|
||||||
{
|
{
|
||||||
EnsureModificationsCanRun();
|
EnsureModificationsCanRun();
|
||||||
|
|
||||||
Oid targetRelationId = ModifyQueryResultRelationId(query);
|
Oid targetRelationId = ModifyQueryResultRelationId(planContext->query);
|
||||||
EnsurePartitionTableNotReplicated(targetRelationId);
|
EnsurePartitionTableNotReplicated(targetRelationId);
|
||||||
|
|
||||||
if (InsertSelectIntoDistributedTable(originalQuery))
|
if (InsertSelectIntoDistributedTable(planContext->originalQuery))
|
||||||
{
|
{
|
||||||
if (hasUnresolvedParams)
|
if (hasUnresolvedParams)
|
||||||
{
|
{
|
||||||
|
@ -716,14 +713,16 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
distributedPlan =
|
distributedPlan = CreateInsertSelectPlan(planId, planContext->originalQuery,
|
||||||
CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext);
|
planContext->
|
||||||
|
plannerRestrictionContext);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* modifications are always routed through the same planner/executor */
|
/* modifications are always routed through the same planner/executor */
|
||||||
distributedPlan =
|
distributedPlan =
|
||||||
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
|
CreateModifyPlan(planContext->originalQuery, planContext->query,
|
||||||
|
planContext->plannerRestrictionContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* the functions above always return a plan, possibly with an error */
|
/* the functions above always return a plan, possibly with an error */
|
||||||
|
@ -731,7 +730,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
|
|
||||||
if (distributedPlan->planningError == NULL)
|
if (distributedPlan->planningError == NULL)
|
||||||
{
|
{
|
||||||
FinalizeDistributedPlan(distributedPlan, originalQuery);
|
FinalizeDistributedPlan(distributedPlan, planContext->originalQuery);
|
||||||
|
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
}
|
}
|
||||||
|
@ -749,11 +748,11 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
* produce distributed query plans.
|
* produce distributed query plans.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
distributedPlan = CreateRouterPlan(originalQuery, query,
|
distributedPlan = CreateRouterPlan(planContext->originalQuery, planContext->query,
|
||||||
plannerRestrictionContext);
|
planContext->plannerRestrictionContext);
|
||||||
if (distributedPlan->planningError == NULL)
|
if (distributedPlan->planningError == NULL)
|
||||||
{
|
{
|
||||||
FinalizeDistributedPlan(distributedPlan, originalQuery);
|
FinalizeDistributedPlan(distributedPlan, planContext->originalQuery);
|
||||||
|
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
}
|
}
|
||||||
|
@ -781,7 +780,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
}
|
}
|
||||||
|
|
||||||
/* force evaluation of bound params */
|
/* force evaluation of bound params */
|
||||||
boundParams = copyParamList(boundParams);
|
planContext->boundParams = copyParamList(planContext->boundParams);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If there are parameters that do have a value in boundParams, replace
|
* If there are parameters that do have a value in boundParams, replace
|
||||||
|
@ -789,14 +788,17 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
* query into pieces (during recursive planning) or deparse parts of
|
* query into pieces (during recursive planning) or deparse parts of
|
||||||
* the query (during subquery pushdown planning).
|
* the query (during subquery pushdown planning).
|
||||||
*/
|
*/
|
||||||
originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
|
planContext->originalQuery = (Query *) ResolveExternalParams(
|
||||||
boundParams);
|
(Node *) planContext->originalQuery,
|
||||||
|
planContext->boundParams);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Plan subqueries and CTEs that cannot be pushed down by recursively
|
* Plan subqueries and CTEs that cannot be pushed down by recursively
|
||||||
* calling the planner and return the resulting plans to subPlanList.
|
* calling the planner and return the resulting plans to subPlanList.
|
||||||
*/
|
*/
|
||||||
List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery,
|
List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId,
|
||||||
|
planContext->originalQuery,
|
||||||
|
planContext->
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -814,7 +816,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
*/
|
*/
|
||||||
if (list_length(subPlanList) > 0 || hasCtes)
|
if (list_length(subPlanList) > 0 || hasCtes)
|
||||||
{
|
{
|
||||||
Query *newQuery = copyObject(originalQuery);
|
Query *newQuery = copyObject(planContext->originalQuery);
|
||||||
bool setPartitionedTablesInherited = false;
|
bool setPartitionedTablesInherited = false;
|
||||||
PlannerRestrictionContext *currentPlannerRestrictionContext =
|
PlannerRestrictionContext *currentPlannerRestrictionContext =
|
||||||
CurrentPlannerRestrictionContext();
|
CurrentPlannerRestrictionContext();
|
||||||
|
@ -837,17 +839,18 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
* being contiguous.
|
* being contiguous.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
standard_planner(newQuery, 0, boundParams);
|
standard_planner(newQuery, 0, planContext->boundParams);
|
||||||
|
|
||||||
/* overwrite the old transformed query with the new transformed query */
|
/* overwrite the old transformed query with the new transformed query */
|
||||||
memcpy(query, newQuery, sizeof(Query));
|
memcpy(planContext->query, newQuery, sizeof(Query));
|
||||||
|
|
||||||
|
planContext->boundParams = NULL;
|
||||||
|
|
||||||
/* recurse into CreateDistributedPlan with subqueries/CTEs replaced */
|
/* recurse into CreateDistributedPlan with subqueries/CTEs replaced */
|
||||||
distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false,
|
distributedPlan = CreateDistributedPlan(planId, planContext, false);
|
||||||
plannerRestrictionContext);
|
|
||||||
distributedPlan->subPlanList = subPlanList;
|
distributedPlan->subPlanList = subPlanList;
|
||||||
|
|
||||||
FinalizeDistributedPlan(distributedPlan, originalQuery);
|
FinalizeDistributedPlan(distributedPlan, planContext->originalQuery);
|
||||||
|
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
}
|
}
|
||||||
|
@ -857,9 +860,9 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
* logical planner cannot handle DML commands so return the plan with the
|
* logical planner cannot handle DML commands so return the plan with the
|
||||||
* error.
|
* error.
|
||||||
*/
|
*/
|
||||||
if (IsModifyCommand(originalQuery))
|
if (IsModifyCommand(planContext->originalQuery))
|
||||||
{
|
{
|
||||||
FinalizeDistributedPlan(distributedPlan, originalQuery);
|
FinalizeDistributedPlan(distributedPlan, planContext->originalQuery);
|
||||||
|
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
}
|
}
|
||||||
|
@ -869,10 +872,12 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
* If we get here and there are still CTEs that means that none of the CTEs are
|
* If we get here and there are still CTEs that means that none of the CTEs are
|
||||||
* referenced. We therefore also strip the CTEs from the rewritten query.
|
* referenced. We therefore also strip the CTEs from the rewritten query.
|
||||||
*/
|
*/
|
||||||
query->cteList = NIL;
|
planContext->query->cteList = NIL;
|
||||||
Assert(originalQuery->cteList == NIL);
|
Assert(planContext->originalQuery->cteList == NIL);
|
||||||
|
|
||||||
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
|
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(planContext->originalQuery,
|
||||||
|
planContext->query,
|
||||||
|
planContext->
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
MultiLogicalPlanOptimize(logicalPlan);
|
MultiLogicalPlanOptimize(logicalPlan);
|
||||||
|
|
||||||
|
@ -887,12 +892,12 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
|
|
||||||
/* Create the physical plan */
|
/* Create the physical plan */
|
||||||
distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
|
distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
|
||||||
plannerRestrictionContext);
|
planContext->plannerRestrictionContext);
|
||||||
|
|
||||||
/* distributed plan currently should always succeed or error out */
|
/* distributed plan currently should always succeed or error out */
|
||||||
Assert(distributedPlan && distributedPlan->planningError == NULL);
|
Assert(distributedPlan && distributedPlan->planningError == NULL);
|
||||||
|
|
||||||
FinalizeDistributedPlan(distributedPlan, originalQuery);
|
FinalizeDistributedPlan(distributedPlan, planContext->originalQuery);
|
||||||
|
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue