mirror of https://github.com/citusdata/citus.git
review
parent
f1181db55c
commit
08ae68e5db
|
|
@ -16,6 +16,7 @@
|
||||||
#include "access/heapam.h"
|
#include "access/heapam.h"
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
#include "catalog/pg_constraint.h"
|
#include "catalog/pg_constraint.h"
|
||||||
|
#include "catalog/pg_namespace.h"
|
||||||
#include "catalog/pg_operator.h"
|
#include "catalog/pg_operator.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
|
|
@ -261,7 +262,8 @@ DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int rtinde
|
||||||
lowerBoundFuncExpr->funcretset = false;
|
lowerBoundFuncExpr->funcretset = false;
|
||||||
|
|
||||||
Oid lessThan = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid, CStringGetDatum("<"),
|
Oid lessThan = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid, CStringGetDatum("<"),
|
||||||
resultTypeOid, resultTypeOid, ObjectIdGetDatum(11));
|
resultTypeOid, resultTypeOid, ObjectIdGetDatum(
|
||||||
|
PG_CATALOG_NAMESPACE));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Finally, check if the comparison result is less than 0, i.e.,
|
* Finally, check if the comparison result is less than 0, i.e.,
|
||||||
|
|
@ -282,8 +284,8 @@ DefineQualsForShardInterval(RelationShard *relationShard, int attnum, int rtinde
|
||||||
|
|
||||||
Oid lessThanOrEqualTo = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid,
|
Oid lessThanOrEqualTo = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid,
|
||||||
CStringGetDatum("<="),
|
CStringGetDatum("<="),
|
||||||
resultTypeOid, resultTypeOid, ObjectIdGetDatum(
|
resultTypeOid, resultTypeOid,
|
||||||
11));
|
ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
#include "funcapi.h"
|
#include "funcapi.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
|
|
@ -110,7 +111,7 @@ struct RecursivePlanningContextInternal
|
||||||
List *subPlanList;
|
List *subPlanList;
|
||||||
PlannerRestrictionContext *plannerRestrictionContext;
|
PlannerRestrictionContext *plannerRestrictionContext;
|
||||||
bool restrictionEquivalenceCheck;
|
bool restrictionEquivalenceCheck;
|
||||||
bool forceRecursivePlanning;
|
bool forceRecursivelyPlanRecurringOuterJoins;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* track depth of current recursive planner query */
|
/* track depth of current recursive planner query */
|
||||||
|
|
@ -201,7 +202,9 @@ static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry,
|
||||||
List *outerSubqueryTargetList);
|
List *outerSubqueryTargetList);
|
||||||
static List * GenerateRequiredColNamesFromTargetList(List *targetList);
|
static List * GenerateRequiredColNamesFromTargetList(List *targetList);
|
||||||
static char * GetRelationNameAndAliasName(RangeTblEntry *rangeTablentry);
|
static char * GetRelationNameAndAliasName(RangeTblEntry *rangeTablentry);
|
||||||
static bool JoinTreeContainsLateral(Node *node, List *rtable);
|
static bool CanPushdownRecurringOuterJoinOnOuterRTE(RangeTblEntry *rte);
|
||||||
|
static bool CanPushdownRecurringOuterJoinOnInnerVar(Var *innervar, RangeTblEntry *rte);
|
||||||
|
static bool CanPushdownRecurringOuterJoin(JoinExpr *joinExpr, Query *query);
|
||||||
#if PG_VERSION_NUM < PG_VERSION_17
|
#if PG_VERSION_NUM < PG_VERSION_17
|
||||||
static bool hasPseudoconstantQuals(
|
static bool hasPseudoconstantQuals(
|
||||||
RelationRestrictionContext *relationRestrictionContext);
|
RelationRestrictionContext *relationRestrictionContext);
|
||||||
|
|
@ -231,7 +234,7 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
||||||
context.planId = planId;
|
context.planId = planId;
|
||||||
context.subPlanList = NIL;
|
context.subPlanList = NIL;
|
||||||
context.plannerRestrictionContext = plannerRestrictionContext;
|
context.plannerRestrictionContext = plannerRestrictionContext;
|
||||||
context.forceRecursivePlanning = false;
|
context.forceRecursivelyPlanRecurringOuterJoins = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Force recursive planning of recurring outer joins for these queries
|
* Force recursive planning of recurring outer joins for these queries
|
||||||
|
|
@ -240,7 +243,7 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
||||||
*/
|
*/
|
||||||
if (routerPlan == DML_QUERY)
|
if (routerPlan == DML_QUERY)
|
||||||
{
|
{
|
||||||
context.forceRecursivePlanning = true;
|
context.forceRecursivelyPlanRecurringOuterJoins = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -770,9 +773,10 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
|
||||||
/* <recurring> left join <distributed> */
|
/* <recurring> left join <distributed> */
|
||||||
if (leftNodeRecurs && !rightNodeRecurs)
|
if (leftNodeRecurs && !rightNodeRecurs)
|
||||||
{
|
{
|
||||||
if (recursivePlanningContext->forceRecursivePlanning ||
|
if (recursivePlanningContext->forceRecursivelyPlanRecurringOuterJoins
|
||||||
chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr,
|
||
|
||||||
query))
|
chainedJoin || !CanPushdownRecurringOuterJoin(joinExpr,
|
||||||
|
query))
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("recursively planning right side of "
|
ereport(DEBUG1, (errmsg("recursively planning right side of "
|
||||||
"the left join since the outer side "
|
"the left join since the outer side "
|
||||||
|
|
@ -802,9 +806,10 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
|
||||||
/* <distributed> right join <recurring> */
|
/* <distributed> right join <recurring> */
|
||||||
if (!leftNodeRecurs && rightNodeRecurs)
|
if (!leftNodeRecurs && rightNodeRecurs)
|
||||||
{
|
{
|
||||||
if (recursivePlanningContext->forceRecursivePlanning ||
|
if (recursivePlanningContext->forceRecursivelyPlanRecurringOuterJoins
|
||||||
chainedJoin || !CheckPushDownFeasibilityOuterJoin(joinExpr,
|
||
|
||||||
query))
|
chainedJoin || !CanPushdownRecurringOuterJoin(joinExpr,
|
||||||
|
query))
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("recursively planning left side of "
|
ereport(DEBUG1, (errmsg("recursively planning left side of "
|
||||||
"the right join since the outer side "
|
"the right join since the outer side "
|
||||||
|
|
@ -2692,13 +2697,13 @@ hasPseudoconstantQuals(RelationRestrictionContext *relationRestrictionContext)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsPushdownSafeForOuterRTEInOuterJoin returns true if the given range table entry
|
* CanPushdownRecurringOuterJoinOnOuterRTE returns true if the given range table entry
|
||||||
* is safe for pushdown when it is the outer relation of a outer join when the
|
* is safe for pushdown when it is the outer relation of a outer join when the
|
||||||
* inner relation is not recurring.
|
* inner relation is not recurring.
|
||||||
* Currently, we only allow reference tables.
|
* Currently, we only allow reference tables.
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
IsPushdownSafeForOuterRTEInOuterJoin(RangeTblEntry *rte)
|
CanPushdownRecurringOuterJoinOnOuterRTE(RangeTblEntry *rte)
|
||||||
{
|
{
|
||||||
if (IsCitusTable(rte->relid) && IsCitusTableType(rte->relid, REFERENCE_TABLE))
|
if (IsCitusTable(rte->relid) && IsCitusTableType(rte->relid, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
|
|
@ -2714,7 +2719,8 @@ IsPushdownSafeForOuterRTEInOuterJoin(RangeTblEntry *rte)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Recursively resolve a Var from a subquery target list to the base Var and RTE
|
* ResolveBaseVarFromSubquery recursively resolves a Var from a subquery target list to
|
||||||
|
* the base Var and RTE
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
ResolveBaseVarFromSubquery(Var *var, Query *query,
|
ResolveBaseVarFromSubquery(Var *var, Query *query,
|
||||||
|
|
@ -2742,6 +2748,9 @@ ResolveBaseVarFromSubquery(Var *var, Query *query,
|
||||||
}
|
}
|
||||||
else if (rte->rtekind == RTE_SUBQUERY)
|
else if (rte->rtekind == RTE_SUBQUERY)
|
||||||
{
|
{
|
||||||
|
/* Prevent overflow, and allow query cancellation */
|
||||||
|
check_stack_depth();
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
return ResolveBaseVarFromSubquery(tleVar, rte->subquery, baseVar, baseRte);
|
return ResolveBaseVarFromSubquery(tleVar, rte->subquery, baseVar, baseRte);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2750,12 +2759,12 @@ ResolveBaseVarFromSubquery(Var *var, Query *query,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CheckPushDownConditionOnVarsForJoinPushdown checks if the inner variable
|
* CanPushdownRecurringOuterJoinOnInnerVar checks if the inner variable
|
||||||
* from a join qual for a join pushdown. It returns true if it is valid,
|
* from a join qual for a join pushdown. It returns true if it is valid,
|
||||||
* it is the partition column and hash distributed, otherwise it returns false.
|
* it is the partition column and hash distributed, otherwise it returns false.
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
CheckPushDownConditionOnInnerVar(Var *innerVar, RangeTblEntry *rte)
|
CanPushdownRecurringOuterJoinOnInnerVar(Var *innerVar, RangeTblEntry *rte)
|
||||||
{
|
{
|
||||||
if (!innerVar || !rte)
|
if (!innerVar || !rte)
|
||||||
{
|
{
|
||||||
|
|
@ -2800,6 +2809,10 @@ JoinTreeContainsLateral(Node *node, List *rtable)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Prevent overflow, and allow query cancellation */
|
||||||
|
check_stack_depth();
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
if (IsA(node, RangeTblRef))
|
if (IsA(node, RangeTblRef))
|
||||||
{
|
{
|
||||||
RangeTblEntry *rte = rt_fetch(((RangeTblRef *) node)->rtindex, rtable);
|
RangeTblEntry *rte = rt_fetch(((RangeTblRef *) node)->rtindex, rtable);
|
||||||
|
|
@ -2847,7 +2860,8 @@ JoinTreeContainsLateral(Node *node, List *rtable)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CheckPushDownFeasibilityAndComputeIndexes checks if the given join expression
|
* CheckPushDownFeasibilityAndComputeIndexes checks if the given join expression
|
||||||
* is a left outer join and if it is feasible to push down the join. If feasible,
|
* is an outer join between recurring rel -on outer part- and a distributed
|
||||||
|
* rel -on the inner side- and if it is feasible to push down the join. If feasible,
|
||||||
* it computes the outer relation's range table index, the outer relation's
|
* it computes the outer relation's range table index, the outer relation's
|
||||||
* range table entry, the inner (distributed) relation's range table entry, and the
|
* range table entry, the inner (distributed) relation's range table entry, and the
|
||||||
* attribute number of the partition column in the outer relation.
|
* attribute number of the partition column in the outer relation.
|
||||||
|
|
@ -2880,6 +2894,7 @@ CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Push down for joins with fromExpr on one side is not supported in this path. */
|
||||||
if (!IsA(joinExpr->larg, RangeTblRef) || !IsA(joinExpr->rarg, RangeTblRef))
|
if (!IsA(joinExpr->larg, RangeTblRef) || !IsA(joinExpr->rarg, RangeTblRef))
|
||||||
{
|
{
|
||||||
ereport(DEBUG5, (errmsg(
|
ereport(DEBUG5, (errmsg(
|
||||||
|
|
@ -2898,7 +2913,7 @@ CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
|
||||||
|
|
||||||
*outerRte = rt_fetch(*outerRtIndex, query->rtable);
|
*outerRte = rt_fetch(*outerRtIndex, query->rtable);
|
||||||
|
|
||||||
if (!IsPushdownSafeForOuterRTEInOuterJoin(*outerRte))
|
if (!CanPushdownRecurringOuterJoinOnOuterRTE(*outerRte))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
@ -2966,7 +2981,7 @@ CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
|
||||||
/* the simple case, the inner table itself a Citus table */
|
/* the simple case, the inner table itself a Citus table */
|
||||||
if (rte && IsCitusTable(rte->relid))
|
if (rte && IsCitusTable(rte->relid))
|
||||||
{
|
{
|
||||||
if (CheckPushDownConditionOnInnerVar(innerVar, rte))
|
if (CanPushdownRecurringOuterJoinOnInnerVar(innerVar, rte))
|
||||||
{
|
{
|
||||||
*distRte = rte;
|
*distRte = rte;
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -2982,7 +2997,7 @@ CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
|
||||||
{
|
{
|
||||||
if (baseRte && IsCitusTable(baseRte->relid))
|
if (baseRte && IsCitusTable(baseRte->relid))
|
||||||
{
|
{
|
||||||
if (CheckPushDownConditionOnInnerVar(baseVar, baseRte))
|
if (CanPushdownRecurringOuterJoinOnInnerVar(baseVar, baseRte))
|
||||||
{
|
{
|
||||||
*distRte = baseRte;
|
*distRte = baseRte;
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -2997,11 +3012,12 @@ CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initializes input variables to call CheckPushDownFeasibilityAndComputeIndexes.
|
* CanPushdownRecurringOuterJoin initializes input variables to call
|
||||||
|
* CheckPushDownFeasibilityAndComputeIndexes.
|
||||||
* See CheckPushDownFeasibilityAndComputeIndexes for more details.
|
* See CheckPushDownFeasibilityAndComputeIndexes for more details.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
CheckPushDownFeasibilityOuterJoin(JoinExpr *joinExpr, Query *query)
|
CanPushdownRecurringOuterJoin(JoinExpr *joinExpr, Query *query)
|
||||||
{
|
{
|
||||||
int outerRtIndex;
|
int outerRtIndex;
|
||||||
RangeTblEntry *outerRte = NULL;
|
RangeTblEntry *outerRte = NULL;
|
||||||
|
|
|
||||||
|
|
@ -54,14 +54,11 @@ extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry);
|
||||||
extern bool IsRelationLocalTableOrMatView(Oid relationId);
|
extern bool IsRelationLocalTableOrMatView(Oid relationId);
|
||||||
extern bool ContainsReferencesToOuterQuery(Query *query);
|
extern bool ContainsReferencesToOuterQuery(Query *query);
|
||||||
extern void UpdateVarNosInNode(Node *node, Index newVarNo);
|
extern void UpdateVarNosInNode(Node *node, Index newVarNo);
|
||||||
extern bool IsPushdownSafeForOuterRTEInOuterJoin(RangeTblEntry *rte);
|
|
||||||
extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
|
extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
|
||||||
int *outerRtIndex,
|
int *outerRtIndex,
|
||||||
RangeTblEntry **outerRte,
|
RangeTblEntry **outerRte,
|
||||||
RangeTblEntry **distRte,
|
RangeTblEntry **distRte,
|
||||||
int *attnum);
|
int *attnum);
|
||||||
extern bool CheckPushDownFeasibilityOuterJoin(JoinExpr *joinExpr, Query *query);
|
|
||||||
bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar,
|
bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar,
|
||||||
RangeTblEntry **baseRte);
|
RangeTblEntry **baseRte);
|
||||||
bool CheckPushDownConditionOnInnerVar(Var *innervar, RangeTblEntry *rte);
|
|
||||||
#endif /* RECURSIVE_PLANNING_H */
|
#endif /* RECURSIVE_PLANNING_H */
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue