From 222f11e23e892463b75166daba9a514c7c90eba5 Mon Sep 17 00:00:00 2001 From: aykutbozkurt Date: Mon, 16 Jan 2023 20:35:37 +0300 Subject: [PATCH] use planner restriction context * to fetch join and base restrictions properly, * to push down restrictions properly. --- .../distributed/planner/distributed_planner.c | 100 ++++-- .../planner/insert_select_planner.c | 68 +++- .../distributed/planner/multi_join_order.c | 213 ++++++++---- .../planner/multi_logical_optimizer.c | 90 ++--- .../planner/multi_logical_planner.c | 317 +++++++++++------- .../planner/query_pushdown_planning.c | 14 +- src/include/distributed/distributed_planner.h | 5 + .../distributed/insert_select_planner.h | 2 + src/include/distributed/multi_join_order.h | 15 +- .../distributed/multi_logical_planner.h | 21 +- src/test/regress/expected/ch_bench_having.out | 9 +- .../expected/chbenchmark_all_queries.out | 10 +- src/test/regress/expected/cross_join.out | 2 + .../expected/multi_join_order_additional.out | 4 +- .../expected/multi_repartition_join_ref.out | 10 +- .../expected/non_colocated_outer_joins.out | 92 ++++- .../expected/single_hash_repartition_join.out | 36 +- src/test/regress/sql/cross_join.sql | 2 + .../regress/sql/non_colocated_outer_joins.sql | 37 ++ 19 files changed, 720 insertions(+), 327 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 701ae4ff5..6cb6118b4 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -59,6 +59,7 @@ #include "optimizer/optimizer.h" #include "optimizer/plancat.h" #include "optimizer/pathnode.h" +#include "optimizer/paths.h" #include "optimizer/planner.h" #include "optimizer/planmain.h" #include "utils/builtins.h" @@ -126,7 +127,8 @@ static void PopPlannerRestrictionContext(void); static void ResetPlannerRestrictionContext( PlannerRestrictionContext *plannerRestrictionContext); static PlannedStmt * 
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, - Node *distributionKeyValue); + Node *distributionKeyValue, int + fastPathRelId); static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, int rteIdCounter); static RTEListProperties * GetRTEListProperties(List *rangeTableList); @@ -144,6 +146,7 @@ distributed_planner(Query *parse, bool needsDistributedPlanning = false; bool fastPathRouterQuery = false; Node *distributionKeyValue = NULL; + int fastPathRelId = InvalidOid; List *rangeTableList = ExtractRangeTableEntryList(parse); @@ -247,7 +250,11 @@ distributed_planner(Query *parse, { if (fastPathRouterQuery) { - result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue); + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(parse->rtable); + fastPathRelId = rangeTableEntry->relid; + + result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue, + fastPathRelId); } else { @@ -686,7 +693,7 @@ IsUpdateOrDelete(Query *query) */ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, - Node *distributionKeyValue) + Node *distributionKeyValue, int fastPathRelId) { FastPathRestrictionContext *fastPathContext = planContext->plannerRestrictionContext->fastPathRestrictionContext; @@ -694,6 +701,9 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, planContext->plannerRestrictionContext->fastPathRestrictionContext-> fastPathRouterQuery = true; + planContext->plannerRestrictionContext->fastPathRestrictionContext-> + distRelId = fastPathRelId; + if (distributionKeyValue == NULL) { /* nothing to record */ @@ -710,6 +720,8 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, planContext->boundParams); + RelabelPlannerRestrictionContext(planContext->plannerRestrictionContext); + return CreateDistributedPlannedStmt(planContext); } @@ -955,6 +967,42 @@ 
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, } +/* + * ReplanAfterQueryModification replans a copy of the modified original query so that the + * planner restriction context is updated. Returns the replanned copy. + */ +Query * +ReplanAfterQueryModification(Query *originalQuery, ParamListInfo boundParams) +{ + Query *newQuery = copyObject(originalQuery); + bool setPartitionedTablesInherited = false; + PlannerRestrictionContext *currentPlannerRestrictionContext = + CurrentPlannerRestrictionContext(); + + /* reset the current planner restrictions context */ + ResetPlannerRestrictionContext(currentPlannerRestrictionContext); + + /* + * We force standard_planner to treat partitioned tables as regular tables + * by clearing the inh flag on RTEs. We already did this at the start of + * distributed_planner, but on a copy of the original query, so we need + * to do it again here. + */ + AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery), + setPartitionedTablesInherited); + + /* + * Some relations may have been removed from the query, but we can skip + * AssignRTEIdentities since we currently do not rely on RTE identities + * being contiguous. + */ + + standard_planner(newQuery, NULL, 0, boundParams); + + return newQuery; +} + + /* * CreateDistributedPlan generates a distributed plan for a query. * It goes through 3 steps: @@ -1119,30 +1167,7 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina "joined on their distribution columns"))); } - Query *newQuery = copyObject(originalQuery); - bool setPartitionedTablesInherited = false; - PlannerRestrictionContext *currentPlannerRestrictionContext = - CurrentPlannerRestrictionContext(); - - /* reset the current planner restrictions context */ - ResetPlannerRestrictionContext(currentPlannerRestrictionContext); - - /* - * We force standard_planner to treat partitioned tables as regular tables - * by clearing the inh flag on RTEs. 
We already did this at the start of - * distributed_planner, but on a copy of the original query, so we need - * to do it again here. - */ - AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery), - setPartitionedTablesInherited); - - /* - * Some relations may have been removed from the query, but we can skip - * AssignRTEIdentities since we currently do not rely on RTE identities - * being contiguous. - */ - - standard_planner(newQuery, NULL, 0, boundParams); + Query *newQuery = ReplanAfterQueryModification(originalQuery, boundParams); /* overwrite the old transformed query with the new transformed query */ *query = *newQuery; @@ -1874,6 +1899,27 @@ multi_join_restriction_hook(PlannerInfo *root, joinRestriction->innerrelRelids = bms_copy(innerrel->relids); joinRestriction->outerrelRelids = bms_copy(outerrel->relids); + Relids joinrelids = bms_union(innerrel->relids, outerrel->relids); + List *prevVals = NIL; + EquivalenceClass *eqclass = NULL; + foreach_ptr(eqclass, root->eq_classes) + { + prevVals = lappend_int(prevVals, eqclass->ec_has_const); + eqclass->ec_has_const = false; + } + joinRestrictionContext->generatedEcJoinRestrictInfoList = + generate_join_implied_equalities( + root, + joinrelids, + outerrel->relids, + innerrel); + int i; + for (i = 0; i < list_length(prevVals); i++) + { + EquivalenceClass *eqClass = list_nth(root->eq_classes, i); + eqClass->ec_has_const = list_nth_int(prevVals, i); + } + joinRestrictionContext->joinRestrictionList = lappend(joinRestrictionContext->joinRestrictionList, joinRestriction); diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 21fd13800..e7c5f3508 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -83,7 +83,9 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, Oid * selectPartitionColumnTableId); static 
DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, - ParamListInfo boundParams); + ParamListInfo boundParams, + PlannerRestrictionContext * + plannerRestrictionContext); static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery); static Query * WrapSubquery(Query *subquery); static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList); @@ -253,7 +255,8 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery, * repartitioning. */ distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery, - boundParams); + boundParams, + plannerRestrictionContext); } return distributedPlan; @@ -360,6 +363,62 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, } +/* + * RelabelPlannerRestrictionContext relabels var nos inside restriction infos to 1. + */ +void +RelabelPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionContext) +{ + List *relationRestrictionList = + plannerRestrictionContext->relationRestrictionContext->relationRestrictionList; + + if (plannerRestrictionContext->fastPathRestrictionContext && + plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue) + { + Const *distKeyVal = + plannerRestrictionContext->fastPathRestrictionContext->distributionKeyValue; + Var *partitionColumn = PartitionColumn( + plannerRestrictionContext->fastPathRestrictionContext->distRelId, 1); + OpExpr *partitionExpression = MakeOpExpression(partitionColumn, + BTEqualStrategyNumber); + Node *rightOp = get_rightop((Expr *) partitionExpression); + Const *rightConst = (Const *) rightOp; + *rightConst = *distKeyVal; + + RestrictInfo *fastpathRestrictInfo = makeNode(RestrictInfo); + fastpathRestrictInfo->can_join = false; + fastpathRestrictInfo->is_pushed_down = true; + fastpathRestrictInfo->clause = (Expr *) partitionExpression; + + RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction)); + relationRestriction->relOptInfo 
= palloc0(sizeof(RelOptInfo)); + relationRestriction->relOptInfo->baserestrictinfo = list_make1( + fastpathRestrictInfo); + plannerRestrictionContext->relationRestrictionContext->relationRestrictionList = + list_make1(relationRestriction); + return; + } + + RelationRestriction *relationRestriction = NULL; + foreach_ptr(relationRestriction, relationRestrictionList) + { + RelOptInfo *relOptInfo = relationRestriction->relOptInfo; + List *baseRestrictInfoList = relOptInfo->baserestrictinfo; + RestrictInfo *baseRestrictInfo = NULL; + foreach_ptr(baseRestrictInfo, baseRestrictInfoList) + { + List *varList = pull_var_clause_default((Node *) baseRestrictInfo->clause); + Var *var = NULL; + foreach_ptr(var, varList) + { + var->varno = 1; + var->varnosyn = 1; + } + } + } +} + + /* * CreateInsertSelectIntoLocalTablePlan creates the plan for INSERT .. SELECT queries * where the selected table is distributed and the inserted table is not. @@ -384,6 +443,8 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery, /* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */ Query *selectQuery = selectRte->subquery; + RelabelPlannerRestrictionContext(plannerRestrictionContext); + bool allowRecursivePlanning = true; DistributedPlan *distPlan = CreateDistributedPlan(planId, allowRecursivePlanning, selectQuery, @@ -1476,7 +1537,8 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, * distributed table. The query plan can also be executed on a worker in MX. 
*/ static DistributedPlan * -CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams) +CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams, + PlannerRestrictionContext *plannerRestrictionContext) { Query *insertSelectQuery = copyObject(parse); diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 5282b250c..ac86014b0 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -25,19 +25,22 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" #include "distributed/pg_dist_partition.h" +#include "distributed/shard_pruning.h" #include "distributed/worker_protocol.h" #include "lib/stringinfo.h" #include "optimizer/optimizer.h" #include "utils/builtins.h" #include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/pathnodes.h" #include "utils/builtins.h" #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" - /* Config variables managed via guc.c */ bool LogMultiJoinOrder = false; /* print join order as a debugging aid */ bool EnableSingleHashRepartitioning = false; @@ -55,7 +58,8 @@ static RuleEvalFunction RuleEvalFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join r /* Local functions forward declarations */ static bool JoinExprListWalker(Node *node, List **joinList); static List * JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, - List *joinClauseList); + List *joinRestrictInfoListList, + List *generatedEcJoinClauseList); static List * BestJoinOrder(List *candidateJoinOrders); static List * FewestOfJoinRuleType(List *candidateJoinOrders, JoinRuleType ruleType); static uint32 JoinRuleTypeCount(List *joinOrder, JoinRuleType ruleTypeToCount); @@ -65,27 +69,39 @@ static 
uint32 LargeDataTransferLocation(List *joinOrder); static List * TableEntryListDifference(List *lhsTableList, List *rhsTableList); static bool ConvertSemiToInnerInJoinInfoContext(JoinInfoContext *joinOrderContext); static bool JoinInfoContextHasAntiJoin(JoinInfoContext *joinOrderContext); +static List * FindJoinClauseForTables(List *joinRestrictInfoListList, + List *generatedEcJoinClauseList, + List *lhsTableIdList, + uint32 rhsTableId, + JoinType joinType); static const char * JoinTypeName(JoinType jointype); +static List * ExtractPushdownJoinRestrictInfos(List *joinRestrictInfoList, + RestrictInfo *joinRestrictInfo, JoinType + joinType); /* Local functions forward declarations for join evaluations */ static JoinOrderNode * EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode, TableEntry *candidateTable, - List *joinClauseList, - JoinType joinType, - bool passJoinClauseDirectly); + List *joinRestrictInfoListList, + List *generatedEcJoinClauseList, + JoinType joinType); static List * RangeTableIdList(List *tableList); static TableEntry * TableEntryByRangeTableId(List *tableEntryList, uint32 rangeTableIdx); static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType); static char * JoinRuleName(JoinRuleType ruleType); -static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, - List *applicableJoinClauses, JoinType joinType); +static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, + TableEntry *candidateTable, + List *applicableJoinClauses, + JoinType joinType); static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType); -static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, - List *applicableJoinClauses, JoinType joinType); +static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, + TableEntry *candidateTable, + List *applicableJoinClauses, + JoinType joinType); static bool 
JoinOnColumns(List *currentPartitionColumnList, Var *candidatePartitionColumn, List *joinClauseList); static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *joinNode, @@ -290,7 +306,8 @@ NodeIsEqualsOpExpr(Node *node) * least amount of data across the network, and returns this join order. */ List * -JoinOrderList(List *tableEntryList, List *joinClauseList) +JoinOrderList(List *tableEntryList, List *joinRestrictInfoListList, + List *generatedEcJoinClauseList, List *pseudoClauseList) { List *candidateJoinOrderList = NIL; ListCell *tableEntryCell = NULL; @@ -301,7 +318,8 @@ JoinOrderList(List *tableEntryList, List *joinClauseList) /* each candidate join order starts with a different table */ List *candidateJoinOrder = JoinOrderForTable(startingTable, tableEntryList, - joinClauseList); + joinRestrictInfoListList, + generatedEcJoinClauseList); if (candidateJoinOrder != NULL) { @@ -349,12 +367,90 @@ TableEntryByRangeTableId(List *tableEntryList, uint32 rangeTableIdx) } +/* + * ExtractPushdownJoinRestrictInfos extracts clauses, which are pushed down and treated like a normal filter, + * inside given join restriction infos. + */ +static List * +ExtractPushdownJoinRestrictInfos(List *restrictInfoListOfJoin, + RestrictInfo *joinRestrictInfo, JoinType joinType) +{ + List *joinFilterRestrictInfoList = NIL; + + /* left and right relids of the join restriction */ + Bitmapset *joinRelids = bms_union(joinRestrictInfo->left_relids, + joinRestrictInfo->right_relids); + + RestrictInfo *restrictInfo = NULL; + foreach_ptr(restrictInfo, restrictInfoListOfJoin) + { + if (!restrictInfo->can_join && + (!IS_OUTER_JOIN(joinType) || RINFO_IS_PUSHED_DOWN(restrictInfo, joinRelids))) + { + joinFilterRestrictInfoList = lappend(joinFilterRestrictInfoList, + restrictInfo); + } + } + + return joinFilterRestrictInfoList; +} + + +/* + * FindJoinClauseForTables finds join clause for given left hand side tables and + * right hand side table. 
+ */ +static List * +FindJoinClauseForTables(List *joinRestrictInfoListList, List *generatedEcJoinClauseList, + List *lhsTableIdList, uint32 rhsTableId, JoinType joinType) +{ + List *joinRestrictInfoList = NIL; + foreach_ptr(joinRestrictInfoList, joinRestrictInfoListList) + { + RestrictInfo *joinRestrictInfo = NULL; + foreach_ptr(joinRestrictInfo, joinRestrictInfoList) + { + Node *restrictClause = (Node *) joinRestrictInfo->clause; + if (joinRestrictInfo->can_join && IsApplicableJoinClause(lhsTableIdList, + rhsTableId, + restrictClause)) + { + List *pushdownFakeRestrictInfoList = ExtractPushdownJoinRestrictInfos( + joinRestrictInfoList, joinRestrictInfo, joinType); + List *nonPushdownRestrictInfoList = list_difference(joinRestrictInfoList, + pushdownFakeRestrictInfoList); + List *nonPushdownRestrictClauseList = + ExtractRestrictClausesFromRestrictionInfoList( + nonPushdownRestrictInfoList); + return nonPushdownRestrictClauseList; + } + } + } + + if (joinType == JOIN_INNER) + { + Node *ecClause = NULL; + foreach_ptr(ecClause, generatedEcJoinClauseList) + { + if (IsApplicableJoinClause(lhsTableIdList, rhsTableId, ecClause)) + { + return list_make1(ecClause); + } + } + } + + return NIL; +} + + /* * FixedJoinOrderList returns the best fixed join order according to * applicable join rules for the nodes in the list. 
*/ List * -FixedJoinOrderList(List *tableEntryList, JoinInfoContext *joinInfoContext) +FixedJoinOrderList(List *tableEntryList, JoinInfoContext *joinInfoContext, + List *joinRestrictInfoListList, List *generatedEcJoinClauseList, + List *pseudoClauseList) { /* we donot support anti joins as ruleutils files cannot deparse JOIN_ANTI */ if (JoinInfoContextHasAntiJoin(joinInfoContext)) @@ -404,13 +500,12 @@ FixedJoinOrderList(List *tableEntryList, JoinInfoContext *joinInfoContext) TableEntry *nextTable = TableEntryByRangeTableId(tableEntryList, joinInfo->rightTableIdx); - bool passJoinClauseDirectly = true; nextJoinNode = EvaluateJoinRules(joinedTableList, currentJoinNode, nextTable, - joinInfo->joinQualifierList, - joinInfo->joinType, - passJoinClauseDirectly); + joinRestrictInfoListList, + generatedEcJoinClauseList, + joinInfo->joinType); if (nextJoinNode == NULL) { @@ -490,7 +585,8 @@ ConvertSemiToInnerInJoinInfoContext(JoinInfoContext *joinOrderContext) * returns this list. */ static List * -JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClauseList) +JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, + List *joinRestrictInfoListList, List *generatedEcJoinClauseList) { JoinRuleType firstJoinRule = JOIN_RULE_INVALID_FIRST; int joinedTableCount = 1; @@ -533,13 +629,12 @@ JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClause JoinType joinType = JOIN_INNER; /* evaluate all join rules for this pending table */ - bool passJoinClauseDirectly = false; JoinOrderNode *pendingJoinNode = EvaluateJoinRules(joinedTableList, currentJoinNode, pendingTable, - joinClauseList, - joinType, - passJoinClauseDirectly); + joinRestrictInfoListList, + generatedEcJoinClauseList, + joinType); if (pendingJoinNode == NULL) { @@ -872,40 +967,27 @@ TableEntryListDifference(List *lhsTableList, List *rhsTableList) * next table, evaluates different join rules between the two tables, and finds * the best join rule that applies. 
The function returns the applicable join * order node which includes the join rule and the partition information. - * - * When we have only inner joins, we can commute the joins as we wish and it also - * does not matter if we merge or move join and where clauses. For query trees with - * only inner joins, `joinClauseList` contains join and where clauses combined so that - * we can push down some where clauses which are applicable as join clause, which is - * determined by `ApplicableJoinClauses`. - * When we have at least 1 outer join in a query tree, we cannot commute joins(that is - * why we have `FixedJoinOrderList`) or move join and where clauses as we wish because - * we would have incorrect results. We should pass join and where clauses separately while - * creating tasks. `joinClauseList` contains only join clauses when `passJoinClauseDirectly` - * is set true. */ static JoinOrderNode * EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode, - TableEntry *candidateTable, List *joinClauseList, - JoinType joinType, bool passJoinClauseDirectly) + TableEntry *candidateTable, List *joinRestrictInfoListList, + List *generatedEcJoinClauseList, JoinType joinType) { JoinOrderNode *nextJoinNode = NULL; uint32 lowestValidIndex = JOIN_RULE_INVALID_FIRST + 1; uint32 highestValidIndex = JOIN_RULE_LAST - 1; - List *joinClauses = joinClauseList; - if (!passJoinClauseDirectly) - { - /* - * We first find all applicable join clauses between already joined tables - * and the candidate table. - */ - List *joinedTableIdList = RangeTableIdList(joinedTableList); - uint32 candidateTableId = candidateTable->rangeTableId; - joinClauses = ApplicableJoinClauses(joinedTableIdList, - candidateTableId, - joinClauseList); - } + /* + * We first find all applicable join clauses between already joined tables + * and the candidate table. 
+ */ + List *joinedTableIdList = RangeTableIdList(joinedTableList); + uint32 candidateTableId = candidateTable->rangeTableId; + List *applicableJoinClauseList = FindJoinClauseForTables(joinRestrictInfoListList, + generatedEcJoinClauseList, + joinedTableIdList, + candidateTableId, + joinType); /* we then evaluate all join rules in order */ for (uint32 ruleIndex = lowestValidIndex; ruleIndex <= highestValidIndex; ruleIndex++) @@ -915,7 +997,7 @@ EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode, nextJoinNode = (*ruleEvalFunction)(currentJoinNode, candidateTable, - joinClauses, + applicableJoinClauseList, joinType); /* break after finding the first join rule that applies */ @@ -932,7 +1014,7 @@ EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode, Assert(nextJoinNode != NULL); nextJoinNode->joinType = joinType; - nextJoinNode->joinClauseList = joinClauses; + nextJoinNode->joinClauseList = applicableJoinClauseList; return nextJoinNode; } @@ -1500,27 +1582,30 @@ IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, Node *joinCla /* - * ApplicableJoinClauses finds all join clauses that apply between the given - * left table list and the right table, and returns these found join clauses. + * IsApplicableFalseConstantJoinClause returns true if the given constant false filter + * applies to the right table and also to at least one of the tables in the left table list. 
*/ -List * -ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, List *joinClauseList) +bool +IsApplicableFalseConstantJoinClause(List *leftTableIdList, uint32 rightTableId, + RestrictInfo *restrictInfo) { - List *applicableJoinClauses = NIL; + /* find whether restrictinfo relids contain right table relid */ + Relids restrictionRelIds = restrictInfo->required_relids; + bool hasRightTable = bms_is_member(rightTableId, restrictionRelIds); - /* make sure joinClauseList contains only join clauses */ - joinClauseList = JoinClauseList(joinClauseList); - - Node *joinClause = NULL; - foreach_ptr(joinClause, joinClauseList) + /* convert left table id list to bitmapset */ + Relids leftTableRelIds = NULL; + int leftTableId = -1; + foreach_int(leftTableId, leftTableIdList) { - if (IsApplicableJoinClause(leftTableIdList, rightTableId, joinClause)) - { - applicableJoinClauses = lappend(applicableJoinClauses, joinClause); - } + leftTableRelIds = bms_add_member(leftTableRelIds, leftTableId); } - return applicableJoinClauses; + /* find whether restrictinfo relids contain any of the left table relids */ + Relids intersectLeftRelids = bms_intersect(restrictionRelIds, leftTableRelIds); + bool hasOneOfLeftTables = bms_num_members(intersectLeftRelids) > 0; + + return hasRightTable && hasOneOfLeftTables; } diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 19b4aea4d..e787cc463 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -169,9 +169,8 @@ typedef struct OrderByLimitReference /* Local functions forward declarations */ -static MultiSelect * AndSelectNode(MultiSelect *selectNode); -static MultiSelect * OrSelectNode(MultiSelect *selectNode); -static List * OrSelectClauseList(List *selectClauseList); +static MultiSelect * PushdownableSelectNode(MultiSelect *selectNode); +static MultiSelect * 
NonPushdownableSelectNode(MultiSelect *selectNode); static void PushDownNodeLoop(MultiUnaryNode *currentNode); static void PullUpCollectLoop(MultiCollect *collectNode); static void AddressProjectSpecialConditions(MultiProject *projectNode); @@ -381,27 +380,29 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) if (selectNodeList != NIL) { MultiSelect *selectNode = (MultiSelect *) linitial(selectNodeList); - MultiSelect *andSelectNode = AndSelectNode(selectNode); - MultiSelect *orSelectNode = OrSelectNode(selectNode); + MultiSelect *pushdownableSelectNode = PushdownableSelectNode(selectNode); + MultiSelect *nonPushdownableSelectNode = NonPushdownableSelectNode(selectNode); - if (andSelectNode != NULL && orSelectNode != NULL) + if (pushdownableSelectNode != NULL && nonPushdownableSelectNode != NULL) { MultiNode *parentNode = ParentNode((MultiNode *) selectNode); MultiNode *childNode = ChildNode((MultiUnaryNode *) selectNode); Assert(UnaryOperator(parentNode)); - SetChild((MultiUnaryNode *) parentNode, (MultiNode *) orSelectNode); - SetChild((MultiUnaryNode *) orSelectNode, (MultiNode *) andSelectNode); - SetChild((MultiUnaryNode *) andSelectNode, (MultiNode *) childNode); + SetChild((MultiUnaryNode *) parentNode, + (MultiNode *) nonPushdownableSelectNode); + SetChild((MultiUnaryNode *) nonPushdownableSelectNode, + (MultiNode *) pushdownableSelectNode); + SetChild((MultiUnaryNode *) pushdownableSelectNode, (MultiNode *) childNode); } - else if (andSelectNode != NULL && orSelectNode == NULL) + else if (pushdownableSelectNode != NULL && nonPushdownableSelectNode == NULL) { - andSelectNode = selectNode; /* no need to modify the tree */ + pushdownableSelectNode = selectNode; /* no need to modify the tree */ } - if (andSelectNode != NULL) + if (pushdownableSelectNode != NULL) { - PushDownNodeLoop((MultiUnaryNode *) andSelectNode); + PushDownNodeLoop((MultiUnaryNode *) pushdownableSelectNode); } } @@ -486,71 +487,44 @@ MultiLogicalPlanOptimize(MultiTreeRoot 
*multiLogicalPlan) /* - * AndSelectNode looks for AND clauses in the given select node. If they exist, - * the function returns these clauses in a new node. Otherwise, the function + * PushdownableSelectNode looks for pushdownable clauses in the given select node. + * If they exist, the function returns these clauses in a new node. Otherwise, the function * returns null. */ static MultiSelect * -AndSelectNode(MultiSelect *selectNode) +PushdownableSelectNode(MultiSelect *selectNode) { - MultiSelect *andSelectNode = NULL; - List *selectClauseList = selectNode->selectClauseList; - List *orSelectClauseList = OrSelectClauseList(selectClauseList); + MultiSelect *pushdownableSelectNode = NULL; - /* AND clauses are select clauses that are not OR clauses */ - List *andSelectClauseList = list_difference(selectClauseList, orSelectClauseList); - if (andSelectClauseList != NIL) + if (selectNode->pushdownableSelectClauseList != NIL) { - andSelectNode = CitusMakeNode(MultiSelect); - andSelectNode->selectClauseList = andSelectClauseList; + pushdownableSelectNode = CitusMakeNode(MultiSelect); + pushdownableSelectNode->selectClauseList = + selectNode->pushdownableSelectClauseList; } - return andSelectNode; + return pushdownableSelectNode; } /* - * OrSelectNode looks for OR clauses in the given select node. If they exist, - * the function returns these clauses in a new node. Otherwise, the function + * NonPushdownableSelectNode looks for non-pushdownable clauses in the given select node. + * If they exist, the function returns these clauses in a new node. Otherwise, the function * returns null. 
*/ static MultiSelect * -OrSelectNode(MultiSelect *selectNode) +NonPushdownableSelectNode(MultiSelect *selectNode) { - MultiSelect *orSelectNode = NULL; - List *selectClauseList = selectNode->selectClauseList; - List *orSelectClauseList = OrSelectClauseList(selectClauseList); + MultiSelect *nonPushdownableSelectNode = NULL; - if (orSelectClauseList != NIL) + if (selectNode->nonPushdownableSelectClauseList != NIL) { - orSelectNode = CitusMakeNode(MultiSelect); - orSelectNode->selectClauseList = orSelectClauseList; + nonPushdownableSelectNode = CitusMakeNode(MultiSelect); + nonPushdownableSelectNode->selectClauseList = + selectNode->nonPushdownableSelectClauseList; } - return orSelectNode; -} - - -/* - * OrSelectClauseList walks over the select clause list, and returns all clauses - * that have OR expressions in them. - */ -static List * -OrSelectClauseList(List *selectClauseList) -{ - List *orSelectClauseList = NIL; - - Node *selectClause = NULL; - foreach_ptr(selectClause, selectClauseList) - { - bool orClause = is_orclause(selectClause); - if (orClause) - { - orSelectClauseList = lappend(orSelectClauseList, selectClause); - } - } - - return orSelectClauseList; + return nonPushdownableSelectNode; } diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 73df7435b..bb926440c 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -33,7 +33,6 @@ #include "distributed/relation_restriction_equivalence.h" #include "distributed/query_pushdown_planning.h" #include "distributed/query_utils.h" -#include "distributed/multi_router_planner.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "nodes/makefuncs.h" @@ -41,6 +40,7 @@ #include "nodes/pathnodes.h" #include "optimizer/optimizer.h" #include "optimizer/clauses.h" +#include "optimizer/paths.h" #include "optimizer/prep.h" #include 
"optimizer/tlist.h" #include "parser/parsetree.h" @@ -85,20 +85,20 @@ static bool ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext); static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList); static List * AddMultiCollectNodes(List *tableNodeList); -static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList, - bool passJoinClauseDirectly); +static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList); static MultiCollect * CollectNodeForTable(List *collectTableList, uint32 rangeTableId); -static MultiSelect * MultiSelectNode(List *whereClauseList, bool passWhereClauseDirectly); +static MultiSelect * MultiSelectNode(List *pushdownableClauseList, + List *nonPushdownableClauseList); static bool IsSelectClause(Node *clause); static JoinInfoContext * FetchJoinOrderContext(FromExpr *fromExpr); static bool JoinInfoWalker(Node *node, JoinInfoContext *joinInfoContext); +static List * ExtractNonPushdownableJoinClauses(List *joinOrderList); /* Local functions forward declarations for applying joins */ static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType, List *partitionColumnList, - JoinType joinType, List *joinClauseList, - bool passJoinClauseDirectly); + JoinType joinType, List *joinClauseList); static RuleApplyFunction JoinRuleApplyFunction(JoinRuleType ruleType); static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, List *partitionColumnList, JoinType joinType, @@ -157,7 +157,7 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, } else { - multiQueryNode = MultiNodeTree(queryTree); + multiQueryNode = MultiNodeTree(queryTree, plannerRestrictionContext); } /* add a root node to serve as the permanent handle to the tree */ @@ -566,18 +566,16 @@ SubqueryEntryList(Query *queryTree) * group, and limit nodes if they appear in the original query tree. 
*/ MultiNode * -MultiNodeTree(Query *queryTree) +MultiNodeTree(Query *queryTree, PlannerRestrictionContext *plannerRestrictionContext) { List *rangeTableList = queryTree->rtable; List *targetEntryList = queryTree->targetList; - List *joinClauseList = NIL; List *joinOrderList = NIL; List *tableEntryList = NIL; List *tableNodeList = NIL; List *collectTableList = NIL; MultiNode *joinTreeNode = NULL; MultiNode *currentTopNode = NULL; - bool passQualClauseDirectly = false; /* verify we can perform distributed planning on this query */ DeferredErrorMessage *unsupportedQueryError = DeferErrorIfQueryNotSupported( @@ -587,17 +585,35 @@ MultiNodeTree(Query *queryTree) RaiseDeferredError(unsupportedQueryError, ERROR); } - /* extract where and join clause qualifiers(including outer join quals) and verify we can plan for them. */ - List *qualClauseList = QualifierList(queryTree->jointree); + /* extract join and nonjoin clauses from plannerRestrictionContext */ + RestrictInfoContext *restrictInfoContext = ExtractRestrictionInfosFromPlannerContext( + plannerRestrictionContext); + List *joinRestrictInfoList = restrictInfoContext->joinRestrictInfoList; + List *nonJoinRestrictionInfoList = restrictInfoContext->baseRestrictInfoList; + List *joinRestrictInfoListList = restrictInfoContext->joinRestrictInfoListList; + List *pseudoRestrictInfoList = restrictInfoContext->pseudoRestrictInfoList; + List *generatedEcJoinRestrictInfoList = + plannerRestrictionContext->joinRestrictionContext->generatedEcJoinRestrictInfoList; + + /* verify we can plan for restriction clauses */ + List *baseClauseList = ExtractRestrictClausesFromRestrictionInfoList( + nonJoinRestrictionInfoList); + List *allJoinClauseList = ExtractRestrictClausesFromRestrictionInfoList( + joinRestrictInfoList); + List *pseudoClauseList = ExtractRestrictClausesFromRestrictionInfoList( + pseudoRestrictInfoList); + List *generatedEcJoinClauseList = ExtractRestrictClausesFromRestrictionInfoList( + generatedEcJoinRestrictInfoList); + 
allJoinClauseList = list_concat_unique(allJoinClauseList, generatedEcJoinClauseList); + + List *qualClauseList = list_concat_copy(baseClauseList, allJoinClauseList); + unsupportedQueryError = DeferErrorIfUnsupportedClause(qualClauseList); if (unsupportedQueryError) { RaiseDeferredErrorInternal(unsupportedQueryError, ERROR); } - /* WhereClauseList() merges join qualifiers and base qualifiers into result list */ - List *whereClauseList = WhereClauseList(queryTree->jointree); - /* * If we have a subquery, build a multi table node for the subquery and * add a collect node on top of the multi table node. @@ -634,7 +650,7 @@ MultiNodeTree(Query *queryTree) */ Assert(list_length(subqueryEntryList) == 1); - List *whereClauseColumnList = pull_var_clause_default((Node *) whereClauseList); + List *whereClauseColumnList = pull_var_clause_default((Node *) baseClauseList); List *targetListColumnList = pull_var_clause_default((Node *) targetEntryList); List *columnList = list_concat(whereClauseColumnList, targetListColumnList); @@ -645,7 +661,8 @@ MultiNodeTree(Query *queryTree) } /* recursively create child nested multitree */ - MultiNode *subqueryExtendedNode = MultiNodeTree(subqueryTree); + MultiNode *subqueryExtendedNode = MultiNodeTree(subqueryTree, + plannerRestrictionContext); SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode); SetChild((MultiUnaryNode *) subqueryNode, subqueryExtendedNode); @@ -686,35 +703,41 @@ MultiNodeTree(Query *queryTree) /* extract join infos for left recursive join tree */ JoinInfoContext *joinInfoContext = FetchJoinOrderContext(queryTree->jointree); - /* where clause should only contain base qualifiers */ - whereClauseList = joinInfoContext->baseQualifierList; - /* we simply donot commute joins as we have at least 1 outer join */ - joinOrderList = FixedJoinOrderList(tableEntryList, joinInfoContext); - - /* pass join clauses directly as they are while creating tasks */ - passQualClauseDirectly = true; + joinOrderList = 
FixedJoinOrderList(tableEntryList, joinInfoContext, + joinRestrictInfoListList, + generatedEcJoinClauseList, + pseudoClauseList); } else { - /* consider also base qualifications */ - joinClauseList = JoinClauseList(whereClauseList); - /* find best join order for commutative inner joins */ - joinOrderList = JoinOrderList(tableEntryList, joinClauseList); + joinOrderList = JoinOrderList(tableEntryList, joinRestrictInfoListList, + generatedEcJoinClauseList, pseudoClauseList); } /* build join tree using the join order and collected tables */ - joinTreeNode = MultiJoinTree(joinOrderList, collectTableList, - passQualClauseDirectly); + joinTreeNode = MultiJoinTree(joinOrderList, collectTableList); currentTopNode = joinTreeNode; } Assert(currentTopNode != NULL); - /* build select node if the query has selection criteria */ - MultiSelect *selectNode = MultiSelectNode(whereClauseList, passQualClauseDirectly); + + /* + * build select node if the query has selection criteria + * select node will have pushdownable and non-pushdownable parts. + * - all base clauses can be pushdownable + * - some of join clauses cannot be pushed down e.g. they can be only applied after join + */ + List *pushdownableSelectClauseList = baseClauseList; + List *nonpushdownableJoinClauseList = ExtractNonPushdownableJoinClauses( + joinOrderList); + List *nonPushdownableSelectClauseList = list_difference(allJoinClauseList, + nonpushdownableJoinClauseList); + MultiSelect *selectNode = MultiSelectNode(pushdownableSelectClauseList, + nonPushdownableSelectClauseList); if (selectNode != NULL) { SetChild((MultiUnaryNode *) selectNode, currentTopNode); @@ -740,6 +763,128 @@ MultiNodeTree(Query *queryTree) } +/* + * ExtractNonPushdownableJoinClauses collects the join clauses attached to every + * node of the given join order list.
+ */ +static List * +ExtractNonPushdownableJoinClauses(List *joinOrderList) +{ + List *nonPushdownJoinClauseList = NIL; + + JoinOrderNode *joinOrderNode = NULL; + foreach_ptr(joinOrderNode, joinOrderList) + { + List *joinClauselist = joinOrderNode->joinClauseList; + nonPushdownJoinClauseList = list_concat(nonPushdownJoinClauseList, + joinClauselist); + } + + return nonPushdownJoinClauseList; +} + + +/* + * ExtractRestrictionInfosFromPlannerContext collects all planner RestrictInfos. + */ +RestrictInfoContext * +ExtractRestrictionInfosFromPlannerContext( + PlannerRestrictionContext *plannerRestrictionContext) +{ + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + JoinRestrictionContext *joinRestrictionContext = + plannerRestrictionContext->joinRestrictionContext; + List *baseRestrictInfoList = NIL; + List *joinRestrictInfoList = NIL; + List *joinRestrictInfoListList = NIL; + List *pseudoRestrictInfoList = NIL; + + /* collect all restrictInfos from relationRestrictionContext */ + RelationRestriction *relationRestriction = NULL; + foreach_ptr(relationRestriction, relationRestrictionContext->relationRestrictionList) + { + RelOptInfo *relOptInfo = relationRestriction->relOptInfo; + RestrictInfo *baseRestrictInfo = NULL; + foreach_ptr(baseRestrictInfo, relOptInfo->baserestrictinfo) + { + if (baseRestrictInfo->pseudoconstant) + { + pseudoRestrictInfoList = list_append_unique(pseudoRestrictInfoList, + baseRestrictInfo); + continue; + } + + baseRestrictInfoList = list_append_unique(baseRestrictInfoList, + baseRestrictInfo); + } + + RestrictInfo *joinRestrictInfo = NULL; + foreach_ptr(joinRestrictInfo, relOptInfo->joininfo) + { + if (joinRestrictInfo->pseudoconstant) + { + pseudoRestrictInfoList = list_append_unique(pseudoRestrictInfoList, + joinRestrictInfo); + } + } + } + + /* collect all restrictInfos from joinRestrictionContext */ + JoinRestriction *joinRestriction = NULL; + foreach_ptr(joinRestriction,
joinRestrictionContext->joinRestrictionList) + { + List *currentJoinRestrictInfoList = NIL; + RestrictInfo *joinRestrictInfo = NULL; + foreach_ptr(joinRestrictInfo, joinRestriction->joinRestrictInfoList) + { + if (joinRestrictInfo->pseudoconstant) + { + pseudoRestrictInfoList = list_append_unique(pseudoRestrictInfoList, + joinRestrictInfo); + continue; + } + + currentJoinRestrictInfoList = list_append_unique(currentJoinRestrictInfoList, + joinRestrictInfo); + + joinRestrictInfoList = list_append_unique(joinRestrictInfoList, + joinRestrictInfo); + } + + joinRestrictInfoListList = lappend(joinRestrictInfoListList, + currentJoinRestrictInfoList); + } + + RestrictInfoContext *restrictInfoContext = palloc0(sizeof(RestrictInfoContext)); + restrictInfoContext->baseRestrictInfoList = baseRestrictInfoList; + restrictInfoContext->joinRestrictInfoList = joinRestrictInfoList; + restrictInfoContext->joinRestrictInfoListList = joinRestrictInfoListList; + restrictInfoContext->pseudoRestrictInfoList = pseudoRestrictInfoList; + + return restrictInfoContext; +} + + +/* + * ExtractRestrictClausesFromRestrictionInfoList extracts RestrictInfo clauses from + * given restrictInfoList. + */ +List * +ExtractRestrictClausesFromRestrictionInfoList(List *restrictInfoList) +{ + List *restrictClauseList = NIL; + + RestrictInfo *restrictInfo = NULL; + foreach_ptr(restrictInfo, restrictInfoList) + { + restrictClauseList = lappend(restrictClauseList, restrictInfo->clause); + } + + return restrictClauseList; +} + + /* * FetchJoinOrderContext returns all join info for given node. */ @@ -1357,28 +1502,6 @@ WhereClauseList(FromExpr *fromExpr) } -/* - * QualifierList walks over the FROM expression in the query tree, and builds - * a list of all qualifiers from the expression tree. The function checks for - * both implicitly and explicitly defined qualifiers. Note that this function - * is very similar to WhereClauseList(), but QualifierList() also includes - * outer-join clauses. 
- */ -List * -QualifierList(FromExpr *fromExpr) -{ - FromExpr *fromExprCopy = copyObject(fromExpr); - QualifierWalkerContext *walkerContext = palloc0(sizeof(QualifierWalkerContext)); - List *qualifierList = NIL; - - ExtractFromExpressionWalker((Node *) fromExprCopy, walkerContext); - qualifierList = list_concat(qualifierList, walkerContext->baseQualifierList); - qualifierList = list_concat(qualifierList, walkerContext->outerJoinQualifierList); - - return qualifierList; -} - - /* * DeferErrorIfUnsupportedClause walks over the given list of clauses, and * checks that we can recognize all the clauses. This function ensures that @@ -1405,31 +1528,6 @@ DeferErrorIfUnsupportedClause(List *clauseList) } -/* - * JoinClauseList finds the join clauses from the given where clause expression - * list, and returns them. The function does not iterate into nested OR clauses - * and relies on find_duplicate_ors() in the optimizer to pull up factorizable - * OR clauses. - */ -List * -JoinClauseList(List *whereClauseList) -{ - List *joinClauseList = NIL; - ListCell *whereClauseCell = NULL; - - foreach(whereClauseCell, whereClauseList) - { - Node *whereClause = (Node *) lfirst(whereClauseCell); - if (IsJoinClause(whereClause)) - { - joinClauseList = lappend(joinClauseList, whereClause); - } - } - - return joinClauseList; -} - - /* * ExtractFromExpressionWalker walks over a FROM expression, and finds all * implicit and explicit qualifiers in the expression. The function looks at @@ -1712,7 +1810,7 @@ AddMultiCollectNodes(List *tableNodeList) * this tree after every table in the list has been joined. 
*/ static MultiNode * -MultiJoinTree(List *joinOrderList, List *collectTableList, bool passJoinClauseDirectly) +MultiJoinTree(List *joinOrderList, List *collectTableList) { MultiNode *currentTopNode = NULL; ListCell *joinOrderCell = NULL; @@ -1744,8 +1842,7 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, bool passJoinClauseDi (MultiNode *) collectNode, joinRuleType, partitionColumnList, joinType, - joinClauseList, - passJoinClauseDirectly); + joinClauseList); /* the new join node becomes the top of our join tree */ currentTopNode = newJoinNode; @@ -1792,33 +1889,20 @@ CollectNodeForTable(List *collectTableList, uint32 rangeTableId) * MultiSelectNode extracts the select clauses from the given where clause list, * and builds a MultiSelect node from these clauses. If the expression tree does * not have any select clauses, the function return null. - * - * When we have at least 1 outer join in a query tree, we cannot commute joins(that is - * why we have `FixedJoinOrderList`) or move join and where clauses as we wish because - * we would have incorrect results. We should pass join and where clauses separately as - * they are while creating tasks. `whereClauseList` should be passed as it is when - * `passWhereClauseDirectly` is set true. 
*/ static MultiSelect * -MultiSelectNode(List *whereClauseList, bool passWhereClauseDirectly) +MultiSelectNode(List *pushdownableClauseList, List *nonPushdownableClauseList) { - List *selectClauseList = NIL; MultiSelect *selectNode = NULL; - ListCell *whereClauseCell = NULL; - foreach(whereClauseCell, whereClauseList) - { - Node *whereClause = (Node *) lfirst(whereClauseCell); - if (passWhereClauseDirectly || IsSelectClause(whereClause)) - { - selectClauseList = lappend(selectClauseList, whereClause); - } - } - - if (list_length(selectClauseList) > 0) + if (list_length(pushdownableClauseList) > 0 || + list_length(nonPushdownableClauseList) > 0) { selectNode = CitusMakeNode(MultiSelect); - selectNode->selectClauseList = selectClauseList; + selectNode->selectClauseList = list_concat_copy(pushdownableClauseList, + nonPushdownableClauseList); + selectNode->pushdownableSelectClauseList = pushdownableClauseList; + selectNode->nonPushdownableSelectClauseList = nonPushdownableClauseList; } return selectNode; @@ -2100,37 +2184,12 @@ pull_var_clause_default(Node *node) */ static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType, - List *partitionColumnList, JoinType joinType, List *joinClauseList, - bool passJoinClauseDirectly) + List *partitionColumnList, JoinType joinType, List *joinClauseList) { - List *leftTableIdList = OutputTableIdList(leftNode); - List *rightTableIdList = OutputTableIdList(rightNode); - int rightTableIdCount PG_USED_FOR_ASSERTS_ONLY = 0; - - rightTableIdCount = list_length(rightTableIdList); - Assert(rightTableIdCount == 1); - - List *joinClauses = joinClauseList; - if (!passJoinClauseDirectly) - { - /* find applicable join clauses between the left and right data sources */ - uint32 rightTableId = (uint32) linitial_int(rightTableIdList); - joinClauses = ApplicableJoinClauses(leftTableIdList, rightTableId, - joinClauseList); - } - /* call the join rule application function to create the new join node */ 
RuleApplyFunction ruleApplyFunction = JoinRuleApplyFunction(ruleType); MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumnList, - joinType, joinClauses); - - if (joinType != JOIN_INNER && CitusIsA(multiNode, MultiJoin)) - { - MultiJoin *joinNode = (MultiJoin *) multiNode; - - /* preserve non-join clauses for OUTER joins */ - joinNode->joinClauseList = list_copy(joinClauseList); - } + joinType, joinClauseList); return multiNode; } diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 6e4ce6426..a8a0d4a90 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -224,8 +224,18 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery, * Some unsupported join clauses in logical planner * may be supported by subquery pushdown planner. */ - List *qualifierList = QualifierList(rewrittenQuery->jointree); - if (DeferErrorIfUnsupportedClause(qualifierList) != NULL) + RestrictInfoContext *restrictInfoContext = ExtractRestrictionInfosFromPlannerContext( + plannerRestrictionContext); + List *joinRestrictionInfoList = restrictInfoContext->joinRestrictInfoList; + List *nonJoinRestrictionInfoList = restrictInfoContext->baseRestrictInfoList; + + /* verify we can plan for restriction clauses */ + List *whereClauseList = ExtractRestrictClausesFromRestrictionInfoList( + nonJoinRestrictionInfoList); + List *joinClauseList = ExtractRestrictClausesFromRestrictionInfoList( + joinRestrictionInfoList); + List *qualClauseList = list_concat_copy(whereClauseList, joinClauseList); + if (DeferErrorIfUnsupportedClause(qualClauseList) != NULL) { return true; } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 29c3c7154..95a6500f4 100644 --- a/src/include/distributed/distributed_planner.h +++ 
b/src/include/distributed/distributed_planner.h @@ -78,6 +78,7 @@ typedef struct JoinRestrictionContext List *joinRestrictionList; bool hasSemiJoin; bool hasOuterJoin; + List *generatedEcJoinRestrictInfoList; } JoinRestrictionContext; typedef struct JoinRestriction @@ -105,6 +106,8 @@ typedef struct FastPathRestrictionContext * Set to true when distKey = Param; in the queryTree */ bool distributionKeyHasParam; + + int distRelId; }FastPathRestrictionContext; typedef struct PlannerRestrictionContext @@ -241,6 +244,8 @@ extern bool GetOriginalInh(RangeTblEntry *rte); extern LOCKMODE GetQueryLockMode(Query *query); extern int32 BlessRecordExpression(Expr *expr); extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); +extern Query * ReplanAfterQueryModification(Query *originalQuery, ParamListInfo + boundParams); extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, struct DistributedPlan *distributedPlan); extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query); diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index 74b8a0708..b5c9dc84d 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -44,6 +44,8 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId, plannerRestrictionContext); extern char * InsertSelectResultIdPrefix(uint64 planId); extern bool PlanningInsertSelect(void); +extern void RelabelPlannerRestrictionContext( + PlannerRestrictionContext *plannerRestrictionContext); #endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index ffa6b630c..1113cbd14 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -17,6 +17,7 @@ #include "postgres.h" +#include "nodes/pathnodes.h" #include "nodes/pg_list.h" #include "nodes/primnodes.h" @@ -30,6 +31,7 @@ typedef enum 
JoinRuleType { JOIN_RULE_INVALID_FIRST = 0, + REFERENCE_JOIN = 1, LOCAL_PARTITION_JOIN = 2, SINGLE_HASH_PARTITION_JOIN = 3, @@ -114,13 +116,18 @@ extern bool EnableSingleHashRepartitioning; /* Function declaration for determining table join orders */ extern List * JoinExprList(FromExpr *fromExpr); -extern List * JoinOrderList(List *rangeTableEntryList, List *joinClauseList); +extern List * JoinOrderList(List *rangeTableEntryList, List *joinRestrictInfoListList, + List *generatedEcJoinClauseList, List *pseudoClauseList); extern List * FixedJoinOrderList(List *rangeTableEntryList, - JoinInfoContext *joinInfoContext); + JoinInfoContext *joinInfoContext, + List *joinRestrictInfoListList, + List *generatedEcJoinClauseList, + List *pseudoClauseList); extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, Node *joinClause); -extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, - List *joinClauseList); +extern bool IsApplicableFalseConstantJoinClause(List *leftTableIdList, uint32 + rightTableId, + RestrictInfo *restrictInfo); extern bool NodeIsEqualsOpExpr(Node *node); extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable, bool rightIsReferenceTable); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 189170358..8f4793b81 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -124,6 +124,8 @@ typedef struct MultiSelect { MultiUnaryNode unaryNode; List *selectClauseList; + List *pushdownableSelectClauseList; + List *nonPushdownableSelectClauseList; } MultiSelect; @@ -188,6 +190,17 @@ typedef struct MultiExtendedOp } MultiExtendedOp; +/* RestrictInfoContext stores join and base restriction infos extracted from planner context*/ +typedef struct RestrictInfoContext +{ + List *baseRestrictInfoList; + List *joinRestrictInfoList; + List *joinRestrictInfoListList; + List 
*generatedEcJoinClauseList; + List *pseudoRestrictInfoList; +} RestrictInfoContext; + + /* Function declarations for building logical plans */ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, PlannerRestrictionContext * @@ -214,12 +227,13 @@ extern bool UnaryOperator(MultiNode *node); extern bool BinaryOperator(MultiNode *node); extern List * OutputTableIdList(MultiNode *multiNode); extern List * FindNodesOfType(MultiNode *node, int type); -extern List * JoinClauseList(List *whereClauseList); extern bool IsJoinClause(Node *clause); extern List * SubqueryEntryList(Query *queryTree); extern DeferredErrorMessage * DeferErrorIfQueryNotSupported(Query *queryTree); extern List * WhereClauseList(FromExpr *fromExpr); -extern List * QualifierList(FromExpr *fromExpr); +extern RestrictInfoContext * ExtractRestrictionInfosFromPlannerContext( + PlannerRestrictionContext *plannerRestrictionContext); +extern List * ExtractRestrictClausesFromRestrictionInfoList(List *restrictInfoList); extern List * TableEntryList(List *rangeTableList); extern List * UsedTableEntryList(Query *query); extern List * pull_var_clause_default(Node *node); @@ -229,7 +243,8 @@ extern MultiProject * MultiProjectNode(List *targetEntryList); extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree, Query *originalQuery); extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query * subqueryTree); -extern MultiNode * MultiNodeTree(Query *queryTree); +extern MultiNode * MultiNodeTree(Query *queryTree, + PlannerRestrictionContext *plannerRestrictionContext); #endif /* MULTI_LOGICAL_PLANNER_H */ diff --git a/src/test/regress/expected/ch_bench_having.out b/src/test/regress/expected/ch_bench_having.out index 29feb0305..096beaca7 100644 --- a/src/test/regress/expected/ch_bench_having.out +++ b/src/test/regress/expected/ch_bench_having.out @@ -47,6 +47,7 @@ order by s_i_id; Node: host=localhost port=xxxxx dbname=regression -> Aggregate -> Seq Scan on 
stock_1640000 stock + Filter: (s_order_cnt IS NOT NULL) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -57,7 +58,7 @@ order by s_i_id; -> Function Scan on read_intermediate_result intermediate_result -> Seq Scan on stock_1640000 stock Filter: ((s_order_cnt)::numeric > $0) -(36 rows) +(37 rows) explain (costs false, summary false, timing false) select s_i_id, sum(s_order_cnt) as ordercount @@ -84,6 +85,7 @@ order by s_i_id; Node: host=localhost port=xxxxx dbname=regression -> Aggregate -> Seq Scan on stock_1640000 stock + Filter: (s_order_cnt IS NOT NULL) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -91,7 +93,7 @@ order by s_i_id; -> HashAggregate Group Key: stock.s_i_id -> Seq Scan on stock_1640000 stock -(24 rows) +(25 rows) explain (costs false, summary false, timing false) select s_i_id, sum(s_order_cnt) as ordercount @@ -115,6 +117,7 @@ having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from st Node: host=localhost port=xxxxx dbname=regression -> Aggregate -> Seq Scan on stock_1640000 stock + Filter: (s_order_cnt IS NOT NULL) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -122,7 +125,7 @@ having sum(s_order_cnt) > (select max(s_order_cnt) - 3 as having_query from st -> HashAggregate Group Key: stock.s_i_id -> Seq Scan on stock_1640000 stock -(22 rows) +(23 rows) explain (costs false) select s_i_id, sum(s_order_cnt) as ordercount diff --git a/src/test/regress/expected/chbenchmark_all_queries.out b/src/test/regress/expected/chbenchmark_all_queries.out index 110234a74..ea3c33163 100644 --- a/src/test/regress/expected/chbenchmark_all_queries.out +++ b/src/test/regress/expected/chbenchmark_all_queries.out @@ -345,7 +345,7 @@ GROUP BY ORDER BY revenue DESC, o_entry_d; -LOG: join order: [ "customer" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "new_order" ][ local partition join(INNER) "order_line" ] +LOG: join order: [ "customer" ][ local partition join(INNER) "new_order" ][ local partition join(INNER) "oorder" ][ local 
partition join(INNER) "order_line" ] ol_o_id | ol_w_id | ol_d_id | revenue | o_entry_d --------------------------------------------------------------------- 10 | 10 | 10 | 10.00 | Fri Oct 17 00:00:00 2008 @@ -472,7 +472,7 @@ ORDER BY su_nationkey, cust_nation, l_year; -LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "nation" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "stock" ] +LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ dual partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ] supp_nation | cust_nation | l_year | revenue --------------------------------------------------------------------- 9 | C | 2008 | 3.00 @@ -963,7 +963,7 @@ ORDER BY su_nationkey, cust_nation, l_year; -LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "nation" ][ reference join(INNER) "supplier" ][ single hash partition join(INNER) "stock" ] +LOG: join order: [ "order_line" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ single hash partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ] supp_nation | cust_nation | l_year | revenue --------------------------------------------------------------------- 9 | C | 2008 | 3.00 @@ -1003,7 +1003,7 @@ WHERE i_id = s_i_id AND i_id = ol_i_id GROUP BY extract(YEAR FROM o_entry_d) ORDER BY l_year; -LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "region" ][ single hash 
partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ] +LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ local partition join(INNER) "customer" ][ reference join(INNER) "nation" ][ reference join(INNER) "region" ][ dual partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ] l_year | mkt_share --------------------------------------------------------------------- 2008 | 0.50000000000000000000 @@ -1036,7 +1036,7 @@ GROUP BY ORDER BY n_name, l_year DESC; -LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ single hash partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ] +LOG: join order: [ "order_line" ][ reference join(INNER) "item" ][ local partition join(INNER) "oorder" ][ dual partition join(INNER) "stock" ][ reference join(INNER) "supplier" ][ reference join(INNER) "nation" ] n_name | l_year | sum_profit --------------------------------------------------------------------- Germany | 2008 | 3.00 diff --git a/src/test/regress/expected/cross_join.out b/src/test/regress/expected/cross_join.out index d1b40de7f..74daa8845 100644 --- a/src/test/regress/expected/cross_join.out +++ b/src/test/regress/expected/cross_join.out @@ -189,12 +189,14 @@ SELECT count(*) FROM users_table u1 CROSS JOIN users_ref_test_table ref2 JOIN us reset citus.enable_repartition_joins; -- although the following has the "ref LEFT JOIN dist" type of query, the LEFT JOIN is eliminated by Postgres -- because the INNER JOIN eliminates the LEFT JOIN +SET citus.enable_repartition_joins TO on; SELECT count(*) FROM users_ref_test_table ref1 CROSS JOIN users_ref_test_table ref2 LEFT JOIN users_table ON (ref1.id = users_table.user_id) JOIN users_table u2 ON (u2.user_id = users_table.user_id); count 
--------------------------------------------------------------------- 11802 (1 row) +reset citus.enable_repartition_joins; -- this is the same query as the above, but this time the outer query is also LEFT JOIN, meaning that Postgres -- cannot eliminate the outer join SELECT count(*) FROM users_ref_test_table ref1 CROSS JOIN users_ref_test_table ref2 LEFT JOIN users_table ON (ref1.id = users_table.user_id) LEFT JOIN users_table u2 ON (u2.user_id = users_table.user_id); diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index ede866855..57a0c25a5 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -102,7 +102,9 @@ LOG: join order: [ "lineitem" ][ local partition join(INNER) "orders" ] EXPLAIN (COSTS OFF) SELECT l_quantity FROM lineitem, orders WHERE (l_orderkey = o_orderkey OR l_quantity > 5); -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +LOG: join order: [ "lineitem" ][ cartesian product(INNER) "orders" ] +ERROR: cannot perform distributed planning on this query +DETAIL: Cartesian products are currently unsupported EXPLAIN (COSTS OFF) SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = l_orderkey; diff --git a/src/test/regress/expected/multi_repartition_join_ref.out b/src/test/regress/expected/multi_repartition_join_ref.out index 90c5caf14..94eb76d50 100644 --- a/src/test/regress/expected/multi_repartition_join_ref.out +++ b/src/test/regress/expected/multi_repartition_join_ref.out @@ -41,7 +41,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ] +LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ] DEBUG: push down of 
limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -69,7 +69,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ] +LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -124,7 +124,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ] +LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -152,7 +152,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ] +LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- @@ -180,7 +180,7 @@ GROUP BY ORDER BY l_partkey, l_suppkey LIMIT 10; -LOG: join order: [ "lineitem" ][ reference join(INNER) "supplier" ][ dual partition join(INNER) "part_append" ] +LOG: join order: [ "lineitem" ][ dual partition join(INNER) "part_append" ][ cartesian product reference join(INNER) "supplier" ] DEBUG: push down of limit count: 10 l_partkey | l_suppkey | count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/non_colocated_outer_joins.out 
b/src/test/regress/expected/non_colocated_outer_joins.out index 579c81d17..185bd7b2a 100644 --- a/src/test/regress/expected/non_colocated_outer_joins.out +++ b/src/test/regress/expected/non_colocated_outer_joins.out @@ -1074,8 +1074,96 @@ DETAIL: Cartesian products are currently unsupported -- join order planner cannot handle right recursive joins SELECT t1.*, t2.* FROM test_hash1 t1 LEFT JOIN ( test_hash2 t2 JOIN test_hash3 t3 ON t2.col2 = t3.col1) ON (t1.col1 = t2.col1) ORDER BY 1,2,3,4; ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +-- sometimes join filters are pushed down and applied before join by PG +CREATE TABLE dist1 (x INT, y INT); +CREATE TABLE dist2 (x INT, y INT); +SELECT create_distributed_table('dist1','x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist2','x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist1 VALUES (1,2); +INSERT INTO dist1 VALUES (3,4); +INSERT INTO dist2 VALUES (1,2); +INSERT INTO dist2 VALUES (5,6); +-- single join condition +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) ORDER BY 1,2,3,4; + x | y | x | y +--------------------------------------------------------------------- + 1 | 2 | 1 | 2 + 3 | 4 | | +(2 rows) + +-- single join condition and dist2.x >2 will be pushed down as it is on inner part of the join. e.g. 
filter out dist2.x <= 2 beforehand +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist2.x >2) ORDER BY 1,2,3,4; + x | y | x | y +--------------------------------------------------------------------- + 1 | 2 | | + 3 | 4 | | +(2 rows) + +-- single join condition and dist2.x >2 is regular filter and applied after join +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist2.x >2 ORDER BY 1,2,3,4; +LOG: join order: [ "dist1" ][ local partition join(INNER) "dist2" ] + x | y | x | y +--------------------------------------------------------------------- +(0 rows) + +-- single join condition and dist1.x >2 will not be pushed down as it is on outer part of the join +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist1.x >2) ORDER BY 1,2,3,4; + x | y | x | y +--------------------------------------------------------------------- + 1 | 2 | | + 3 | 4 | | +(2 rows) + +-- single join condition and dist1.x >2 is regular filter and applied after join +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist1.x >2 ORDER BY 1,2,3,4; + x | y | x | y +--------------------------------------------------------------------- + 3 | 4 | | +(1 row) + +-- constant false filter as join filter for left join. +-- Inner table will be converted to empty result. Constant filter will be applied before join but will not be pushdowned. +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4; +LOG: join order: [ "dist1" ][ cartesian product(LEFT) "dist2" ] + x | y | x | y +--------------------------------------------------------------------- + 1 | 2 | | + 3 | 4 | | +(2 rows) + +-- constant false filter as base filter for left join. 
+-- Both tables will be converted to empty result .e.g RTE_RESULT +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4; + x | y | x | y +--------------------------------------------------------------------- +(0 rows) + +-- constant false filter as join filter for inner join. +-- Both tables will be converted to empty result .e.g RTE_RESULT +SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4; + x | y | x | y +--------------------------------------------------------------------- +(0 rows) + +-- constant false filter as base filter for inner join. +-- Both tables will be converted to empty result .e.g RTE_RESULT +SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4; + x | y | x | y +--------------------------------------------------------------------- +(0 rows) + DROP SCHEMA non_colocated_outer_joins CASCADE; -NOTICE: drop cascades to 9 other objects +NOTICE: drop cascades to 11 other objects DETAIL: drop cascades to table test_hash1 drop cascades to table test_hash2 drop cascades to table test_hash3 @@ -1085,6 +1173,8 @@ drop cascades to table test_append3 drop cascades to table test_range1 drop cascades to table test_range2 drop cascades to table test_range3 +drop cascades to table dist1 +drop cascades to table dist2 RESET client_min_messages; RESET citus.log_multi_join_order; RESET citus.enable_repartition_joins; diff --git a/src/test/regress/expected/single_hash_repartition_join.out b/src/test/regress/expected/single_hash_repartition_join.out index 1b349ecde..2e026b13e 100644 --- a/src/test/regress/expected/single_hash_repartition_join.out +++ b/src/test/regress/expected/single_hash_repartition_join.out @@ -98,7 +98,7 @@ FROM WHERE r1.id = t1.id AND t2.sum = t1.id; DEBUG: Router planner cannot handle multi-shard select queries -LOG: join order: [ "single_hash_repartition_second" ][ reference join(INNER) "ref_table" ][ single hash partition join(INNER) 
"single_hash_repartition_first" ] +LOG: join order: [ "single_hash_repartition_first" ][ reference join(INNER) "ref_table" ][ single hash partition join(INNER) "single_hash_repartition_second" ] DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] @@ -326,7 +326,19 @@ FROM WHERE t1.id = t2.sum AND t2.sum = t3.id; DEBUG: Router planner cannot handle multi-shard select queries -LOG: join order: [ "single_hash_repartition_first" ][ single hash partition join(INNER) "single_hash_repartition_second" ][ single hash partition join(INNER) "single_hash_repartition_second" ] +LOG: join order: [ "single_hash_repartition_first" ][ local partition join(INNER) "single_hash_repartition_second" ][ single hash partition join(INNER) "single_hash_repartition_second" ] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] DEBUG: join prunable for intervals [-2147483648,-1073741825] and 
[-1073741824,-1] DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] @@ -347,26 +359,6 @@ DEBUG: pruning merge fetch taskId 5 DETAIL: Creating dependency on merge taskId 15 DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 20 -DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] -DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] -DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] -DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] -DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] -DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] -DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] -DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] -DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] -DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] -DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] -DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] -DEBUG: pruning merge fetch taskId 1 -DETAIL: Creating dependency on merge taskId 9 -DEBUG: pruning merge fetch taskId 3 -DETAIL: Creating dependency on merge taskId 14 -DEBUG: pruning merge fetch taskId 5 -DETAIL: Creating dependency on merge taskId 19 -DEBUG: pruning merge fetch taskId 7 -DETAIL: Creating dependency on merge taskId 24 ERROR: the query contains a join that requires repartitioning HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- two single repartitions again, but this diff --git a/src/test/regress/sql/cross_join.sql b/src/test/regress/sql/cross_join.sql index a083fa527..d30c3d8bc 100644 --- 
a/src/test/regress/sql/cross_join.sql +++ b/src/test/regress/sql/cross_join.sql @@ -65,7 +65,9 @@ reset citus.enable_repartition_joins; -- although the following has the "ref LEFT JOIN dist" type of query, the LEFT JOIN is eliminated by Postgres -- because the INNER JOIN eliminates the LEFT JOIN +SET citus.enable_repartition_joins TO on; SELECT count(*) FROM users_ref_test_table ref1 CROSS JOIN users_ref_test_table ref2 LEFT JOIN users_table ON (ref1.id = users_table.user_id) JOIN users_table u2 ON (u2.user_id = users_table.user_id); +reset citus.enable_repartition_joins; -- this is the same query as the above, but this time the outer query is also LEFT JOIN, meaning that Postgres -- cannot eliminate the outer join diff --git a/src/test/regress/sql/non_colocated_outer_joins.sql b/src/test/regress/sql/non_colocated_outer_joins.sql index 836c64ed7..ffe6ec190 100644 --- a/src/test/regress/sql/non_colocated_outer_joins.sql +++ b/src/test/regress/sql/non_colocated_outer_joins.sql @@ -205,6 +205,43 @@ SELECT tt1.*, t3.* FROM (SELECT t1.* FROM test_hash1 t1, test_hash2 t2) tt1 LEFT SELECT t1.*, t2.* FROM test_hash1 t1 LEFT JOIN ( test_hash2 t2 JOIN test_hash3 t3 ON t2.col2 = t3.col1) ON (t1.col1 = t2.col1) ORDER BY 1,2,3,4; + +-- sometimes join filters are pushed down and applied before join by PG +CREATE TABLE dist1 (x INT, y INT); +CREATE TABLE dist2 (x INT, y INT); +SELECT create_distributed_table('dist1','x'); +SELECT create_distributed_table('dist2','x'); +INSERT INTO dist1 VALUES (1,2); +INSERT INTO dist1 VALUES (3,4); +INSERT INTO dist2 VALUES (1,2); +INSERT INTO dist2 VALUES (5,6); + +-- single join condition +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) ORDER BY 1,2,3,4; +-- single join condition and dist2.x >2 will be pushed down as it is on inner part of the join. e.g. 
filter out dist2.x <= 2 beforehand +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist2.x >2) ORDER BY 1,2,3,4; +-- single join condition and dist2.x >2 is regular filter and applied after join +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist2.x >2 ORDER BY 1,2,3,4; +-- single join condition and dist1.x >2 will not be pushed down as it is on outer part of the join +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x AND dist1.x >2) ORDER BY 1,2,3,4; +-- single join condition and dist1.x >2 is regular filter and applied after join +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.x = dist2.x) WHERE dist1.x >2 ORDER BY 1,2,3,4; + + +-- constant false filter as join filter for left join. +-- Inner table will be converted to empty result. Constant filter will be applied before join but will not be pushdowned. +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4; +-- constant false filter as base filter for left join. +-- Both tables will be converted to empty result .e.g RTE_RESULT +SELECT * FROM dist1 LEFT JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4; +-- constant false filter as join filter for inner join. +-- Both tables will be converted to empty result .e.g RTE_RESULT +SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y AND false) ORDER BY 1,2,3,4; +-- constant false filter as base filter for inner join. +-- Both tables will be converted to empty result .e.g RTE_RESULT +SELECT * FROM dist1 INNER JOIN dist2 ON (dist1.y = dist2.y) WHERE false ORDER BY 1,2,3,4; + + DROP SCHEMA non_colocated_outer_joins CASCADE; RESET client_min_messages; RESET citus.log_multi_join_order;