From 4559d02c41b8e4001f3aee90bc676a253030cfcd Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 29 Jul 2021 13:52:55 +0300 Subject: [PATCH] Fix union pushdown issue (#5079) * Fix UNION not being pushdown Postgres optimizes column fields that are not needed in the output. We were relying on these fields to understand if it is safe to push down a union query. This fix looks at the parse query, which has the original column fields to detect if it is safe to push down a union query. * Add more tests * Simplify code and make it more robust * Process varlevelsup > 0 in FindReferencedTableColumn * Only look for outers vars in union path * Add more comments * Remove UNION ALL specific logic for pulling up childvars --- .../planner/insert_select_planner.c | 17 +- .../planner/multi_logical_optimizer.c | 83 ++- .../planner/multi_logical_planner.c | 10 +- .../planner/multi_router_planner.c | 3 +- .../planner/query_pushdown_planning.c | 6 +- .../distributed/planner/recursive_planning.c | 2 +- .../relation_restriction_equivalence.c | 202 +++++-- .../distributed/multi_logical_optimizer.h | 6 +- .../relation_restriction_equivalence.h | 2 +- .../expected/local_dist_join_mixed.out | 5 +- .../multi_agg_approximate_distinct.out | 16 + .../multi_agg_approximate_distinct_0.out | 25 + src/test/regress/expected/union_pushdown.out | 514 +++++++++++++++++- src/test/regress/multi_schedule | 3 +- .../sql/multi_agg_approximate_distinct.sql | 12 + src/test/regress/sql/union_pushdown.sql | 399 +++++++++++++- 16 files changed, 1201 insertions(+), 104 deletions(-) diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 137e19fc7..e7260946f 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -1136,7 +1136,8 @@ HasUnsupportedDistinctOn(Query *query) TargetEntry *distinctEntry = get_sortgroupclause_tle(distinctClause, 
query->targetList); - if (IsPartitionColumn(distinctEntry->expr, query)) + bool skipOuterVars = true; + if (IsPartitionColumn(distinctEntry->expr, query, skipOuterVars)) { return false; } @@ -1170,7 +1171,6 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry); - Oid subqueryPartitionColumnRelationId = InvalidOid; Var *subqueryPartitionColumn = NULL; /* @@ -1202,11 +1202,18 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, insertVar->varattno - 1); Expr *selectTargetExpr = subqueryTargetEntry->expr; + RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL; List *parentQueryList = list_make2(query, subquery); + bool skipOuterVars = true; FindReferencedTableColumn(selectTargetExpr, parentQueryList, subquery, - &subqueryPartitionColumnRelationId, - &subqueryPartitionColumn); + &subqueryPartitionColumn, + &subqueryPartitionColumnRelationIdRTE, + skipOuterVars); + Oid subqueryPartitionColumnRelationId = subqueryPartitionColumnRelationIdRTE ? + subqueryPartitionColumnRelationIdRTE-> + relid : + InvalidOid; /* * Corresponding (i.e., in the same ordinal position as the target table's @@ -1339,7 +1346,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, } /* finally, check that the select target column is a partition column */ - if (!IsPartitionColumn(selectTargetExpr, subquery)) + if (!IsPartitionColumn(selectTargetExpr, subquery, skipOuterVars)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot perform distributed INSERT INTO ... 
SELECT " diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 615c0ddbe..d138ea635 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -327,7 +327,7 @@ static bool ShouldProcessDistinctOrderAndLimitForWorker( ExtendedOpNodeProperties *extendedOpNodeProperties, bool pushingDownOriginalGrouping, Node *havingQual); - +static bool IsIndexInRange(const List *list, int index); /* * MultiLogicalPlanOptimize applies multi-relational algebra optimizations on @@ -4372,16 +4372,19 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList) * reference tables do not have partition column. The function does not * support queries with CTEs, it would return false if columnExpression * refers to a column returned by a CTE. + * + * If skipOuterVars is true, then it doesn't process the outervars. */ bool -IsPartitionColumn(Expr *columnExpression, Query *query) +IsPartitionColumn(Expr *columnExpression, Query *query, bool skipOuterVars) { bool isPartitionColumn = false; - Oid relationId = InvalidOid; Var *column = NULL; + RangeTblEntry *relationRTE = NULL; - FindReferencedTableColumn(columnExpression, NIL, query, &relationId, &column); - + FindReferencedTableColumn(columnExpression, NIL, query, &column, &relationRTE, + skipOuterVars); + Oid relationId = relationRTE ? relationRTE->relid : InvalidOid; if (relationId != InvalidOid && column != NULL) { Var *partitionColumn = DistPartitionKey(relationId); @@ -4400,21 +4403,26 @@ IsPartitionColumn(Expr *columnExpression, Query *query) /* * FindReferencedTableColumn recursively traverses query tree to find actual relation * id, and column that columnExpression refers to. If columnExpression is a - * non-relational or computed/derived expression, the function returns InvalidOid for - * relationId and NULL for column. 
The caller should provide parent query list from + * non-relational or computed/derived expression, the function returns NULL for + * rte and NULL for column. The caller should provide parent query list from * top of the tree to this particular Query's parent. This argument is used to look * into CTEs that may be present in the query. + * + * If skipOuterVars is true, Vars that come from outer queries (varlevelsup > 0) + * are not resolved and the function returns NULL for both rte and column. Resolving + * outer Vars is probably safe everywhere, but to be on the safe side it is only enabled + * in the UNION path until we verify that it doesn't break anything existing. */ void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *query, - Oid *relationId, Var **column) + Var **column, RangeTblEntry **rteContainingReferencedColumn, + bool skipOuterVars) { Var *candidateColumn = NULL; - List *rangetableList = query->rtable; Expr *strippedColumnExpression = (Expr *) strip_implicit_coercions( (Node *) columnExpression); - *relationId = InvalidOid; + *rteContainingReferencedColumn = NULL; *column = NULL; if (IsA(strippedColumnExpression, Var)) @@ -4443,9 +4451,37 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * * subqueries in WHERE clause, we don't support use of partition keys * in the subquery that is referred from the outer query. */ if (candidateColumn->varlevelsup > 0) { - return; + if (skipOuterVars) + { + /* + * The caller asked us not to resolve outer Vars, so give up here + * instead of looking up varno in the rtable of the wrong query. + */ + return; + } + + int parentQueryIndex = list_length(parentQueryList) - + candidateColumn->varlevelsup; + if (!(IsIndexInRange(parentQueryList, parentQueryIndex))) + { + return; + } + + /* + * Before we recurse into the query tree, we should update the candidateColumn and we use copy of it. + * As we get the query from varlevelsup up, we reset the varlevelsup. 
+ */ + candidateColumn = copyObject(candidateColumn); + candidateColumn->varlevelsup = 0; + + /* + * We should be careful about these fields because they need to + * be updated correctly based on ctelevelsup and varlevelsup. + */ + query = list_nth(parentQueryList, parentQueryIndex); + parentQueryList = list_truncate(parentQueryList, parentQueryIndex); } if (candidateColumn->varattno == InvalidAttrNumber) @@ -4457,12 +4484,13 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * return; } + List *rangetableList = query->rtable; int rangeTableEntryIndex = candidateColumn->varno - 1; RangeTblEntry *rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex); if (rangeTableEntry->rtekind == RTE_RELATION) { - *relationId = rangeTableEntry->relid; + *rteContainingReferencedColumn = rangeTableEntry; *column = candidateColumn; } else if (rangeTableEntry->rtekind == RTE_SUBQUERY) @@ -4476,7 +4504,8 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * /* append current query to parent query list */ parentQueryList = lappend(parentQueryList, query); FindReferencedTableColumn(subColumnExpression, parentQueryList, - subquery, relationId, column); + subquery, column, rteContainingReferencedColumn, + skipOuterVars); } else if (rangeTableEntry->rtekind == RTE_JOIN) { @@ -4485,11 +4514,17 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex); /* parent query list stays the same since still in the same query boundary */ - FindReferencedTableColumn(joinColumn, parentQueryList, query, - relationId, column); + FindReferencedTableColumn(joinColumn, parentQueryList, query, column, + rteContainingReferencedColumn, skipOuterVars); } else if (rangeTableEntry->rtekind == RTE_CTE) { + /* + * When outerVars are considered, we modify parentQueryList, so this + * logic might need to change when we support outervars in CTEs. 
+ */ + Assert(skipOuterVars); + int cteParentListIndex = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1; Query *cteParentQuery = NULL; @@ -4501,7 +4536,7 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * * moment due to usage from IsPartitionColumn. Callers of that function * do not have access to parent query list. */ - if (cteParentListIndex >= 0) + if (IsIndexInRange(parentQueryList, cteParentListIndex)) { cteParentQuery = list_nth(parentQueryList, cteParentListIndex); cteList = cteParentQuery->cteList; @@ -4526,12 +4561,24 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * parentQueryList = lappend(parentQueryList, query); FindReferencedTableColumn(targetEntry->expr, parentQueryList, - cteQuery, relationId, column); + cteQuery, column, rteContainingReferencedColumn, + skipOuterVars); } } } +/* + * IsIndexInRange returns true if the given index is within the + * range of the given list. + */ +static bool +IsIndexInRange(const List *list, int index) +{ + return index >= 0 && index < list_length(list); +} + + /* * ExtractQueryWalker walks over a query, and finds all queries in the query * tree and returns these queries. 
Note that the function also recurses into diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index e72150d5a..0691df7cb 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -215,11 +215,15 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); Expr *targetExpression = targetEntry->expr; - bool isPartitionColumn = IsPartitionColumn(targetExpression, query); - Oid relationId = InvalidOid; + bool skipOuterVars = true; + bool isPartitionColumn = IsPartitionColumn(targetExpression, query, + skipOuterVars); Var *column = NULL; + RangeTblEntry *rte = NULL; - FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column); + FindReferencedTableColumn(targetExpression, NIL, query, &column, &rte, + skipOuterVars); + Oid relationId = rte ? rte->relid : InvalidOid; /* * If the expression belongs to a non-distributed table continue searching for diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index ba1003c03..7b310e5be 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -358,7 +358,8 @@ AddPartitionKeyNotNullFilterToSelect(Query *subqery) { TargetEntry *targetEntry = lfirst(targetEntryCell); - if (IsPartitionColumn(targetEntry->expr, subqery) && + bool skipOuterVars = true; + if (IsPartitionColumn(targetEntry->expr, subqery, skipOuterVars) && IsA(targetEntry->expr, Var)) { targetPartitionColumnVar = (Var *) targetEntry->expr; diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 632b9dd72..e8452a357 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ 
b/src/backend/distributed/planner/query_pushdown_planning.c @@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, */ if (ContainsUnionSubquery(originalQuery)) { - if (!SafeToPushdownUnionSubquery(plannerRestrictionContext)) + if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot pushdown the subquery since not all subqueries " @@ -1836,7 +1836,9 @@ PartitionColumnForPushedDownSubquery(Query *query) Expr *targetExpression = targetEntry->expr; if (IsA(targetExpression, Var)) { - bool isPartitionColumn = IsPartitionColumn(targetExpression, query); + bool skipOuterVars = true; + bool isPartitionColumn = IsPartitionColumn(targetExpression, query, + skipOuterVars); if (isPartitionColumn) { Var *partitionColumn = copyObject((Var *) targetExpression); diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 209714f74..b2a8140da 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex PlannerRestrictionContext *filteredRestrictionContext = FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); - if (!SafeToPushdownUnionSubquery(filteredRestrictionContext)) + if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext)) { /* * The distribution column is not in the same place in all sides diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index fb6a6a306..5cbd2e53d 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -59,6 +59,12 @@ typedef struct AttributeEquivalenceClass Index 
unionQueryPartitionKeyIndex; } AttributeEquivalenceClass; +typedef struct FindQueryContainingRteIdentityContext +{ + int targetRTEIdentity; + Query *query; +}FindQueryContainingRteIdentityContext; + /* * AttributeEquivalenceClassMember - one member expression of an * AttributeEquivalenceClass. The important thing to consider is that @@ -142,8 +148,8 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass firstClass, AttributeEquivalenceClass * secondClass); -static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * - relationRestriction); +static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, + Index *partitionKeyIndex); static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); @@ -155,6 +161,12 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn queryRteIdentities); static Relids QueryRteIdentities(Query *queryTree); +static Query * FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex); +static bool FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext * + context); + + #if PG_VERSION_NUM >= PG_VERSION_13 static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); #endif @@ -194,7 +206,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery, if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery)) { - return SafeToPushdownUnionSubquery(plannerRestrictionContext); + return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext); } return false; @@ -244,7 +256,8 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) * safe to push down, the function would fail to return true. 
*/ bool -SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext) +SafeToPushdownUnionSubquery(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) { RelationRestrictionContext *restrictionContext = plannerRestrictionContext->relationRestrictionContext; @@ -267,50 +280,34 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); Index partitionKeyIndex = InvalidAttrNumber; PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - List *targetList = relationPlannerRoot->parse->targetList; - List *appendRelList = relationPlannerRoot->append_rel_list; - Var *varToBeAdded = NULL; - TargetEntry *targetEntryToAdd = NULL; + + int targetRTEIndex = GetRTEIdentity(relationRestriction->rte); + Var *varToBeAdded = + PartitionKeyForRTEIdentityInQuery(originalQuery, targetRTEIndex, + &partitionKeyIndex); + + /* union does not have partition key in the target list */ + if (partitionKeyIndex == 0) + { + continue; + } /* - * We first check whether UNION ALLs are pulled up or not. Note that Postgres - * planner creates AppendRelInfos per each UNION ALL query that is pulled up. - * Then, postgres stores the related information in the append_rel_list on the - * plannerInfo struct. 
+ * This should never happen but to be on the safe side, we have this */ - if (appendRelList != NULL) + if (relationPlannerRoot->simple_rel_array_size < relationRestriction->index) { - varToBeAdded = FindUnionAllVar(relationPlannerRoot, - relationRestriction->translatedVars, - relationRestriction->relationId, - relationRestriction->index, - &partitionKeyIndex); - - /* union does not have partition key in the target list */ - if (partitionKeyIndex == 0) - { - continue; - } + continue; } - else - { - partitionKeyIndex = - RelationRestrictionPartitionKeyIndex(relationRestriction); - /* union does not have partition key in the target list */ - if (partitionKeyIndex == 0) - { - continue; - } + /* + * We update the varno because we use the original parse tree for finding the + * var. However the rest of the code relies on a query tree that might be different + * than the original parse tree because of postgres optimizations. + * That's why we update the varno to reflect the rteIndex in the modified query tree. + */ + varToBeAdded->varno = relationRestriction->index; - targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1); - if (!IsA(targetEntryToAdd->expr, Var)) - { - continue; - } - - varToBeAdded = (Var *) targetEntryToAdd->expr; - } /* * The current relation does not have its partition key in the target list. @@ -1756,20 +1753,42 @@ ContainsUnionSubquery(Query *queryTree) /* - * RelationRestrictionPartitionKeyIndex gets a relation restriction and finds the - * index that the partition key of the relation exists in the query. The query is - * found in the planner info of the relation restriction. + * PartitionKeyForRTEIdentityInQuery finds the partition key var(if exists), + * in the given original query for the rte that has targetRTEIndex. 
*/ -static Index -RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) +static Var * +PartitionKeyForRTEIdentityInQuery(Query *originalQuery, int targetRTEIndex, + Index *partitionKeyIndex) { + Query *originalQueryContainingRTEIdentity = + FindQueryContainingRTEIdentity(originalQuery, targetRTEIndex); + if (!originalQueryContainingRTEIdentity) + { + /* + * We should always find the query but we have this check for sanity. + * This check makes sure that if there is a bug while finding the query, + * we don't get a crash etc. and the only downside will be we might be recursively + * planning a query that could be pushed down. + */ + return NULL; + } + + /* + * This approach fails to detect when + * the top level query might have the column indexes in different order: + * explain + * SELECT count(*) FROM + * ( + * SELECT user_id,value_2 FROM events_table + * UNION + * SELECT value_2, user_id FROM (SELECT user_id, value_2, random() FROM events_table) as foo + * ) foobar; + * So we hit https://github.com/citusdata/citus/issues/5093. 
+ */ + List *relationTargetList = originalQueryContainingRTEIdentity->targetList; + ListCell *targetEntryCell = NULL; Index partitionKeyTargetAttrIndex = 0; - - PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - Query *relationPlannerParseQuery = relationPlannerRoot->parse; - List *relationTargetList = relationPlannerParseQuery->targetList; - foreach(targetEntryCell, relationTargetList) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); @@ -1777,20 +1796,93 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) partitionKeyTargetAttrIndex++; + bool skipOuterVars = false; if (!targetEntry->resjunk && IsA(targetExpression, Var) && - IsPartitionColumn(targetExpression, relationPlannerParseQuery)) + IsPartitionColumn(targetExpression, originalQueryContainingRTEIdentity, + skipOuterVars)) { Var *targetColumn = (Var *) targetExpression; - if (targetColumn->varno == relationRestriction->index) + /* + * We find the referenced table column to support distribution + * columns that are correlated. + */ + RangeTblEntry *rteContainingPartitionKey = NULL; + FindReferencedTableColumn(targetExpression, NIL, + originalQueryContainingRTEIdentity, + &targetColumn, + &rteContainingPartitionKey, + skipOuterVars); + + if (rteContainingPartitionKey->rtekind == RTE_RELATION && + GetRTEIdentity(rteContainingPartitionKey) == targetRTEIndex) { - return partitionKeyTargetAttrIndex; + *partitionKeyIndex = partitionKeyTargetAttrIndex; + return (Var *) copyObject(targetColumn); } } } - return InvalidAttrNumber; + return NULL; +} + + +/* + * FindQueryContainingRTEIdentity finds the query/subquery that has an RTE + * with rteIndex in its rtable. 
+ */ +static Query * +FindQueryContainingRTEIdentity(Query *query, int rteIndex) +{ + FindQueryContainingRteIdentityContext *findRteIdentityContext = + palloc0(sizeof(FindQueryContainingRteIdentityContext)); + findRteIdentityContext->targetRTEIdentity = rteIndex; + FindQueryContainingRTEIdentityInternal((Node *) query, findRteIdentityContext); + return findRteIdentityContext->query; +} + + +/* + * FindQueryContainingRTEIdentityInternal walks on the given node to find a query + * which has an RTE that has a given rteIdentity. + */ +static bool +FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext *context) +{ + if (node == NULL) + { + return false; + } + if (IsA(node, Query)) + { + Query *query = (Query *) node; + Query *parentQuery = context->query; + context->query = query; + if (query_tree_walker(query, FindQueryContainingRTEIdentityInternal, context, + QTW_EXAMINE_RTES_BEFORE)) + { + return true; + } + context->query = parentQuery; + return false; + } + + if (!IsA(node, RangeTblEntry)) + { + return expression_tree_walker(node, FindQueryContainingRTEIdentityInternal, + context); + } + RangeTblEntry *rte = (RangeTblEntry *) node; + if (rte->rtekind == RTE_RELATION) + { + if (GetRTEIdentity(rte) == context->targetRTEIdentity) + { + return true; + } + } + return false; } diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 9e6167959..8f0c5c751 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -171,9 +171,11 @@ extern bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column extern List * SubqueryMultiTableList(MultiNode *multiNode); extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList); extern bool ExtractQueryWalker(Node *node, List **queryList); -extern bool IsPartitionColumn(Expr *columnExpression, Query *query); +extern bool 
IsPartitionColumn(Expr *columnExpression, Query *query, bool skipOuterVars); extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, - Query *query, Oid *relationId, Var **column); + Query *query, Var **column, + RangeTblEntry **rteContainingReferencedColumn, + bool skipOuterVars); extern char * WorkerColumnName(AttrNumber resno); extern bool IsGroupBySubsetOfDistinct(List *groupClauses, List *distinctClauses); extern bool TargetListHasAggregates(List *targetEntryList); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index e1ece87e6..ccd50a6db 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -20,7 +20,7 @@ extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); -extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext * +extern bool SafeToPushdownUnionSubquery(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 2baf2064d..0e3e0aad4 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true -DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM 
((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo -DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba -DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, 
NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar count --------------------------------------------------------------------- 202 diff --git a/src/test/regress/expected/multi_agg_approximate_distinct.out b/src/test/regress/expected/multi_agg_approximate_distinct.out index aaf4e0543..945a5da38 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct.out @@ -9,6 +9,22 @@ AS create_cmd FROM pg_available_extensions() WHERE name = 'hll' \gset :create_cmd; +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + $$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; diff --git 
a/src/test/regress/expected/multi_agg_approximate_distinct_0.out b/src/test/regress/expected/multi_agg_approximate_distinct_0.out index 3e55c0720..4b771461d 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct_0.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct_0.out @@ -14,6 +14,31 @@ WHERE name = 'hll' f (1 row) +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + $$); +ERROR: function hll_hash_bigint(bigint) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +QUERY: + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + +CONTEXT: PL/pgSQL function explain_has_distributed_subplan(text) line XX at FOR over EXECUTE statement SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out index ec56f3949..3d68bd8a5 100644 --- a/src/test/regress/expected/union_pushdown.out +++ b/src/test/regress/expected/union_pushdown.out @@ -860,8 +860,10 @@ $$); (1 row) -- #4781 -CREATE TABLE test_a (id int, k int); -CREATE TABLE test_b (id int, k int); +CREATE TABLE 
test_a (a int, b int, id int, k int); +CREATE TABLE test_b (a int, b int, id int, k int); +ALTER TABLE test_a DROP column a; +ALTER TABLE test_b DROP column a; SELECT create_distributed_table('test_a','id'); create_distributed_table --------------------------------------------------------------------- @@ -874,6 +876,123 @@ SELECT create_distributed_table('test_b','id'); (1 row) +-- try with composite types +CREATE TYPE comp_type AS ( + int_field_1 BIGINT, + int_field_2 BIGINT +); +CREATE TABLE range_dist_table_2 (dist_col comp_type); +SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range_dist_table_2', + '{"(10,24)","(10,58)", + "(10,90)","(20,100)"}', + '{"(10,25)","(10,65)", + "(10,99)","(20,100)"}'); +INSERT INTO range_dist_table_2 VALUES ((10, 24)); +INSERT INTO range_dist_table_2 VALUES ((10, 60)); +INSERT INTO range_dist_table_2 VALUES ((10, 91)); +INSERT INTO range_dist_table_2 VALUES ((20, 100)); +-- the following can be pushed down +CREATE OR REPLACE VIEW v2 AS SELECT * from range_dist_table_2 UNION ALL SELECT * from range_dist_table_2; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(dist_col) FROM v2; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v2; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +DROP TABLE range_dist_table_2 cascade; +NOTICE: drop cascades to view v2 +-- these should be pushed down. 
+SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM users_table_part +UNION +SELECT user_id FROM users_table_part +) ; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as foo +UNION +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as bar +); +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION SELECT * from test_b where k<1; +-- tests with union +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION ALL SELECT * from test_b where k<1; -- the 
followings can be pushed down since dist_key is used in the aggregation SELECT public.explain_has_distributed_subplan($$ @@ -912,14 +1031,13 @@ $$); f (1 row) --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -928,7 +1046,7 @@ SELECT AVG(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -937,18 +1055,397 @@ SELECT SUM(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan 
+--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- we hit the following error hence can't pushdown: +-- Complex subqueries and CTEs are not supported within a UNION +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH cte AS ( +SELECT * FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM 
users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar ) +SELECT count(*) FROM +( +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as foo; $$); explain_has_distributed_subplan --------------------------------------------------------------------- t (1 row) +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT *,random() FROM (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as foo) as bar ON (true) +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH 
cte_1 AS not materialized +( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT user_id + FROM users_table_part + GROUP BY user_id) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) aa ) AS fooo + UNION + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) AS l2 ), cte_2 AS NOT materialized +( + SELECT * + FROM cte_1), cte_3 AS NOT materialized +( + SELECT * + FROM cte_2), cte_4 AS +( + SELECT DISTINCT user_id + FROM cte_3) +SELECT count(*) +FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_4 + UNION + SELECT user_id + FROM cte_3 ) AS foo +JOIN lateral + ( + SELECT * + FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + 
UNION + SELECT user_id + FROM ( + SELECT users_table_part.user_id, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_3 ) AS subqu) AS bar +using (user_id) +WHERE ( + foo.user_id) IN + ( + SELECT user_id + FROM ( + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) + UNION + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) ) AS bar + UNION + SELECT * + FROM cte_2 + WHERE user_id IN + ( + SELECT user_id + FROM users_table_part) + UNION + SELECT * + FROM cte_1 ) ; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- we hit https://github.com/citusdata/citus/blob/f00c63c33daf3d16f06462626ca14732b141ae7a/src/backend/distributed/planner/relation_restriction_equivalence.c#L235-L242 +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (value_1, user_id) IN +( +SELECT u1.user_id, user_id FROM users_table_part +UNION +SELECT u1.user_id, user_id FROM users_table_part +); +$$); +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +CONTEXT: PL/pgSQL function public.explain_has_distributed_subplan(text) line XX at FOR over EXECUTE statement +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo ON users_table_part.user_id = foo.user_id +UNION +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo on 
users_table_part.user_id = foo.user_id +); +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT test_a.* from test_a where k>1 UNION ALL SELECT test_b.* from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + -- order by prevents postgres from optimizing fields so can be pushed down SELECT public.explain_has_distributed_subplan($$ EXPLAIN @@ -966,16 +1463,17 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) RESET client_min_messages; DROP SCHEMA union_pushdown CASCADE; -NOTICE: drop cascades to 7 other objects +NOTICE: drop cascades to 8 other objects DETAIL: drop cascades to table users_table_part drop cascades to table events_table_part drop cascades to table events_table_ref drop cascades to table events_table_local drop cascades to table test_a drop cascades to table test_b +drop cascades to type comp_type drop cascades to view v diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index a43098f60..626f7fff5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -14,6 +14,7 @@ test: multi_shard_update_delete recursive_dml_with_different_planners_executors test: insert_select_repartition window_functions dml_recursive multi_insert_select_window test: 
multi_insert_select_conflict citus_table_triggers test: multi_row_insert insert_select_into_local_table multi_create_table_new_features +test: multi_agg_approximate_distinct # following should not run in parallel because it relies on connection counts to workers test: insert_select_connection_leak @@ -65,7 +66,7 @@ test: multi_subquery_in_where_reference_clause adaptive_executor propagate_set_c # this should be run alone as it gets too many clients test: join_pushdown test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message -test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction +test: multi_agg_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction test: multi_reference_table multi_select_for_update relation_access_tracking pg13_with_ties test: custom_aggregate_support aggregate_support tdigest_aggregate_support test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery diff --git a/src/test/regress/sql/multi_agg_approximate_distinct.sql b/src/test/regress/sql/multi_agg_approximate_distinct.sql index d5a535bc1..de799f81c 100644 --- a/src/test/regress/sql/multi_agg_approximate_distinct.sql +++ b/src/test/regress/sql/multi_agg_approximate_distinct.sql @@ -13,6 +13,18 @@ WHERE name = 'hll' :create_cmd; +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + $$); + SET 
citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql index c40f7c6fd..884d93600 100644 --- a/src/test/regress/sql/union_pushdown.sql +++ b/src/test/regress/sql/union_pushdown.sql @@ -655,11 +655,92 @@ LIMIT 1; $$); -- #4781 -CREATE TABLE test_a (id int, k int); -CREATE TABLE test_b (id int, k int); +CREATE TABLE test_a (a int, b int, id int, k int); +CREATE TABLE test_b (a int, b int, id int, k int); +ALTER TABLE test_a DROP column a; +ALTER TABLE test_b DROP column a; SELECT create_distributed_table('test_a','id'); SELECT create_distributed_table('test_b','id'); +-- try with composite types +CREATE TYPE comp_type AS ( + int_field_1 BIGINT, + int_field_2 BIGINT +); + +CREATE TABLE range_dist_table_2 (dist_col comp_type); +SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range'); + +CALL public.create_range_partitioned_shards('range_dist_table_2', + '{"(10,24)","(10,58)", + "(10,90)","(20,100)"}', + '{"(10,25)","(10,65)", + "(10,99)","(20,100)"}'); +INSERT INTO range_dist_table_2 VALUES ((10, 24)); +INSERT INTO range_dist_table_2 VALUES ((10, 60)); +INSERT INTO range_dist_table_2 VALUES ((10, 91)); +INSERT INTO range_dist_table_2 VALUES ((20, 100)); +-- the following can be pushed down +CREATE OR REPLACE VIEW v2 AS SELECT * from range_dist_table_2 UNION ALL SELECT * from range_dist_table_2; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(dist_col) FROM v2; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v2; +$$); + +DROP TABLE range_dist_table_2 cascade; + +-- these should be pushed down. 
+SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM users_table_part +UNION +SELECT user_id FROM users_table_part +) ; +$$); + +SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as foo +UNION +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as bar +); +$$); + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION SELECT * from test_b where k<1; +-- tests with union +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(k) FROM v; +$$); + + CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION ALL SELECT * from test_b where k<1; -- the followings can be pushed down since dist_key is used in the aggregation SELECT public.explain_has_distributed_subplan($$ @@ -682,7 +763,6 @@ EXPLAIN SELECT MAX(id) FROM v; $$); --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; @@ -703,6 +783,319 @@ EXPLAIN SELECT MAX(k) FROM v; $$); +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT 
public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + +-- we hit the following error hence can't pushdown: +-- Complex subqueries and CTEs are not supported within a UNION +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH cte AS ( +SELECT * FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar ) +SELECT count(*) FROM +( +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as foo; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = 
u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT *,random() FROM (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as foo) as bar ON (true) +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH cte_1 AS not materialized +( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT user_id + FROM users_table_part + GROUP BY user_id) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) aa ) AS fooo + UNION + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) AS l2 ), cte_2 AS NOT materialized +( + SELECT * + FROM cte_1), cte_3 AS NOT materialized +( + SELECT * + FROM cte_2), cte_4 AS +( + SELECT DISTINCT user_id + FROM cte_3) +SELECT count(*) +FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + 
FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_4 + UNION + SELECT user_id + FROM cte_3 ) AS foo +JOIN lateral + ( + SELECT * + FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT users_table_part.user_id, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_3 ) AS subqu) AS bar +using (user_id) +WHERE ( + foo.user_id) IN + ( + SELECT user_id + FROM ( + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) + UNION + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) ) AS bar + UNION + SELECT * + FROM cte_2 + WHERE user_id IN + ( + SELECT user_id + FROM users_table_part) + UNION + SELECT * + FROM cte_1 ) ; +$$); + + +-- we hit 
https://github.com/citusdata/citus/blob/f00c63c33daf3d16f06462626ca14732b141ae7a/src/backend/distributed/planner/relation_restriction_equivalence.c#L235-L242 +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (value_1, user_id) IN +( +SELECT u1.user_id, user_id FROM users_table_part +UNION +SELECT u1.user_id, user_id FROM users_table_part +); +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo ON users_table_part.user_id = foo.user_id +UNION +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo on users_table_part.user_id = foo.user_id +); +$$); + +CREATE OR REPLACE VIEW v AS SELECT test_a.* from test_a where k>1 UNION ALL SELECT test_b.* from test_b where k<1; + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + -- order by prevents postgres from optimizing fields so can be pushed down SELECT public.explain_has_distributed_subplan($$ EXPLAIN