mirror of https://github.com/citusdata/citus.git
Move recursive planning related function to recursive_planning
parent
2a44029aaf
commit
a34504d7bf
|
@ -75,10 +75,6 @@ typedef struct ConversionCandidates
|
||||||
List *localTableList; /* local or citus local table */
|
List *localTableList; /* local or citus local table */
|
||||||
}ConversionCandidates;
|
}ConversionCandidates;
|
||||||
|
|
||||||
static bool ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
|
|
||||||
Oid resultRelationId,
|
|
||||||
PlannerRestrictionContext *
|
|
||||||
plannerRestrictionContext);
|
|
||||||
static bool HasConstantFilterOnUniqueColumn(FromExpr *joinTree,
|
static bool HasConstantFilterOnUniqueColumn(FromExpr *joinTree,
|
||||||
RangeTblEntry *rangeTableEntry, Index
|
RangeTblEntry *rangeTableEntry, Index
|
||||||
rteIndex);
|
rteIndex);
|
||||||
|
@ -96,8 +92,6 @@ static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree
|
||||||
conversionCandidates,
|
conversionCandidates,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
|
|
||||||
List **joinRangeTableEntries);
|
|
||||||
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid
|
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid
|
||||||
relationId);
|
relationId);
|
||||||
static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
|
static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
|
||||||
|
@ -109,32 +103,22 @@ static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RecursivelyPlanLocalTableJoins(Query *query,
|
RecursivelyPlanLocalTableJoins(Query *query,
|
||||||
RecursivePlanningContext *context)
|
RecursivePlanningContext *context, List* rangeTableList)
|
||||||
{
|
{
|
||||||
|
|
||||||
PlannerRestrictionContext *plannerRestrictionContext =
|
PlannerRestrictionContext *plannerRestrictionContext =
|
||||||
context->plannerRestrictionContext;
|
context->plannerRestrictionContext;
|
||||||
|
|
||||||
List *rangeTableList = NIL;
|
|
||||||
GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable,
|
|
||||||
&rangeTableList);
|
|
||||||
|
|
||||||
Oid resultRelationId = InvalidOid;
|
Oid resultRelationId = InvalidOid;
|
||||||
if (IsModifyCommand(query))
|
if (IsModifyCommand(query))
|
||||||
{
|
{
|
||||||
resultRelationId = ModifyQueryResultRelationId(query);
|
resultRelationId = ModifyQueryResultRelationId(query);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList, resultRelationId,
|
|
||||||
plannerRestrictionContext))
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ConversionCandidates *conversionCandidates =
|
ConversionCandidates *conversionCandidates =
|
||||||
CreateConversionCandidates(query->jointree, plannerRestrictionContext,
|
CreateConversionCandidates(query->jointree, plannerRestrictionContext,
|
||||||
rangeTableList, resultRelationId);
|
rangeTableList, resultRelationId);
|
||||||
|
|
||||||
while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
|
while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
|
||||||
resultRelationId,
|
|
||||||
plannerRestrictionContext))
|
plannerRestrictionContext))
|
||||||
{
|
{
|
||||||
FromExpr *joinTree = query->jointree;
|
FromExpr *joinTree = query->jointree;
|
||||||
|
@ -156,51 +140,6 @@ RecursivelyPlanLocalTableJoins(Query *query,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetRangeTableEntriesFromJoinTree gets the range table entries that are
|
|
||||||
* on the given join tree.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
|
|
||||||
List **joinRangeTableEntries)
|
|
||||||
{
|
|
||||||
if (joinNode == NULL)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else if (IsA(joinNode, FromExpr))
|
|
||||||
{
|
|
||||||
FromExpr *fromExpr = (FromExpr *) joinNode;
|
|
||||||
Node *fromElement;
|
|
||||||
|
|
||||||
foreach_ptr(fromElement, fromExpr->fromlist)
|
|
||||||
{
|
|
||||||
GetRangeTableEntriesFromJoinTree(fromElement, rangeTableList,
|
|
||||||
joinRangeTableEntries);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (IsA(joinNode, JoinExpr))
|
|
||||||
{
|
|
||||||
JoinExpr *joinExpr = (JoinExpr *) joinNode;
|
|
||||||
GetRangeTableEntriesFromJoinTree(joinExpr->larg, rangeTableList,
|
|
||||||
joinRangeTableEntries);
|
|
||||||
GetRangeTableEntriesFromJoinTree(joinExpr->rarg, rangeTableList,
|
|
||||||
joinRangeTableEntries);
|
|
||||||
}
|
|
||||||
else if (IsA(joinNode, RangeTblRef))
|
|
||||||
{
|
|
||||||
int rangeTableIndex = ((RangeTblRef *) joinNode)->rtindex;
|
|
||||||
RangeTblEntry *rte = rt_fetch(rangeTableIndex, rangeTableList);
|
|
||||||
*joinRangeTableEntries = lappend(*joinRangeTableEntries, rte);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
pg_unreachable();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetNextRTEToConvertToSubquery returns the range table entry
|
* GetNextRTEToConvertToSubquery returns the range table entry
|
||||||
* which should be converted to a subquery. It considers the local join policy
|
* which should be converted to a subquery. It considers the local join policy
|
||||||
|
@ -291,9 +230,8 @@ RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates, Oid r
|
||||||
* ShouldConvertLocalTableJoinsToSubqueries returns true if we should
|
* ShouldConvertLocalTableJoinsToSubqueries returns true if we should
|
||||||
* convert local-dist table joins to subqueries.
|
* convert local-dist table joins to subqueries.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
|
ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
|
||||||
Oid resultRelationId,
|
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext)
|
plannerRestrictionContext)
|
||||||
{
|
{
|
||||||
|
@ -303,7 +241,7 @@ ShouldConvertLocalTableJoinsToSubqueries(Query *query, List *rangeTableList,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ContainsTableToBeConvertedToSubquery(rangeTableList, resultRelationId))
|
if (!ContainsTableToBeConvertedToSubquery(rangeTableList))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,9 +178,11 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
|
||||||
List *columnAliasList,
|
List *columnAliasList,
|
||||||
Const *resultIdConst, Oid functionOid,
|
Const *resultIdConst, Oid functionOid,
|
||||||
bool useBinaryCopyFormat);
|
bool useBinaryCopyFormat);
|
||||||
static void UpdateVarNosInQualForSubquery(Query *query);
|
static void
|
||||||
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList, Oid
|
UpdateVarNosInNode(Query *query, Index newVarNo);
|
||||||
resultRelationId);
|
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
|
||||||
|
static void GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
|
||||||
|
List **joinRangeTableEntries);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
|
* GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
|
||||||
|
@ -336,6 +338,16 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
|
||||||
RecursivelyPlanNonColocatedSubqueries(query, context);
|
RecursivelyPlanNonColocatedSubqueries(query, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
PlannerRestrictionContext *plannerRestrictionContext =
|
||||||
|
context->plannerRestrictionContext;
|
||||||
|
|
||||||
|
List *rangeTableList = NIL;
|
||||||
|
GetRangeTableEntriesFromJoinTree((Node *) query->jointree, query->rtable,
|
||||||
|
&rangeTableList);
|
||||||
|
|
||||||
|
if (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
|
||||||
|
plannerRestrictionContext)) {
|
||||||
/*
|
/*
|
||||||
* Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
|
* Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
|
||||||
* a query with local table/citus local table and subquery. We convert local/citus local
|
* a query with local table/citus local table and subquery. We convert local/citus local
|
||||||
|
@ -343,11 +355,58 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
|
||||||
* This is the last call in this function since we want the other calls to be finished
|
* This is the last call in this function since we want the other calls to be finished
|
||||||
* so that we can check if the current plan is router plannable at any step within this function.
|
* so that we can check if the current plan is router plannable at any step within this function.
|
||||||
*/
|
*/
|
||||||
RecursivelyPlanLocalTableJoins(query, context);
|
RecursivelyPlanLocalTableJoins(query, context, rangeTableList);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetRangeTableEntriesFromJoinTree gets the range table entries that are
|
||||||
|
* on the given join tree.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
GetRangeTableEntriesFromJoinTree(Node *joinNode, List *rangeTableList,
|
||||||
|
List **joinRangeTableEntries)
|
||||||
|
{
|
||||||
|
if (joinNode == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (IsA(joinNode, FromExpr))
|
||||||
|
{
|
||||||
|
FromExpr *fromExpr = (FromExpr *) joinNode;
|
||||||
|
Node *fromElement;
|
||||||
|
|
||||||
|
foreach_ptr(fromElement, fromExpr->fromlist)
|
||||||
|
{
|
||||||
|
GetRangeTableEntriesFromJoinTree(fromElement, rangeTableList,
|
||||||
|
joinRangeTableEntries);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (IsA(joinNode, JoinExpr))
|
||||||
|
{
|
||||||
|
JoinExpr *joinExpr = (JoinExpr *) joinNode;
|
||||||
|
GetRangeTableEntriesFromJoinTree(joinExpr->larg, rangeTableList,
|
||||||
|
joinRangeTableEntries);
|
||||||
|
GetRangeTableEntriesFromJoinTree(joinExpr->rarg, rangeTableList,
|
||||||
|
joinRangeTableEntries);
|
||||||
|
}
|
||||||
|
else if (IsA(joinNode, RangeTblRef))
|
||||||
|
{
|
||||||
|
int rangeTableIndex = ((RangeTblRef *) joinNode)->rtindex;
|
||||||
|
RangeTblEntry *rte = rt_fetch(rangeTableIndex, rangeTableList);
|
||||||
|
*joinRangeTableEntries = lappend(*joinRangeTableEntries, rte);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
pg_unreachable();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ShouldRecursivelyPlanNonColocatedSubqueries returns true if the input query contains joins
|
* ShouldRecursivelyPlanNonColocatedSubqueries returns true if the input query contains joins
|
||||||
|
@ -1360,6 +1419,8 @@ NodeContainsSubqueryReferencingOuterQuery(Node *node)
|
||||||
/*
|
/*
|
||||||
* ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
|
* ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry
|
||||||
* with a subquery. The function also pushes down the filters to the subquery.
|
* with a subquery. The function also pushes down the filters to the subquery.
|
||||||
|
*
|
||||||
|
* It then recursively plans the subquery.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList,
|
ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrictionList,
|
||||||
|
@ -1369,10 +1430,7 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
|
||||||
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers);
|
Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers);
|
||||||
Expr *andedBoundExpressions = make_ands_explicit(restrictionList);
|
Expr *andedBoundExpressions = make_ands_explicit(restrictionList);
|
||||||
subquery->jointree->quals = (Node *) andedBoundExpressions;
|
subquery->jointree->quals = (Node *) andedBoundExpressions;
|
||||||
UpdateVarNosInQualForSubquery(subquery);
|
UpdateVarNosInNode(subquery, SINGLE_RTE_INDEX);
|
||||||
|
|
||||||
/* force recursively planning of the newly created subquery */
|
|
||||||
subquery->limitOffset = (Node *) MakeIntegerConst(0);
|
|
||||||
|
|
||||||
/* replace the function with the constructed subquery */
|
/* replace the function with the constructed subquery */
|
||||||
rangeTableEntry->rtekind = RTE_SUBQUERY;
|
rangeTableEntry->rtekind = RTE_SUBQUERY;
|
||||||
|
@ -1396,24 +1454,25 @@ ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *restrict
|
||||||
get_rel_name(rangeTableEntry->relid),
|
get_rel_name(rangeTableEntry->relid),
|
||||||
ApplyLogRedaction(subqueryString->data))));
|
ApplyLogRedaction(subqueryString->data))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* as we created the subquery, now forcefully recursively plan it */
|
||||||
RecursivelyPlanSubquery(rangeTableEntry->subquery, context);
|
RecursivelyPlanSubquery(rangeTableEntry->subquery, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateVarNosInQualForSubquery iterates the Vars in the
|
* UpdateVarNosInNode iterates the Vars in the
|
||||||
* given quals node and updates the varno's as 1 as there
|
* given node and updates the varno's as the newVarNo.
|
||||||
* will be only one RTE in rtable, which is the subquery.
|
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
UpdateVarNosInQualForSubquery(Query *query)
|
UpdateVarNosInNode(Query *query, Index newVarNo)
|
||||||
{
|
{
|
||||||
List *varList = pull_var_clause(query->jointree->quals, PVC_RECURSE_AGGREGATES |
|
List *varList = pull_var_clause(query->jointree->quals, PVC_RECURSE_AGGREGATES |
|
||||||
PVC_RECURSE_PLACEHOLDERS);
|
PVC_RECURSE_PLACEHOLDERS);
|
||||||
Var *var = NULL;
|
Var *var = NULL;
|
||||||
foreach_ptr(var, varList)
|
foreach_ptr(var, varList)
|
||||||
{
|
{
|
||||||
var->varno = SINGLE_RTE_INDEX;
|
var->varno = newVarNo;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1423,13 +1482,13 @@ UpdateVarNosInQualForSubquery(Query *query)
|
||||||
* any table that should be converted to a subquery, which otherwise is not plannable.
|
* any table that should be converted to a subquery, which otherwise is not plannable.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId)
|
ContainsTableToBeConvertedToSubquery(List *rangeTableList)
|
||||||
{
|
{
|
||||||
if (ContainsLocalTableDistributedTableJoin(rangeTableList))
|
if (ContainsLocalTableDistributedTableJoin(rangeTableList))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (ModifiesLocalTableWithRemoteCitusLocalTable(rangeTableList, resultRelationId))
|
if (ModifiesLocalTableWithRemoteCitusLocalTable(rangeTableList))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1443,7 +1502,7 @@ ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid resultRelationId)
|
||||||
* MX structure.
|
* MX structure.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList, Oid resultRelationId)
|
ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList)
|
||||||
{
|
{
|
||||||
bool containsLocalResultRelation = false;
|
bool containsLocalResultRelation = false;
|
||||||
bool containsRemoteCitusLocalTable = false;
|
bool containsRemoteCitusLocalTable = false;
|
||||||
|
|
|
@ -27,7 +27,11 @@ typedef enum
|
||||||
|
|
||||||
extern int LocalTableJoinPolicy;
|
extern int LocalTableJoinPolicy;
|
||||||
|
|
||||||
|
extern bool
|
||||||
|
ShouldConvertLocalTableJoinsToSubqueries(Query *query,
|
||||||
|
List *rangeTableList,
|
||||||
|
PlannerRestrictionContext *plannerRestrictionContext);
|
||||||
extern void RecursivelyPlanLocalTableJoins(Query *query,
|
extern void RecursivelyPlanLocalTableJoins(Query *query,
|
||||||
RecursivePlanningContext *context);
|
RecursivePlanningContext *context, List *rangeTableList);
|
||||||
|
|
||||||
#endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */
|
#endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */
|
||||||
|
|
|
@ -53,8 +53,7 @@ extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
|
||||||
List *restrictionList,
|
List *restrictionList,
|
||||||
List *requiredAttrNumbers,
|
List *requiredAttrNumbers,
|
||||||
RecursivePlanningContext *context);
|
RecursivePlanningContext *context);
|
||||||
extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList, Oid
|
extern bool ContainsTableToBeConvertedToSubquery(List *rangeTableList);
|
||||||
resultRelationId);
|
|
||||||
extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry);
|
extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry);
|
||||||
extern bool IsRelationLocalTableOrMatView(Oid relationId);
|
extern bool IsRelationLocalTableOrMatView(Oid relationId);
|
||||||
#endif /* RECURSIVE_PLANNING_H */
|
#endif /* RECURSIVE_PLANNING_H */
|
||||||
|
|
Loading…
Reference in New Issue