diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a10c53d68..aa2555767 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -135,6 +135,9 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, subqueryRte, Oid * selectPartitionColumnTableId); +static Query * FindTopLevelJoinQuery(Query *query); +static void AddUninstantiatedEqualityQualToRelation(Query *query); +static Var * GetFirstTargetListVar(List *targetList); static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -1092,9 +1095,7 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery) { Query *subquery = NULL; RangeTblEntry *subqueryEntry = NULL; - ListCell *targetEntryCell = NULL; - Var *targetPartitionColumnVar = NULL; - List *targetList = NULL; + Query *topLevelJoinQuery = NULL; Assert(InsertSelectQuery(originalQuery)); @@ -1113,31 +1114,180 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery) return; } - /* iterate through the target list and find the partition column on the target list */ - targetList = subquery->targetList; - foreach(targetEntryCell, targetList) - { - TargetEntry *targetEntry = lfirst(targetEntryCell); - if (IsPartitionColumn(targetEntry->expr, subquery) && - IsA(targetEntry->expr, Var)) + topLevelJoinQuery = FindTopLevelJoinQuery(subquery); + if (topLevelJoinQuery != NULL) + { + FromExpr *joinTree = topLevelJoinQuery->jointree; + List *whereClauseList = QualifierList(joinTree); + List *joinClauseList = JoinClauseList(whereClauseList); + + OpExpr *operatorExpression = (OpExpr *) linitial(joinClauseList); + List *argumentList = operatorExpression->args; + + /* get left and right side of the expression */ + Node *leftArgument = (Node *) linitial(argumentList); + + List *leftColumnList = pull_var_clause_default(leftArgument); + Var *leftColumn = (Var *) linitial(leftColumnList); + + AddUninstantiatedEqualityQual(topLevelJoinQuery, leftColumn); + } + else + { + AddUninstantiatedEqualityQualToRelation(subquery); + } +} + + +/* + * FindTopLevelJoinQuery recursively traverses the query to find the + * top level JOIN query that exists in the query. If found, the query that + * includes the JOIN is returned. Else, NULL returned. + */ +static Query * +FindTopLevelJoinQuery(Query *query) +{ + FromExpr *joinTree = NULL; + List *whereClauseList = NULL; + List *joinClauseList = NULL; + RangeTblEntry *rte = NULL; + + joinTree = query->jointree; + whereClauseList = QualifierList(joinTree); + joinClauseList = JoinClauseList(whereClauseList); + + if (list_length(joinClauseList) > 0) + { + return query; + } + + rte = list_nth(query->rtable, 0); + if (rte->rtekind == RTE_RELATION) + { + return NULL; + } + + if (rte->rtekind == RTE_SUBQUERY) + { + return FindTopLevelJoinQuery(rte->subquery); + } + + return NULL; +} + + +/* + * AddUninstantiatedEqualityQualToRelation iterates over query's + * range table list and finds and adds the partition restriction + * to the range table entry that is either (i) RTE_RELATION or + * (ii) Recurse in to the two arguments of a set operation until + * RTE_RELATION is found. + * + * Once RTE_RELATION is found, add the qual using + * AddUninstantiatedEqualityQual(). + * + */ +static void +AddUninstantiatedEqualityQualToRelation(Query *query) +{ + List *rangeTableList = query->rtable; + ListCell *rteCell = NULL; + foreach(rteCell, rangeTableList) + { + RangeTblEntry *rte = lfirst(rteCell); + + if (rte->rtekind == RTE_RELATION) { - targetPartitionColumnVar = (Var *) targetEntry->expr; + if (IsDistributedTable(rte->relid) && PartitionColumn(rte->relid, 1) != NULL) + { + Var *targetVar = GetFirstTargetListVar(query->targetList); + if (targetVar) + { + AddUninstantiatedEqualityQual(query, PartitionColumn(rte->relid, + targetVar->varno)); + return; + } + } + } + else if (rte->rtekind == RTE_SUBQUERY) + { + Query *subquery = rte->subquery; + SetOperationStmt *unionStatement = NULL; + Query *leftQuery = NULL; + Query *rightQuery = NULL; + RangeTblRef *leftRangeTableReference = NULL; + RangeTblRef *rightRangeTableReference = NULL; + List *unionQueryRangeTableList = NULL; + int leftTableIndex = 0; + int rightTableIndex = 0; + RangeTblEntry *leftRangeTableEntry = NULL; + RangeTblEntry *rightRangeTableEntry = NULL; + + /* if does not have any set operations, recurse into the subquery */ + if (subquery->setOperations == NULL) + { + AddUninstantiatedEqualityQualToRelation(rte->subquery); + return; + } + + unionStatement = (SetOperationStmt *) subquery->setOperations; + + leftRangeTableReference = (RangeTblRef *) unionStatement->larg; + rightRangeTableReference = (RangeTblRef *) unionStatement->rarg; + unionQueryRangeTableList = subquery->rtable; + + leftTableIndex = leftRangeTableReference->rtindex - 1; + rightTableIndex = rightRangeTableReference->rtindex - 1; + + leftRangeTableEntry = (RangeTblEntry *) list_nth( + unionQueryRangeTableList, + leftTableIndex); + rightRangeTableEntry = (RangeTblEntry *) list_nth( + unionQueryRangeTableList, + rightTableIndex); + + Assert(leftRangeTableEntry->rtekind == RTE_SUBQUERY); + Assert(rightRangeTableEntry->rtekind == RTE_SUBQUERY); + + leftQuery = leftRangeTableEntry->subquery; + rightQuery = rightRangeTableEntry->subquery; + + AddUninstantiatedEqualityQualToRelation(leftQuery); + AddUninstantiatedEqualityQualToRelation(rightQuery); + + return; + } + else + { + ereport(DEBUG4, (errmsg("unexpected rte kind:%d", rte->rtekind))); + } + } +} + + +/* + * GetFirstTargetListVar iterates through the given target list entries and returns + * the first target list entry whose type is Var. Otherwise, the function returns NULL. + */ +static Var * +GetFirstTargetListVar(List *targetList) +{ + Var *targetVar = NULL; + ListCell *targetListCell = NULL; + + foreach(targetListCell, targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetListCell); + + if (IsA(targetEntry->expr, Var)) + { + targetVar = (Var *) targetEntry->expr; break; } } - /* - * If we cannot find the bare partition column, no need to add the qual since - * we're already going to error out on the multi planner. - */ - if (!targetPartitionColumnVar) - { - return; - } - - /* finally add the equality qual of target column to subquery */ - AddUninstantiatedEqualityQual(subquery, targetPartitionColumnVar); + return targetVar; } diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 16126f5cc..c0c0ca026 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1225,35 +1225,8 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_1 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823))) -DEBUG: predicate pruning for shardId 13300000 -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300004 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647))) -DEBUG: Plan is router executable +ERROR: cannot perform distributed planning for the given modification +DETAIL: Select query cannot be pushed down to the worker. -- some unsupported LEFT/INNER JOINs -- JOIN on one table with partition column other is not INSERT INTO agg_events (user_id) @@ -1292,9 +1265,6 @@ SELECT raw_events_first.user_id FROM raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. -- same as the above with INNER JOIN @@ -1383,9 +1353,6 @@ SELECT raw_events_first.user_id FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. -- the following is again a very tricky query for Citus @@ -1398,9 +1365,6 @@ FROM raw_events_first, raw_events_second WHERE raw_events_second.user_id = raw_events_first.value_1 AND raw_events_first.value_2 = 12; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. -- foo is not joined on the partition key so the query is not @@ -1630,12 +1594,6 @@ outer_most.id, max(outer_most.value) HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id != f2.id)) as outer_most GROUP BY outer_most.id; -DEBUG: predicate pruning for shardId 13300001 -DEBUG: predicate pruning for shardId 13300002 -DEBUG: predicate pruning for shardId 13300003 -DEBUG: predicate pruning for shardId 13300005 -DEBUG: predicate pruning for shardId 13300006 -DEBUG: predicate pruning for shardId 13300007 ERROR: cannot perform distributed planning for the given modification DETAIL: Select query cannot be pushed down to the worker. -- cannot pushdown since subquery returns another column than partition key