diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 7dca6579f..5cae19497 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -105,6 +105,7 @@ static List * CreateSubqueryTargetListAndAdjustVars(List *columnList); static AttrNumber FindResnoForVarInTargetList(List *targetList, int varno, int varattno); static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, Relids relids); +static char * RecurringTypeDescription(RecurringTuplesType recurType); static DeferredErrorMessage * DeferredErrorIfUnsupportedLateralSubquery( PlannerInfo *plannerInfo, Relids recurringRelIds, Relids nonRecurringRelIds); static Var * PartitionColumnForPushedDownSubquery(Query *query); @@ -603,7 +604,6 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, return error; } - /* we shouldn't allow reference tables in the outer part of outer joins */ error = DeferredErrorIfUnsupportedRecurringTuplesJoin(plannerRestrictionContext); if (error) { @@ -643,7 +643,8 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, * sublinks into joins. * * In some cases, sublinks are pulled up and converted into outer joins. Those cases - * are already handled with DeferredErrorIfUnsupportedRecurringTuplesJoin(). + * are already handled with RecursivelyPlanRecurringTupleOuterJoinWalker() or thrown + * an error for in DeferredErrorIfUnsupportedRecurringTuplesJoin(). * * If the sublinks are not pulled up, we should still error out in if the expression * in the FROM clause would recur for every shard in a subquery on the WHERE clause. 
@@ -751,20 +752,11 @@ FromClauseRecurringTupleType(Query *queryTree) /* - * DeferredErrorIfUnsupportedRecurringTuplesJoin returns true if there exists a outer join - * between reference table and distributed tables which does not follow - * the rules : - * - Reference tables can not be located in the outer part of the semi join or the - * anti join. Otherwise, we may have duplicate results. Although getting duplicate - * results is not possible by checking the equality on the column of the reference - * table and partition column of distributed table, we still keep these checks. - * Because, using the reference table in the outer part of the semi join or anti - * join is not very common. - * - Reference tables can not be located in the outer part of the left join - * (Note that PostgreSQL converts right joins to left joins. While converting - * join types, innerrel and outerrel are also switched.) Otherwise we will - * definitely have duplicate rows. Beside, reference tables can not be used - * with full outer joins because of the same reason. + * DeferredErrorIfUnsupportedRecurringTuplesJoin returns a DeferredError if + * there exists a join between a recurring rel (such as reference tables + * and intermediate_results) and a non-recurring rel (such as distributed tables + * and subqueries that we can push-down to worker nodes) that can return an + * incorrect result set due to recurring tuples coming from the recurring rel. */ static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin( @@ -783,31 +775,48 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( Relids innerrelRelids = joinRestriction->innerrelRelids; Relids outerrelRelids = joinRestriction->outerrelRelids; - if (joinType == JOIN_SEMI || joinType == JOIN_ANTI || joinType == JOIN_LEFT) + /* + * This loop aims to determine whether this join is between a recurring + * rel and a non-recurring rel, and if so, whether it can yield an incorrect + * result set due to recurring tuples. 
+ * + * For outer joins, this can only happen if it's a lateral outer join + * where the inner distributed subquery references the recurring outer + * rel. This is because such outer joins should not appear here, as + * the recursive planner (RecursivelyPlanRecurringTupleOuterJoinWalker) + * should have already planned the non-recurring side if it wasn't a + * lateral join. For this reason, if the outer join is between a recurring + * rel --on the outer side-- and a non-recurring rel --on the other side--, + * we throw an error assuming that it's a lateral outer join. + * Also note that, in the context of outer joins, we only check left outer + * and full outer joins because PostgreSQL converts right joins to left + * joins before passing them through "set_join_pathlist_hook"s. + * + * For semi / anti joins, we anyway throw an error when the inner + * side is a distributed subquery that references a recurring outer rel + * (in the FROM clause) thanks to DeferErrorIfFromClauseRecurs. And when + * the inner side is a recurring rel and the outer side a non-recurring + * one, then the non-recurring side can't reference the recurring side + * anyway. + * + * For those reasons, here we perform the lateral join checks below only for + * outer (except anti) / inner joins but not for anti / semi joins. + */ + + if (joinType == JOIN_LEFT) { - /* - * If there are only recurring tuples on the inner side of a join then - * we can push it down, regardless of whether the outer side is - * recurring or not. Otherwise, we check the outer side for recurring - * tuples. - */ if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids)) { + /* inner side only contains recurring rels */ continue; } - - /* - * If the outer side of the join doesn't have any distributed tables - * (e.g., contains only recurring tuples), Citus should not pushdown - * the query. The reason is that recurring tuples on every shard would - * be added to the result, which is wrong.
- */ if (RelationInfoContainsOnlyRecurringTuples(plannerInfo, outerrelRelids)) { /* - * Find the first (or only) recurring RTE to give a meaningful - * error to the user. + * Inner side contains distributed rels but the outer side only + * contains recurring rels, must be an unsupported lateral outer + * join. */ recurType = FetchFirstRecurType(plannerInfo, outerrelRelids); @@ -816,11 +825,6 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( } else if (joinType == JOIN_FULL) { - /* - * If one of the outer or inner side contains recurring tuples and the other side - * contains nonrecurring tuples, then duplicate results can exist in the result. - * Thus, Citus should not pushdown the query. - */ bool innerContainOnlyRecurring = RelationInfoContainsOnlyRecurringTuples(plannerInfo, innerrelRelids); bool outerContainOnlyRecurring = @@ -829,8 +833,9 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( if (innerContainOnlyRecurring && !outerContainOnlyRecurring) { /* - * Find the first (or only) recurring RTE to give a meaningful - * error to the user. + * Outer side contains distributed rels but the inner side only + * contains recurring rels, must be an unsupported lateral outer + * join. */ recurType = FetchFirstRecurType(plannerInfo, innerrelRelids); @@ -840,8 +845,9 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( if (!innerContainOnlyRecurring && outerContainOnlyRecurring) { /* - * Find the first (or only) recurring RTE to give a meaningful - * error to the user. + * Inner side contains distributed rels but the outer side only + * contains recurring rels, must be an unsupported lateral outer + * join. */ recurType = FetchFirstRecurType(plannerInfo, outerrelRelids); @@ -856,7 +862,7 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( * See comment on DeferredErrorIfUnsupportedLateralSubquery for * details. * - * When planning inner joins postgres can move RTEs from left to + * When planning inner joins, postgres can move RTEs from left to * right and from right to left.
So we don't know on which side the * lateral join wil appear. Thus we try to find a side of the join * that only contains recurring tuples. And then we check the other @@ -893,41 +899,13 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( } } - if (recurType == RECURRING_TUPLES_REFERENCE_TABLE) + if (recurType != RECURRING_TUPLES_INVALID) { + char *errmsg = psprintf("cannot perform a lateral outer join when " + "a distributed subquery references %s", + RecurringTypeDescription(recurType)); return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot pushdown the subquery", - "There exist a reference table in the outer " - "part of the outer join", NULL); - } - else if (recurType == RECURRING_TUPLES_FUNCTION) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot pushdown the subquery", - "There exist a table function in the outer " - "part of the outer join", NULL); - } - else if (recurType == RECURRING_TUPLES_EMPTY_JOIN_TREE) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot pushdown the subquery", - "There exist a subquery without FROM in the outer " - "part of the outer join", NULL); - } - else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot pushdown the subquery", - "Complex subqueries, CTEs and local tables cannot be in " - "the outer part of an outer join with a distributed table", - NULL); - } - else if (recurType == RECURRING_TUPLES_VALUES) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot pushdown the subquery", - "There exist a VALUES clause in the outer " - "part of the outer join", NULL); + errmsg, NULL, NULL); } return NULL; diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index b74b59de3..f540ae7aa 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -372,9 +372,6 @@ 
RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context * side of the outer join. That way, inner rel gets converted into an intermediate * result and logical planner can handle the new query since it's of the from * " LEFT JOIN ". - * - * See DeferredErrorIfUnsupportedRecurringTuplesJoin for the supported join - * types. */ if (ShouldRecursivelyPlanOuterJoins(context)) { @@ -719,7 +716,7 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, /* * A LEFT JOIN is recurring if the lhs is recurring. - * Note that we should have converted the rhs into a recurring + * Note that we might have converted the rhs into a recurring * one too if the lhs is recurring, but this anyway has no * effects when deciding whether a LEFT JOIN is recurring. */ @@ -812,6 +809,11 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, * RecursivelyPlanRecurringTupleOuterJoinWalker that recursively plans given * distributed node that is known to be inner side of an outer join. * + * Fails to do so if the distributed join node references the recurring one. + * In that case, we don't throw an error here but instead we let + * DeferredErrorIfUnsupportedRecurringTuplesJoin do so for a better error + * message. + * * We call a node "distributed" if it points to a distributed table or a * more complex object (i.e., a join tree or a subquery) that can be pushed * down to the worker nodes directly. For a join, this means that it's either @@ -894,7 +896,24 @@ RecursivelyPlanDistributedJoinNode(Node *node, Query *query, "since it is part of a distributed join node " "that is outer joined with a recurring rel"))); - RecursivelyPlanSubquery(distributedRte->subquery, recursivePlanningContext); + bool recursivelyPlanned = RecursivelyPlanSubquery(distributedRte->subquery, + recursivePlanningContext); + if (!recursivelyPlanned) + { + /* + * RecursivelyPlanSubquery fails to plan a subquery only if it + * contains references to the outer query.
This means that we can't + * plan such outer joins (like <recurring LEFT OUTER distributed>) + * if it's a LATERAL join where the distributed side is a subquery that + * references the outer side, as in, + * + * SELECT * FROM reference + * LEFT JOIN LATERAL + * (SELECT * FROM distributed WHERE reference.b > distributed.b) q + * USING (a); + */ + Assert(ContainsReferencesToOuterQuery(distributedRte->subquery)); + } } else { diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index b1bf7a5e6..1dd326ccc 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1839,8 +1839,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second)); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join -- cannot push down since join is not equi join (f.id > f2.id) INSERT INTO raw_events_second (user_id) @@ -1869,8 +1867,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id > f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second)); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join -- we currently not support grouping sets INSERT INTO agg_events (user_id, @@ -1978,7 +1974,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) SELECT count(*) FROM raw_events_second; count --------------------------------------------------------------------- - 36 + 45 (1 row) INSERT INTO raw_events_second SELECT * FROM test_view; @@ -1988,7 +1984,7 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B SELECT count(*) FROM raw_events_second; count --------------------------------------------------------------------- - 38 + 47 (1 row) -- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE diff --git
a/src/test/regress/expected/multi_insert_select_0.out b/src/test/regress/expected/multi_insert_select_0.out index 2947f8be0..edf2f19ed 100644 --- a/src/test/regress/expected/multi_insert_select_0.out +++ b/src/test/regress/expected/multi_insert_select_0.out @@ -1839,8 +1839,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id = f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second)); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join -- cannot push down since join is not equi join (f.id > f2.id) INSERT INTO raw_events_second (user_id) @@ -1869,8 +1867,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, ON (f.id > f2.id) WHERE f.id IN (SELECT user_id FROM raw_events_second)); -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join -- we currently not support grouping sets INSERT INTO agg_events (user_id, @@ -1978,7 +1974,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) SELECT count(*) FROM raw_events_second; count --------------------------------------------------------------------- - 36 + 45 (1 row) INSERT INTO raw_events_second SELECT * FROM test_view; @@ -1988,7 +1984,7 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B SELECT count(*) FROM raw_events_second; count --------------------------------------------------------------------- - 38 + 47 (1 row) -- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE diff --git a/src/test/regress/expected/multi_subquery_complex_reference_clause.out b/src/test/regress/expected/multi_subquery_complex_reference_clause.out index 6fe7c1570..7db33834f 100644 --- a/src/test/regress/expected/multi_subquery_complex_reference_clause.out +++ b/src/test/regress/expected/multi_subquery_complex_reference_clause.out @@ -489,8 +489,14 @@ SELECT * FROM SELECT user_id FROM user_buy_test_table) sub ORDER 
BY 1 DESC; DEBUG: Router planner cannot handle multi-shard select queries -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join + user_id +--------------------------------------------------------------------- + 7 + 3 + 2 + 1 +(4 rows) + SELECT * FROM (SELECT user_id FROM users_ref_test_table ref JOIN user_buy_test_table dis on (ref.id = dis.user_id) diff --git a/src/test/regress/expected/multi_subquery_in_where_reference_clause.out b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out index d4ada83d3..52cbe3917 100644 --- a/src/test/regress/expected/multi_subquery_in_where_reference_clause.out +++ b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out @@ -361,8 +361,7 @@ SELECT user_id, value_2 FROM users_table WHERE HAVING sum(submit_card_info) > 0 ) ORDER BY 1, 2; -ERROR: cannot pushdown the subquery -DETAIL: There exist a reference table in the outer part of the outer join +ERROR: cannot perform a lateral outer join when a distributed subquery references a reference table -- non-partition key equality with reference table SELECT user_id, count(*)