Add lateral join checks for outer joins and drop the useless ones for semi joins

pull/6512/head
Onur Tirtir 2022-12-01 18:18:55 +03:00
parent e7e4881289
commit 2803470b58
6 changed files with 90 additions and 96 deletions

View File

@ -105,6 +105,7 @@ static List * CreateSubqueryTargetListAndAdjustVars(List *columnList);
static AttrNumber FindResnoForVarInTargetList(List *targetList, int varno, int varattno); static AttrNumber FindResnoForVarInTargetList(List *targetList, int varno, int varattno);
static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo,
Relids relids); Relids relids);
static char * RecurringTypeDescription(RecurringTuplesType recurType);
static DeferredErrorMessage * DeferredErrorIfUnsupportedLateralSubquery( static DeferredErrorMessage * DeferredErrorIfUnsupportedLateralSubquery(
PlannerInfo *plannerInfo, Relids recurringRelIds, Relids nonRecurringRelIds); PlannerInfo *plannerInfo, Relids recurringRelIds, Relids nonRecurringRelIds);
static Var * PartitionColumnForPushedDownSubquery(Query *query); static Var * PartitionColumnForPushedDownSubquery(Query *query);
@ -603,7 +604,6 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
return error; return error;
} }
/* we shouldn't allow reference tables in the outer part of outer joins */
error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext); error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext);
if (error) if (error)
{ {
@ -643,7 +643,8 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
* sublinks into joins. * sublinks into joins.
* *
* In some cases, sublinks are pulled up and converted into outer joins. Those cases * In some cases, sublinks are pulled up and converted into outer joins. Those cases
* are already handled with DeferredErrorIfUnsupportedRecurringTuplesJoin(). * are already handled with RecursivelyPlanRecurringTupleOuterJoinWalker() or thrown
* an error for in DeferredErrorIfUnsupportedRecurringTuplesJoin().
* *
* If the sublinks are not pulled up, we should still error out in if the expression * If the sublinks are not pulled up, we should still error out in if the expression
* in the FROM clause would recur for every shard in a subquery on the WHERE clause. * in the FROM clause would recur for every shard in a subquery on the WHERE clause.
@ -751,20 +752,11 @@ FromClauseRecurringTupleType(Query *queryTree)
/* /*
* DeferredErrorIfUnsupportedRecurringTuplesJoin returns true if there exists a outer join * DeferredErrorIfUnsupportedRecurringTuplesJoin returns a DeferredError if
* between reference table and distributed tables which does not follow * there exists a join between a recurring rel (such as reference tables
* the rules : * and intermediate_results) and a non-recurring rel (such as distributed tables
* - Reference tables can not be located in the outer part of the semi join or the * and subqueries that we can push-down to worker nodes) that can return an
* anti join. Otherwise, we may have duplicate results. Although getting duplicate * incorrect result set due to recurring tuples coming from the recurring rel.
* results is not possible by checking the equality on the column of the reference
* table and partition column of distributed table, we still keep these checks.
* Because, using the reference table in the outer part of the semi join or anti
* join is not very common.
* - Reference tables can not be located in the outer part of the left join
* (Note that PostgreSQL converts right joins to left joins. While converting
* join types, innerrel and outerrel are also switched.) Otherwise we will
* definitely have duplicate rows. Beside, reference tables can not be used
* with full outer joins because of the same reason.
*/ */
static DeferredErrorMessage * static DeferredErrorMessage *
DeferredErrorIfUnsupportedRecurringTuplesJoin( DeferredErrorIfUnsupportedRecurringTuplesJoin(
@ -783,31 +775,48 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
Relids innerrelRelids = joinRestriction->innerrelRelids; Relids innerrelRelids = joinRestriction->innerrelRelids;
Relids outerrelRelids = joinRestriction->outerrelRelids; Relids outerrelRelids = joinRestriction->outerrelRelids;
if (joinType == JOIN_SEMI || joinType == JOIN_ANTI || joinType == JOIN_LEFT) /*
* This loop aims to determine whether this join is between a recurring
* rel and a non-recurring rel, and if so, whether it can yield an incorrect
* result set due to recurring tuples.
*
* For outer joins, this can only happen if it's a lateral outer join
* where the inner distributed subquery references the recurring outer
* rel. This because, such outer joins should not appear here because
* the recursive planner (RecursivelyPlanRecurringTupleOuterJoinWalker)
* should have already planned the non-recurring side if it wasn't a
* lateral join. For this reason, if the outer join is between a recurring
* rel --on the outer side-- and a non-recurring rel --on the other side--,
* we throw an error assuming that it's a lateral outer join.
* Also note that; in the context of outer joins, we only check left outer
* and full outer joins because PostgreSQL converts right joins to left
* joins before passing them through "set_join_pathlist_hook"s.
*
* For semi / anti joins, we anyway throw an error when the inner
* side is a distributed subquery that references a recurring outer rel
* (in the FROM clause) thanks to DeferErrorIfFromClauseRecurs. And when
* the inner side is a recurring rel and the outer side a non-recurring
* one, then the non-recurring side can't reference the recurring side
* anyway.
*
* For those reasons, here we perform below lateral join checks only for
* outer (except anti) / inner joins but not for anti / semi joins.
*/
if (joinType == JOIN_LEFT)
{ {
/*
* If there are only recurring tuples on the inner side of a join then
* we can push it down, regardless of whether the outer side is
* recurring or not. Otherwise, we check the outer side for recurring
* tuples.
*/
if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids)) if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids))
{ {
/* inner side only contains recurring rels */
continue; continue;
} }
/*
* If the outer side of the join doesn't have any distributed tables
* (e.g., contains only recurring tuples), Citus should not pushdown
* the query. The reason is that recurring tuples on every shard would
* be added to the result, which is wrong.
*/
if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids)) if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids))
{ {
/* /*
* Find the first (or only) recurring RTE to give a meaningful * Inner side contains distributed rels but the outer side only
* error to the user. * contains recurring rels, must be an unsupported lateral outer
* join.
*/ */
recurType = FetchFirstRecurType(plannerInfo, outerrelRelids); recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
@ -816,11 +825,6 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
} }
else if (joinType == JOIN_FULL) else if (joinType == JOIN_FULL)
{ {
/*
* If one of the outer or inner side contains recurring tuples and the other side
* contains nonrecurring tuples, then duplicate results can exist in the result.
* Thus, Citus should not pushdown the query.
*/
bool innerContainOnlyRecurring = bool innerContainOnlyRecurring =
RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids); RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids);
bool outerContainOnlyRecurring = bool outerContainOnlyRecurring =
@ -829,8 +833,9 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
if (innerContainOnlyRecurring && !outerContainOnlyRecurring) if (innerContainOnlyRecurring && !outerContainOnlyRecurring)
{ {
/* /*
* Find the first (or only) recurring RTE to give a meaningful * Right side contains distributed rels but the left side only
* error to the user. * contains recurring rels, must be an unsupported lateral outer
* join.
*/ */
recurType = FetchFirstRecurType(plannerInfo, innerrelRelids); recurType = FetchFirstRecurType(plannerInfo, innerrelRelids);
@ -840,8 +845,9 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
if (!innerContainOnlyRecurring && outerContainOnlyRecurring) if (!innerContainOnlyRecurring && outerContainOnlyRecurring)
{ {
/* /*
* Find the first (or only) recurring RTE to give a meaningful * Left side contains distributed rels but the right side only
* error to the user. * contains recurring rels, must be an unsupported lateral outer
* join.
*/ */
recurType = FetchFirstRecurType(plannerInfo, outerrelRelids); recurType = FetchFirstRecurType(plannerInfo, outerrelRelids);
@ -856,7 +862,7 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
* See comment on DeferredErrorIfUnsupportedLateralSubquery for * See comment on DeferredErrorIfUnsupportedLateralSubquery for
* details. * details.
* *
* When planning inner joins postgres can move RTEs from left to * When planning inner joins, postgres can move RTEs from left to
* right and from right to left. So we don't know on which side the * right and from right to left. So we don't know on which side the
* lateral join wil appear. Thus we try to find a side of the join * lateral join wil appear. Thus we try to find a side of the join
* that only contains recurring tuples. And then we check the other * that only contains recurring tuples. And then we check the other
@ -893,41 +899,13 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
} }
} }
if (recurType == RECURRING_TUPLES_REFERENCE_TABLE) if (recurType != RECURRING_TUPLES_INVALID)
{ {
char *errmsg = psprintf("cannot perform a lateral outer join when "
"a distributed subquery references %s",
RecurringTypeDescription(recurType));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery", errmsg, NULL, NULL);
"There exist a reference table in the outer "
"part of the outer join", NULL);
}
else if (recurType == RECURRING_TUPLES_FUNCTION)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery",
"There exist a table function in the outer "
"part of the outer join", NULL);
}
else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery",
"There exist a subquery without FROM in the outer "
"part of the outer join", NULL);
}
else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery",
"Complex subqueries, CTEs and local tables cannot be in "
"the outer part of an outer join with a distributed table",
NULL);
}
else if (recurType == RECURRING_TUPLES_VALUES)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery",
"There exist a VALUES clause in the outer "
"part of the outer join", NULL);
} }
return NULL; return NULL;

View File

@ -372,9 +372,6 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
* side of the outer join. That way, inner rel gets converted into an intermediate * side of the outer join. That way, inner rel gets converted into an intermediate
* result and logical planner can handle the new query since it's of the from * result and logical planner can handle the new query since it's of the from
* "<recurring> LEFT JOIN <recurring>". * "<recurring> LEFT JOIN <recurring>".
*
* See DeferredErrorIfUnsupportedRecurringTuplesJoin for the supported join
* types.
*/ */
if (ShouldRecursivelyPlanOuterJoins(context)) if (ShouldRecursivelyPlanOuterJoins(context))
{ {
@ -719,7 +716,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
/* /*
* A LEFT JOIN is recurring if the lhs is recurring. * A LEFT JOIN is recurring if the lhs is recurring.
* Note that we should have converted the rhs into a recurring * Note that we might have converted the rhs into a recurring
* one too if the lhs is recurring, but this anyway has no * one too if the lhs is recurring, but this anyway has no
* effects when deciding whether a LEFT JOIN is recurring. * effects when deciding whether a LEFT JOIN is recurring.
*/ */
@ -812,6 +809,11 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
* RecursivelyPlanRecurringTupleOuterJoinWalker that recursively plans given * RecursivelyPlanRecurringTupleOuterJoinWalker that recursively plans given
* distributed node that is known to be inner side of an outer join. * distributed node that is known to be inner side of an outer join.
* *
* Fails to do so if the distributed join node references the recurring one.
* In that case, we don't throw an error here but instead we let
* DeferredErrorIfUnsupportedRecurringTuplesJoin to so for a better error
* message.
*
* We call a node "distributed" if it points to a distributed table or a * We call a node "distributed" if it points to a distributed table or a
* more complex object (i.e., a join tree or a subquery) that can be pushed * more complex object (i.e., a join tree or a subquery) that can be pushed
* down to the worker nodes directly. For a join, this means that it's either * down to the worker nodes directly. For a join, this means that it's either
@ -894,7 +896,24 @@ RecursivelyPlanDistributedJoinNode(Node *node, Query *query,
"since it is part of a distributed join node " "since it is part of a distributed join node "
"that is outer joined with a recurring rel"))); "that is outer joined with a recurring rel")));
RecursivelyPlanSubquery(distributedRte->subquery, recursivePlanningContext); bool recursivelyPlanned = RecursivelyPlanSubquery(distributedRte->subquery,
recursivePlanningContext);
if (!recursivelyPlanned)
{
/*
* RecursivelyPlanSubquery fails to plan a subquery only if it
* contains references to the outer query. This means that, we can't
* plan such outer joins (like <recurring LEFT OUTER distributed>)
* if it's a LATERAL join where the distributed side is a subquery that
* references the outer side, as in,
*
* SELECT * FROM reference
* LEFT JOIN LATERAL
* (SELECT * FROM distributed WHERE reference.b > distributed.b) q
* USING (a);
*/
Assert(ContainsReferencesToOuterQuery(distributedRte->subquery));
}
} }
else else
{ {

View File

@ -1839,8 +1839,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id) ON (f.id = f2.id)
WHERE f.id IN (SELECT user_id WHERE f.id IN (SELECT user_id
FROM raw_events_second)); FROM raw_events_second));
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- cannot push down since join is not equi join (f.id > f2.id) -- cannot push down since join is not equi join (f.id > f2.id)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1869,8 +1867,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id > f2.id) ON (f.id > f2.id)
WHERE f.id IN (SELECT user_id WHERE f.id IN (SELECT user_id
FROM raw_events_second)); FROM raw_events_second));
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- we currently not support grouping sets -- we currently not support grouping sets
INSERT INTO agg_events INSERT INTO agg_events
(user_id, (user_id,
@ -1978,7 +1974,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
36 45
(1 row) (1 row)
INSERT INTO raw_events_second SELECT * FROM test_view; INSERT INTO raw_events_second SELECT * FROM test_view;
@ -1988,7 +1984,7 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
38 47
(1 row) (1 row)
-- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE -- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE

View File

@ -1839,8 +1839,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id = f2.id) ON (f.id = f2.id)
WHERE f.id IN (SELECT user_id WHERE f.id IN (SELECT user_id
FROM raw_events_second)); FROM raw_events_second));
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- cannot push down since join is not equi join (f.id > f2.id) -- cannot push down since join is not equi join (f.id > f2.id)
INSERT INTO raw_events_second INSERT INTO raw_events_second
(user_id) (user_id)
@ -1869,8 +1867,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
ON (f.id > f2.id) ON (f.id > f2.id)
WHERE f.id IN (SELECT user_id WHERE f.id IN (SELECT user_id
FROM raw_events_second)); FROM raw_events_second));
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- we currently not support grouping sets -- we currently not support grouping sets
INSERT INTO agg_events INSERT INTO agg_events
(user_id, (user_id,
@ -1978,7 +1974,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
36 45
(1 row) (1 row)
INSERT INTO raw_events_second SELECT * FROM test_view; INSERT INTO raw_events_second SELECT * FROM test_view;
@ -1988,7 +1984,7 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
38 47
(1 row) (1 row)
-- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE -- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE

View File

@ -489,8 +489,14 @@ SELECT * FROM
SELECT user_id FROM user_buy_test_table) sub SELECT user_id FROM user_buy_test_table) sub
ORDER BY 1 DESC; ORDER BY 1 DESC;
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
ERROR: cannot pushdown the subquery user_id
DETAIL: There exist a reference table in the outer part of the outer join ---------------------------------------------------------------------
7
3
2
1
(4 rows)
SELECT * FROM SELECT * FROM
(SELECT user_id FROM users_ref_test_table ref JOIN user_buy_test_table dis (SELECT user_id FROM users_ref_test_table ref JOIN user_buy_test_table dis
on (ref.id = dis.user_id) on (ref.id = dis.user_id)

View File

@ -361,8 +361,7 @@ SELECT user_id, value_2 FROM users_table WHERE
HAVING sum(submit_card_info) > 0 HAVING sum(submit_card_info) > 0
) )
ORDER BY 1, 2; ORDER BY 1, 2;
ERROR: cannot pushdown the subquery ERROR: cannot perform a lateral outer join when a distributed subquery references a reference table
DETAIL: There exist a reference table in the outer part of the outer join
-- non-partition key equality with reference table -- non-partition key equality with reference table
SELECT SELECT
user_id, count(*) user_id, count(*)