onur-leftjoin_push-improvements
eaydingol 2025-07-31 17:15:38 +03:00
parent 665ae75a65
commit fc109f408b
6 changed files with 171 additions and 128 deletions

View File

@ -520,6 +520,7 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
tableEntry->colocationId, tableType); tableEntry->colocationId, tableType);
} }
/* /*
* IsFirstShard returns true if the given shardId is the first shard. * IsFirstShard returns true if the given shardId is the first shard.
*/ */
@ -545,6 +546,7 @@ IsFirstShard(CitusTableCacheEntry *tableEntry, uint64 shardId)
} }
} }
/* /*
* HasDistributionKey returns true if given Citus table has a distribution key. * HasDistributionKey returns true if given Citus table has a distribution key.
*/ */

View File

@ -221,27 +221,31 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
if (!IsA(node, Query)) if (!IsA(node, Query))
{ {
return expression_tree_walker(node, UpdateWhereClauseForOuterJoin, relationShardList); return expression_tree_walker(node, UpdateWhereClauseForOuterJoin,
relationShardList);
} }
Query *query = (Query *) node; Query *query = (Query *) node;
if (query->jointree == NULL) if (query->jointree == NULL)
{ {
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
} }
FromExpr *fromExpr = query->jointree; FromExpr *fromExpr = query->jointree;
if (fromExpr == NULL || fromExpr->fromlist == NIL) if (fromExpr == NULL || fromExpr->fromlist == NIL)
{ {
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
} }
// TODO: generalize to the list /* TODO: generalize to the list */
Node *firstFromItem = linitial(fromExpr->fromlist); Node *firstFromItem = linitial(fromExpr->fromlist);
if (!IsA(firstFromItem, JoinExpr)) if (!IsA(firstFromItem, JoinExpr))
{ {
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
} }
JoinExpr *joinExpr = (JoinExpr *) firstFromItem; JoinExpr *joinExpr = (JoinExpr *) firstFromItem;
@ -256,15 +260,20 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
RangeTblEntry *outerRte = NULL; RangeTblEntry *outerRte = NULL;
int outerRtIndex = -1; int outerRtIndex = -1;
int attnum; int attnum;
if(!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, &outerRte, &innerRte, &attnum)) if (!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex,
&outerRte, &innerRte, &attnum))
{ {
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
} }
if (attnum == InvalidAttrNumber) if (attnum == InvalidAttrNumber)
{ {
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList,
0);
} }
ereport(DEBUG5, (errmsg("Distributed table from the inner part of the outer join: %s.", innerRte->eref->aliasname))); ereport(DEBUG5, (errmsg(
"Distributed table from the inner part of the outer join: %s.",
innerRte->eref->aliasname)));
RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList); RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList);
@ -289,9 +298,12 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
/* load the interval for the shard and create constant nodes for the upper/lower bounds */ /* load the interval for the shard and create constant nodes for the upper/lower bounds */
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
Const *constNodeLowerBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->minValue, false, true); Const *constNodeLowerBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32),
Const *constNodeUpperBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->maxValue, false, true); shardInterval->minValue, false, true);
Const *constNodeZero = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), Int32GetDatum(0), false, true); Const *constNodeUpperBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32),
shardInterval->maxValue, false, true);
Const *constNodeZero = makeConst(INT4OID, -1, InvalidOid, sizeof(int32),
Int32GetDatum(0), false, true);
/* create a function expression node for the hash partition column */ /* create a function expression node for the hash partition column */
FuncExpr *hashFunction = makeNode(FuncExpr); FuncExpr *hashFunction = makeNode(FuncExpr);
@ -301,10 +313,12 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
hashFunction->funcretset = false; hashFunction->funcretset = false;
/* create a function expression for the lower bound of the shard interval */ /* create a function expression for the lower bound of the shard interval */
Oid resultTypeOid = get_func_rettype(cacheEntry->shardIntervalCompareFunction->fn_oid); Oid resultTypeOid = get_func_rettype(
cacheEntry->shardIntervalCompareFunction->fn_oid);
FuncExpr *lowerBoundFuncExpr = makeNode(FuncExpr); FuncExpr *lowerBoundFuncExpr = makeNode(FuncExpr);
lowerBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid; lowerBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid;
lowerBoundFuncExpr->args = list_make2((Node *) constNodeLowerBound, (Node *) hashFunction); lowerBoundFuncExpr->args = list_make2((Node *) constNodeLowerBound,
(Node *) hashFunction);
lowerBoundFuncExpr->funcresulttype = resultTypeOid; lowerBoundFuncExpr->funcresulttype = resultTypeOid;
lowerBoundFuncExpr->funcretset = false; lowerBoundFuncExpr->funcretset = false;
@ -316,18 +330,22 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
* shardInterval->minValue < hash(partitionColumn) * shardInterval->minValue < hash(partitionColumn)
* See SearchCachedShardInterval for the behavior at the boundaries. * See SearchCachedShardInterval for the behavior at the boundaries.
*/ */
Expr *lowerBoundExpr = make_opclause(lessThan, BOOLOID, false, (Expr *) lowerBoundFuncExpr, Expr *lowerBoundExpr = make_opclause(lessThan, BOOLOID, false,
(Expr *) lowerBoundFuncExpr,
(Expr *) constNodeZero, InvalidOid, InvalidOid); (Expr *) constNodeZero, InvalidOid, InvalidOid);
/* create a function expression for the upper bound of the shard interval */ /* create a function expression for the upper bound of the shard interval */
FuncExpr *upperBoundFuncExpr = makeNode(FuncExpr); FuncExpr *upperBoundFuncExpr = makeNode(FuncExpr);
upperBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid; upperBoundFuncExpr->funcid = cacheEntry->shardIntervalCompareFunction->fn_oid;
upperBoundFuncExpr->args = list_make2((Node *) hashFunction, (Expr *) constNodeUpperBound); upperBoundFuncExpr->args = list_make2((Node *) hashFunction,
(Expr *) constNodeUpperBound);
upperBoundFuncExpr->funcresulttype = resultTypeOid; upperBoundFuncExpr->funcresulttype = resultTypeOid;
upperBoundFuncExpr->funcretset = false; upperBoundFuncExpr->funcretset = false;
Oid lessThanOrEqualTo = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid, CStringGetDatum("<="), Oid lessThanOrEqualTo = GetSysCacheOid(OPERNAMENSP, Anum_pg_operator_oid,
resultTypeOid, resultTypeOid, ObjectIdGetDatum(11)); CStringGetDatum("<="),
resultTypeOid, resultTypeOid, ObjectIdGetDatum(
11));
/* /*
@ -335,12 +353,14 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
* hash(partitionColumn) <= shardInterval->maxValue * hash(partitionColumn) <= shardInterval->maxValue
* See SearchCachedShardInterval for the behavior at the boundaries. * See SearchCachedShardInterval for the behavior at the boundaries.
*/ */
Expr *upperBoundExpr = make_opclause(lessThanOrEqualTo, BOOLOID, false, (Expr *) upperBoundFuncExpr, Expr *upperBoundExpr = make_opclause(lessThanOrEqualTo, BOOLOID, false,
(Expr *) upperBoundFuncExpr,
(Expr *) constNodeZero, InvalidOid, InvalidOid); (Expr *) constNodeZero, InvalidOid, InvalidOid);
/* create a node for both upper and lower bound */ /* create a node for both upper and lower bound */
Node *shardIntervalBoundQuals = make_and_qual((Node *) lowerBoundExpr, (Node *) upperBoundExpr); Node *shardIntervalBoundQuals = make_and_qual((Node *) lowerBoundExpr,
(Node *) upperBoundExpr);
/* /*
* Add a null test for the partition column for the first shard. * Add a null test for the partition column for the first shard.
@ -354,7 +374,8 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
nullTest->nulltesttype = IS_NULL; /* Check for IS NULL */ nullTest->nulltesttype = IS_NULL; /* Check for IS NULL */
nullTest->arg = (Expr *) partitionColumnVar; /* The variable to check */ nullTest->arg = (Expr *) partitionColumnVar; /* The variable to check */
nullTest->argisrow = false; nullTest->argisrow = false;
shardIntervalBoundQuals = (Node *) make_orclause(list_make2(nullTest, shardIntervalBoundQuals)); shardIntervalBoundQuals = (Node *) make_orclause(list_make2(nullTest,
shardIntervalBoundQuals));
} }
if (fromExpr->quals == NULL) if (fromExpr->quals == NULL)

View File

@ -2240,6 +2240,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
} }
prevShardCount = cacheEntry->shardIntervalArrayLength; prevShardCount = cacheEntry->shardIntervalArrayLength;
innerTableOfOuterJoin = false; innerTableOfOuterJoin = false;
/* /*
* For left outer joins, we need to check if the table is in the inner * For left outer joins, we need to check if the table is in the inner
* part of the join. If it is, we need to mark this shard and add interval * part of the join. If it is, we need to mark this shard and add interval

View File

@ -765,7 +765,8 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
} }
else else
{ {
ereport(DEBUG3, (errmsg("a push down safe left join with recurring left side"))); ereport(DEBUG3, (errmsg(
"a push down safe left join with recurring left side")));
leftNodeRecurs = false; /* left node will be pushed down */ leftNodeRecurs = false; /* left node will be pushed down */
} }
} }
@ -2666,7 +2667,8 @@ hasPseudoconstantQuals(RelationRestrictionContext *relationRestrictionContext)
* IsPushdownSafeForRTEInLeftJoin returns true if the given range table entry * IsPushdownSafeForRTEInLeftJoin returns true if the given range table entry
* is safe for pushdown. Currently, we only allow reference tables. * is safe for pushdown. Currently, we only allow reference tables.
*/ */
bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte) bool
IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte)
{ {
if (IsCitusTable(rte->relid) && IsCitusTableType(rte->relid, REFERENCE_TABLE)) if (IsCitusTable(rte->relid) && IsCitusTableType(rte->relid, REFERENCE_TABLE))
{ {
@ -2684,12 +2686,15 @@ bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte)
/* /*
* Recursively resolve a Var from a subquery target list to the base Var and RTE * Recursively resolve a Var from a subquery target list to the base Var and RTE
*/ */
bool ResolveBaseVarFromSubquery(Var *var, Query *query, bool
ResolveBaseVarFromSubquery(Var *var, Query *query,
Var **baseVar, RangeTblEntry **baseRte) Var **baseVar, RangeTblEntry **baseRte)
{ {
TargetEntry *tle = get_tle_by_resno(query->targetList, var->varattno); TargetEntry *tle = get_tle_by_resno(query->targetList, var->varattno);
if (!tle || !IsA(tle->expr, Var)) if (!tle || !IsA(tle->expr, Var))
{
return false; return false;
}
Var *tleVar = (Var *) tle->expr; Var *tleVar = (Var *) tle->expr;
RangeTblEntry *rte = rt_fetch(tleVar->varno, query->rtable); RangeTblEntry *rte = rt_fetch(tleVar->varno, query->rtable);
@ -2719,7 +2724,8 @@ bool ResolveBaseVarFromSubquery(Var *var, Query *query,
* from a join qual for a join pushdown. It returns true if it is valid, * from a join qual for a join pushdown. It returns true if it is valid,
* it is the partition column and hash distributed, otherwise it returns false. * it is the partition column and hash distributed, otherwise it returns false.
*/ */
bool CheckPushDownConditionOnInnerVar(Var* innerVar, RangeTblEntry *rte) bool
CheckPushDownConditionOnInnerVar(Var *innerVar, RangeTblEntry *rte)
{ {
if (!innerVar || !rte) if (!innerVar || !rte)
{ {
@ -2739,7 +2745,8 @@ bool CheckPushDownConditionOnInnerVar(Var* innerVar, RangeTblEntry *rte)
} }
/* Check if the inner variable is part of the distribution column */ /* Check if the inner variable is part of the distribution column */
if (cacheEntry->partitionColumn && innerVar->varattno == cacheEntry->partitionColumn->varattno) if (cacheEntry->partitionColumn && innerVar->varattno ==
cacheEntry->partitionColumn->varattno)
{ {
return true; return true;
} }
@ -2755,15 +2762,17 @@ bool CheckPushDownConditionOnInnerVar(Var* innerVar, RangeTblEntry *rte)
* range table entry, the inner (distributed) relation's range table entry, and the * range table entry, the inner (distributed) relation's range table entry, and the
* attribute number of the partition column in the outer relation. * attribute number of the partition column in the outer relation.
*/ */
bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum) bool
CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
int *outerRtIndex, RangeTblEntry **outerRte,
RangeTblEntry **distRte, int *attnum)
{ {
if (!IS_OUTER_JOIN(joinExpr->jointype)) if (!IS_OUTER_JOIN(joinExpr->jointype))
{ {
return false; return false;
} }
// TODO: generalize to right joins /* TODO: generalize to right joins */
if (joinExpr->jointype != JOIN_LEFT) if (joinExpr->jointype != JOIN_LEFT)
{ {
return false; return false;
@ -2780,7 +2789,8 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
/* Push down for chained joins is not supported in this path. */ /* Push down for chained joins is not supported in this path. */
if (IsA(joinExpr->rarg, JoinExpr) || IsA(joinExpr->larg, JoinExpr)) if (IsA(joinExpr->rarg, JoinExpr) || IsA(joinExpr->larg, JoinExpr))
{ {
ereport(DEBUG5, (errmsg("One side is a join expression, pushdown is not supported in this path."))); ereport(DEBUG5, (errmsg(
"One side is a join expression, pushdown is not supported in this path.")));
return false; return false;
} }
@ -2814,6 +2824,7 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
/* left column is the outer table of the comparison, get right */ /* left column is the outer table of the comparison, get right */
rte = rt_fetch(rightColumn->varno, query->rtable); rte = rt_fetch(rightColumn->varno, query->rtable);
innerVar = rightColumn; innerVar = rightColumn;
/* additional constraints will be introduced on outer relation variable */ /* additional constraints will be introduced on outer relation variable */
*attnum = leftColumn->varattno; *attnum = leftColumn->varattno;
} }
@ -2822,6 +2833,7 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
/* right column is the outer table of the comparison, get left*/ /* right column is the outer table of the comparison, get left*/
rte = rt_fetch(leftColumn->varno, query->rtable); rte = rt_fetch(leftColumn->varno, query->rtable);
innerVar = leftColumn; innerVar = leftColumn;
/* additional constraints will be introduced on outer relation variable */ /* additional constraints will be introduced on outer relation variable */
*attnum = rightColumn->varattno; *attnum = rightColumn->varattno;
} }
@ -2867,11 +2879,13 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
* Initializes input variables to call CheckPushDownFeasibilityAndComputeIndexes. * Initializes input variables to call CheckPushDownFeasibilityAndComputeIndexes.
* See CheckPushDownFeasibilityAndComputeIndexes for more details. * See CheckPushDownFeasibilityAndComputeIndexes for more details.
*/ */
bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query) bool
CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query)
{ {
int outerRtIndex; int outerRtIndex;
RangeTblEntry *outerRte = NULL; RangeTblEntry *outerRte = NULL;
RangeTblEntry *innerRte = NULL; RangeTblEntry *innerRte = NULL;
int attnum; int attnum;
return CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, &outerRte, &innerRte, &attnum); return CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex,
&outerRte, &innerRte, &attnum);
} }

View File

@ -52,8 +52,13 @@ extern bool IsRelationLocalTableOrMatView(Oid relationId);
extern bool ContainsReferencesToOuterQuery(Query *query); extern bool ContainsReferencesToOuterQuery(Query *query);
extern void UpdateVarNosInNode(Node *node, Index newVarNo); extern void UpdateVarNosInNode(Node *node, Index newVarNo);
extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte); extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte);
extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum); extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
int *outerRtIndex,
RangeTblEntry **outerRte,
RangeTblEntry **distRte,
int *attnum);
extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query); extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query);
bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar, RangeTblEntry **baseRte); bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar,
RangeTblEntry **baseRte);
bool CheckPushDownConditionOnInnerVar(Var *innervar, RangeTblEntry *rte); bool CheckPushDownConditionOnInnerVar(Var *innervar, RangeTblEntry *rte);
#endif /* RECURSIVE_PLANNING_H */ #endif /* RECURSIVE_PLANNING_H */