Use join quals for both ON and USING syntax.

onur-leftjoin_push-improvements
eaydingol 2025-07-29 11:14:12 +03:00
parent 96a7d70fa9
commit 9f72067c12
2 changed files with 93 additions and 192 deletions

View File

@ -156,7 +156,8 @@ static void RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
RecursivePlanningContext * RecursivePlanningContext *
recursivePlanningContext); recursivePlanningContext);
static bool RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, static bool RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
RecursivePlanningContext *context); RecursivePlanningContext *context,
bool chainedJoin);
static void RecursivelyPlanDistributedJoinNode(Node *node, Query *query, static void RecursivelyPlanDistributedJoinNode(Node *node, Query *query,
RecursivePlanningContext *context); RecursivePlanningContext *context);
static bool IsRTERefRecurring(RangeTblRef *rangeTableRef, Query *query); static bool IsRTERefRecurring(RangeTblRef *rangeTableRef, Query *query);
@ -367,7 +368,7 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
if (ShouldRecursivelyPlanOuterJoins(query, context)) if (ShouldRecursivelyPlanOuterJoins(query, context))
{ {
RecursivelyPlanRecurringTupleOuterJoinWalker((Node *) query->jointree, RecursivelyPlanRecurringTupleOuterJoinWalker((Node *) query->jointree,
query, context); query, context, false);
} }
/* /*
@ -695,7 +696,8 @@ RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
static bool static bool
RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
RecursivePlanningContext * RecursivePlanningContext *
recursivePlanningContext) recursivePlanningContext,
bool chainedJoin)
{ {
if (node == NULL) if (node == NULL)
{ {
@ -712,7 +714,8 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
Node *fromElement = (Node *) lfirst(fromExprCell); Node *fromElement = (Node *) lfirst(fromExprCell);
RecursivelyPlanRecurringTupleOuterJoinWalker(fromElement, query, RecursivelyPlanRecurringTupleOuterJoinWalker(fromElement, query,
recursivePlanningContext); recursivePlanningContext,
false);
} }
/* /*
@ -738,10 +741,12 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
*/ */
bool leftNodeRecurs = bool leftNodeRecurs =
RecursivelyPlanRecurringTupleOuterJoinWalker(leftNode, query, RecursivelyPlanRecurringTupleOuterJoinWalker(leftNode, query,
recursivePlanningContext); recursivePlanningContext,
true);
bool rightNodeRecurs = bool rightNodeRecurs =
RecursivelyPlanRecurringTupleOuterJoinWalker(rightNode, query, RecursivelyPlanRecurringTupleOuterJoinWalker(rightNode, query,
recursivePlanningContext); recursivePlanningContext,
true);
switch (joinExpr->jointype) switch (joinExpr->jointype)
{ {
case JOIN_LEFT: case JOIN_LEFT:
@ -750,7 +755,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
if (leftNodeRecurs && !rightNodeRecurs) if (leftNodeRecurs && !rightNodeRecurs)
{ {
if(!CheckPushDownFeasibilityLeftJoin(joinExpr, query)) if(chainedJoin || !CheckPushDownFeasibilityLeftJoin(joinExpr, query))
{ {
ereport(DEBUG1, (errmsg("recursively planning right side of " ereport(DEBUG1, (errmsg("recursively planning right side of "
"the left join since the outer side " "the left join since the outer side "
@ -761,6 +766,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
else else
{ {
ereport(DEBUG3, (errmsg("a push down safe left join with recurring left side"))); ereport(DEBUG3, (errmsg("a push down safe left join with recurring left side")));
leftNodeRecurs = false; /* left node will be pushed down */
} }
} }
@ -2675,83 +2681,6 @@ bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte)
} }
/*
 * CheckUsingClauseForRte checks whether the given range table entry refers to
 * a Citus table whose distribution column is named in the USING clause of a
 * join.
 *
 * rte                - range table entry to inspect; only RTE_RELATION and
 *                      RTE_FUNCTION entries are considered.
 * usingClause        - list of column-name String nodes from the join's
 *                      USING clause.
 * partitionColumnVar - output; set to the table's distribution column Var
 *                      only when the function returns true.
 *
 * Returns true when the distribution column name appears in usingClause and
 * false otherwise. The distribution type (e.g. hash) is NOT verified here;
 * callers that need it must check it separately.
 */
bool CheckUsingClauseForRte(RangeTblEntry *rte, List *usingClause, Var **partitionColumnVar)
{
	if (rte->rtekind != RTE_RELATION && rte->rtekind != RTE_FUNCTION)
	{
		return false;
	}

	if (usingClause == NULL)
	{
		return false;
	}

	/* NOTE(review): rte->relid is presumably only meaningful for RTE_RELATION; confirm */
	if (!IsCitusTable(rte->relid))
	{
		return false;
	}

	/* Get the partition column of the Citus table */
	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid);
	Var *distributionColumn = cacheEntry->partitionColumn;

	/*
	 * A Citus table may have no distribution column, in which case there is
	 * nothing to match and dereferencing the Var below would crash.
	 */
	if (distributionColumn == NULL)
	{
		return false;
	}

	char *partitionColumnName = get_attname(rte->relid,
											distributionColumn->varattno,
											false);

	/* Check if the partition column is in the using clause */
	ListCell *lc;
	foreach(lc, usingClause)
	{
		char *colname = strVal(lfirst(lc));
		if (strcmp(colname, partitionColumnName) == 0)
		{
			/* publish the output only once a match is confirmed */
			*partitionColumnVar = distributionColumn;
			return true;
		}
	}

	return false;
}
/*
 * FindPartitionColumnInSubquery walks the range table of the given query, in
 * order, looking for an entry accepted by CheckUsingClauseForRte for the given
 * USING clause. Entries of kind RTE_SUBQUERY that are not accepted directly
 * are searched recursively through their own subquery.
 *
 * On success the function returns true; *rte points at the matching range
 * table entry and *partitionColumnVar holds the value stored by
 * CheckUsingClauseForRte. It returns false when the query, its range table or
 * the USING clause is missing, or when no entry matches.
 */
bool FindPartitionColumnInSubquery(Query *query, List *usingClause, Var **partitionColumnVar, RangeTblEntry **rte)
{
	if (query == NULL)
	{
		return false;
	}

	if (query->rtable == NULL || usingClause == NULL)
	{
		return false;
	}

	ListCell *rangeTableCell;
	foreach(rangeTableCell, query->rtable)
	{
		RangeTblEntry *candidate = (RangeTblEntry *) lfirst(rangeTableCell);

		/* direct match on this entry (relation or function RTEs) */
		if (CheckUsingClauseForRte(candidate, usingClause, partitionColumnVar))
		{
			*rte = candidate;
			return true;
		}

		/* otherwise descend into nested subqueries; the recursion fills *rte */
		if (candidate->rtekind == RTE_SUBQUERY &&
			FindPartitionColumnInSubquery(candidate->subquery, usingClause,
										  partitionColumnVar, rte))
		{
			return true;
		}
	}

	return false;
}
/* /*
* Recursively resolve a Var from a subquery target list to the base Var and RTE * Recursively resolve a Var from a subquery target list to the base Var and RTE
*/ */
@ -2786,24 +2715,36 @@ bool ResolveBaseVarFromSubquery(Var *var, Query *query,
/* /*
* CheckAttrNumAndDistributionTypeForJoinPushdown checks if the given attribute * CheckPushDownConditionOnVarsForJoinPushdown checks if the inner variable
* number is valid and if the Citus table is hash distributed. If both conditions * from a join qual for a join pushdown. It returns true if it is valid,
* are met, it returns true, otherwise it returns false. * it is the partition column and hash distributed, otherwise it returns false.
*/ */
bool CheckAttrNumAndDistributionTypeForJoinPushdown(int attnum, RangeTblEntry *rte) bool CheckPushDownConditionOnInnerVar(Var* innerVar, RangeTblEntry *rte)
{ {
if (attnum == InvalidAttrNumber) if (!innerVar || !rte)
{
return false;
}
if(innerVar->varattno == InvalidAttrNumber)
{ {
return false; return false;
} }
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid);
if(GetCitusTableType(cacheEntry) != HASH_DISTRIBUTED)
if(!cacheEntry || GetCitusTableType(cacheEntry) != HASH_DISTRIBUTED)
{ {
return false; return false;
} }
/* Check if the inner variable is part of the distribution column */
if (cacheEntry->partitionColumn && innerVar->varattno == cacheEntry->partitionColumn->varattno)
{
return true; return true;
}
return false;
} }
@ -2836,54 +2777,14 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
return false; return false;
} }
/* If the right side is itself a join, we currently do not support pushdown for such cases */ /* Push down for chained joins is not supported in this path. */
if (IsA(joinExpr->rarg, JoinExpr)) if (IsA(joinExpr->rarg, JoinExpr) || IsA(joinExpr->larg, JoinExpr))
{ {
ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right side is a join expression, pushdown is not supported"))); ereport(DEBUG5, (errmsg("One side is a join expression, pushdown is not supported in this path.")));
return false; return false;
} }
RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg;
RangeTblEntry *rightTableEntry = rt_fetch(rightTableRef->rtindex, query->rtable);
/* Check if the join is performed on the distribution column */ /* Check if the join is performed on the distribution column */
if (joinExpr->usingClause)
{
Var *partitionColumnVar = NULL;
if(rightTableEntry->rtekind == RTE_FUNCTION || rightTableEntry->rtekind == RTE_RELATION)
{
if(!CheckUsingClauseForRte(rightTableEntry, joinExpr->usingClause, &partitionColumnVar))
{
return false;
}
*distRte = rightTableEntry;
/* Get the attribute number for the outer table */
*attnum = GetAttrNumForMatchingColumn(*outerRte, rightTableEntry->relid, partitionColumnVar);
return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte);
}
else if(rightTableEntry->rtekind == RTE_SUBQUERY)
{
if(FindPartitionColumnInSubquery(rightTableEntry->subquery, joinExpr->usingClause, &partitionColumnVar, distRte))
{
/* Get the attribute number for the outer table */
*attnum = GetAttrNumForMatchingColumn(*outerRte, (*distRte)->relid, partitionColumnVar);
return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte);
}
else
{
return false;
}
}
else
{
ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right table of kind %d is not supported for pushdown when using clause is present",
rightTableEntry->rtekind)));
return false;
}
}
else /* join is defined with on clause */
{
List *joinClauseList = make_ands_implicit((Expr *) joinExpr->quals); List *joinClauseList = make_ands_implicit((Expr *) joinExpr->quals);
if (joinClauseList == NIL) if (joinClauseList == NIL)
{ {
@ -2913,6 +2814,7 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
/* left column is the outer table of the comparison, get right */ /* left column is the outer table of the comparison, get right */
rte = rt_fetch(rightColumn->varno, query->rtable); rte = rt_fetch(rightColumn->varno, query->rtable);
innerVar = rightColumn; innerVar = rightColumn;
/* additional constraints will be introduced on outer relation variable */
*attnum = leftColumn->varattno; *attnum = leftColumn->varattno;
} }
else if (rightColumn->varno == *outerRtIndex) else if (rightColumn->varno == *outerRtIndex)
@ -2920,21 +2822,24 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
/* right column is the outer table of the comparison, get left*/ /* right column is the outer table of the comparison, get left*/
rte = rt_fetch(leftColumn->varno, query->rtable); rte = rt_fetch(leftColumn->varno, query->rtable);
innerVar = leftColumn; innerVar = leftColumn;
/* additional constraints will be introduced on outer relation variable */
*attnum = rightColumn->varattno; *attnum = rightColumn->varattno;
} }
else else
{ {
continue; continue;
} }
/* the simple case: the inner table is itself a Citus table */
if (rte && IsCitusTable(rte->relid)) if (rte && IsCitusTable(rte->relid))
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); if(CheckPushDownConditionOnInnerVar(innerVar, rte))
if(innerVar->varattno == cacheEntry->partitionColumn->varattno)
{ {
*distRte = rte; *distRte = rte;
return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte); return true;
} }
} }
/* the inner table is a subquery, extract the base relation referred in the qual */
else if (rte && rte->rtekind == RTE_SUBQUERY) else if (rte && rte->rtekind == RTE_SUBQUERY)
{ {
Var *baseVar = NULL; Var *baseVar = NULL;
@ -2942,24 +2847,22 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
if (ResolveBaseVarFromSubquery(innerVar, rte->subquery, &baseVar, &baseRte)) if (ResolveBaseVarFromSubquery(innerVar, rte->subquery, &baseVar, &baseRte))
{ {
if (IsCitusTable(baseRte->relid)) if (baseRte && IsCitusTable(baseRte->relid))
{ {
CitusTableCacheEntry *entry = GetCitusTableCacheEntry(baseRte->relid); if(CheckPushDownConditionOnInnerVar(baseVar, baseRte))
if (baseVar->varattno == entry->partitionColumn->varattno)
{ {
*distRte = baseRte; *distRte = baseRte;
return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte); return true;
} }
} }
} }
return false;
} }
} }
return false;
}
return false;
} }
/* /*
* Initializes input variables to call CheckPushDownFeasibilityAndComputeIndexes. * Initializes input variables to call CheckPushDownFeasibilityAndComputeIndexes.
* See CheckPushDownFeasibilityAndComputeIndexes for more details. * See CheckPushDownFeasibilityAndComputeIndexes for more details.

View File

@ -54,8 +54,6 @@ extern void UpdateVarNosInNode(Node *node, Index newVarNo);
extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte); extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte);
extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum); extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum);
extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query); extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query);
bool CheckUsingClauseForRte(RangeTblEntry *rte, List *usingClause, Var **partitionColumnVar);
bool FindPartitionColumnInSubquery(Query *query, List *usingClause, Var **partitionColumnVar, RangeTblEntry **rte);
bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar, RangeTblEntry **baseRte); bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar, RangeTblEntry **baseRte);
bool CheckAttrNumAndDistributionTypeForJoinPushdown(int attnum, RangeTblEntry *rte); bool CheckPushDownConditionOnInnerVar(Var* innervar, RangeTblEntry *rte);
#endif /* RECURSIVE_PLANNING_H */ #endif /* RECURSIVE_PLANNING_H */