Do not push down the left join for nested cases.

pull/7973/merge^2
eaydingol 2025-05-06 12:18:37 +03:00
parent 5c05af952f
commit 0fb3b6a725
3 changed files with 91 additions and 41 deletions

View File

@ -206,29 +206,28 @@ UpdateTaskQueryString(Query *query, Task *task)
SetTaskQueryIfShouldLazyDeparse(task, query);
}
/*
* Iterates through the FROM clause of the query and checks if there is a join
* clause with a reference and distributed table.
* If there is, it returns the index of the range table entry of the outer
* table in the join clause. It also sets the innerRte to point to the
* range table entry inner table. If there is no join clause with a distributed
* table, it returns -1.
*/
int
GetRepresentativeTablesFromJoinClause(List *fromlist, List *rtable, RangeTblEntry **innerRte)
* expr with a reference and distributed table.
* If there is, it adds the index of the range table entry of the outer
* table in the join clause to the constraintIndexes list. It also sets the
* innerRte to point to the range table entry inner table.
*/
bool ExtractIndexesForConstaints(List *fromList, List *rtable,
int *outerRtIndex, RangeTblEntry **distRte)
{
ereport(DEBUG5, (errmsg("******")));
ListCell *fromExprCell;
/* TODO: is this case even possible | fromlist | > 1, no test cases yet */
if(list_length(fromlist) > 1)
/* TODO: is this case even possible | fromlist | > 1. */
if(list_length(fromList) > 1)
{
ereport(DEBUG5, (errmsg("GetRepresentativeTablesFromJoinClause: Fromlist length > 1")));
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: Fromlist length > 1")));
return -1;
}
foreach(fromExprCell, fromlist)
foreach(fromExprCell, fromList)
{
Node *fromElement = (Node *) lfirst(fromExprCell);
if (IsA(fromElement, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) fromElement;
@ -236,29 +235,24 @@ GetRepresentativeTablesFromJoinClause(List *fromlist, List *rtable, RangeTblEntr
{
continue;
}
// TODO: this path should not be active when the conditions are not met.
int outerRtIndex = ((RangeTblRef *)joinExpr->larg)->rtindex;
RangeTblEntry *outerRte = rt_fetch(outerRtIndex, rtable);
*outerRtIndex = (((RangeTblRef *)joinExpr->larg)->rtindex);
RangeTblEntry *outerRte = rt_fetch(*outerRtIndex, rtable);
if(!IsPushdownSafeForRTEInLeftJoin(outerRte))
{
ereport(DEBUG5, (errmsg("GetRepresentativeTablesFromJoinClause: RTE is not pushdown safe")));
return -1;
return false;
}
ereport(DEBUG5, (errmsg("\t OK outerRte: %s", outerRte->eref->aliasname)));
if (!CheckIfAllCitusRTEsAreColocated((Node *)joinExpr->rarg, rtable, innerRte))
if (!CheckIfAllCitusRTEsAreColocated((Node *)joinExpr->rarg, rtable, distRte))
{
return -1;
return false;
}
return outerRtIndex;
return true;
}
}
return -1;
return false;
}
@ -301,13 +295,16 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
* distributed tables in the join clause.
*/
RangeTblEntry *innerRte = NULL;
int outerRtIndex = GetRepresentativeTablesFromJoinClause(fromExpr->fromlist, query->rtable, &innerRte);
if (outerRtIndex < 0 || innerRte == NULL)
int outerRtIndex = -1;
bool result = ExtractIndexesForConstaints(fromExpr->fromlist, query->rtable, &outerRtIndex, &innerRte);
if (!result)
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: failed to extract indexes")));
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
ereport(DEBUG5, (errmsg("\t innerRte: %s", innerRte->eref->aliasname)));
ereport(DEBUG5, (errmsg("\t Distributed table from the inner part: %s", innerRte->eref->aliasname)));
RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList);
uint64 shardId = relationShard->shardId;

View File

@ -88,6 +88,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/query_colocation_checker.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/query_utils.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/shard_pruning.h"
@ -105,6 +106,7 @@ struct RecursivePlanningContextInternal
bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
List *subPlanList;
PlannerRestrictionContext *plannerRestrictionContext;
bool restrictionEquivalenceCheck;
};
/* track depth of current recursive planner query */
@ -641,6 +643,35 @@ RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
}
/*
* Returns true if the given node is recurring, or the node is a
* JoinExpr that contains a recurring node.
*/
static bool
JoinExprHasNonRecurringTable(Node *node, Query *query)
{
if (node == NULL)
{
return false;
}
else if (IsA(node, RangeTblRef))
{
return IsRTERefRecurring((RangeTblRef *) node, query);
}
else if (IsA(node, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) node;
return JoinExprHasNonRecurringTable(joinExpr->larg, query) ||
JoinExprHasNonRecurringTable(joinExpr->rarg, query);
}
else
{
return false;
}
}
/*
* RecursivelyPlanRecurringTupleOuterJoinWalker descends into a join tree and
* recursively plans all non-recurring (i.e., distributed) rels that that
@ -714,16 +745,18 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
* conditions are met:
* 1. The left side is recurring
* 2. The right side is not recurring
* 3. The left side is not a RangeTblRef (i.e., it is not a reference/local table)
* 4. The tables in the rigt side are not colocated.
* 5. The left side does not have the distribution column
* 3. Either of the following:
* a. The left side is not a RangeTblRef (i.e., it is not a reference/local table)
* b. The tables in the rigt side are not colocated.
* 5. The left side does not have the distribution column (TODO: CHECK THIS)
*/
if (leftNodeRecurs && !rightNodeRecurs)
{
int outerRtIndex = ((RangeTblRef *) leftNode)->rtindex;
RangeTblEntry *rte = rt_fetch(outerRtIndex, query->rtable);
if(!IsPushdownSafeForRTEInLeftJoin(rte))
RangeTblEntry *innerRte = NULL;
if (!IsPushdownSafeForRTEInLeftJoin(rte))
{
ereport(DEBUG1, (errmsg("recursively planning right side of "
"the left join since the outer side "
@ -731,6 +764,27 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
}
else if (!CheckIfAllCitusRTEsAreColocated(rightNode, query->rtable, &innerRte))
{
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
"since tables in the inner side of the left "
"join are not colocated")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
}
}
/*
* rightNodeRecurs if there is a recurring table in the right side. However, if the right side
* is a join expression, we need to check if it contains a recurring table. If it does, we need to
* recursively plan the right side of the left join. Push-down path does not handle the nested joins
* yet, once we have that, we can remove this check.
*/
else if (leftNodeRecurs && rightNodeRecurs && JoinExprHasNonRecurringTable(rightNode, query))
{
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
"since right side is a joinexpr with non-recurring tables")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
}
/*

View File

@ -23,8 +23,7 @@
#include "distributed/query_utils.h"
int GetRepresentativeTablesFromJoinClause(List *fromlist, List *rtable, RangeTblEntry **innerRte);
bool ExtractIndexesForConstaints(List *fromList, List *rtable, int *outerRtIndex, RangeTblEntry **distRte);
extern void RebuildQueryStrings(Job *workerJob);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList);