diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 137e19fc7..e7260946f 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -1136,7 +1136,8 @@ HasUnsupportedDistinctOn(Query *query) TargetEntry *distinctEntry = get_sortgroupclause_tle(distinctClause, query->targetList); - if (IsPartitionColumn(distinctEntry->expr, query)) + bool skipOuterVars = true; + if (IsPartitionColumn(distinctEntry->expr, query, skipOuterVars)) { return false; } @@ -1170,7 +1171,6 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry); - Oid subqueryPartitionColumnRelationId = InvalidOid; Var *subqueryPartitionColumn = NULL; /* @@ -1202,11 +1202,18 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, insertVar->varattno - 1); Expr *selectTargetExpr = subqueryTargetEntry->expr; + RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL; List *parentQueryList = list_make2(query, subquery); + bool skipOuterVars = true; FindReferencedTableColumn(selectTargetExpr, parentQueryList, subquery, - &subqueryPartitionColumnRelationId, - &subqueryPartitionColumn); + &subqueryPartitionColumn, + &subqueryPartitionColumnRelationIdRTE, + skipOuterVars); + Oid subqueryPartitionColumnRelationId = subqueryPartitionColumnRelationIdRTE ? 
+ subqueryPartitionColumnRelationIdRTE-> + relid : + InvalidOid; /* * Corresponding (i.e., in the same ordinal position as the target table's @@ -1339,7 +1346,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, } /* finally, check that the select target column is a partition column */ - if (!IsPartitionColumn(selectTargetExpr, subquery)) + if (!IsPartitionColumn(selectTargetExpr, subquery, skipOuterVars)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot perform distributed INSERT INTO ... SELECT " diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 615c0ddbe..d138ea635 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -327,7 +327,7 @@ static bool ShouldProcessDistinctOrderAndLimitForWorker( ExtendedOpNodeProperties *extendedOpNodeProperties, bool pushingDownOriginalGrouping, Node *havingQual); - +static bool IsIndexInRange(const List *list, int index); /* * MultiLogicalPlanOptimize applies multi-relational algebra optimizations on @@ -4372,16 +4372,19 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList) * reference tables do not have partition column. The function does not * support queries with CTEs, it would return false if columnExpression * refers to a column returned by a CTE. + * + * If skipOuterVars is true, then it doesn't process the outervars. */ bool -IsPartitionColumn(Expr *columnExpression, Query *query) +IsPartitionColumn(Expr *columnExpression, Query *query, bool skipOuterVars) { bool isPartitionColumn = false; - Oid relationId = InvalidOid; Var *column = NULL; + RangeTblEntry *relationRTE = NULL; - FindReferencedTableColumn(columnExpression, NIL, query, &relationId, &column); - + FindReferencedTableColumn(columnExpression, NIL, query, &column, &relationRTE, + skipOuterVars); + Oid relationId = relationRTE ? 
relationRTE->relid : InvalidOid; if (relationId != InvalidOid && column != NULL) { Var *partitionColumn = DistPartitionKey(relationId); @@ -4400,21 +4403,26 @@ IsPartitionColumn(Expr *columnExpression, Query *query) /* * FindReferencedTableColumn recursively traverses query tree to find actual relation * id, and column that columnExpression refers to. If columnExpression is a - * non-relational or computed/derived expression, the function returns InvalidOid for - * relationId and NULL for column. The caller should provide parent query list from + * non-relational or computed/derived expression, the function returns NULL for + * rte and NULL for column. The caller should provide parent query list from * top of the tree to this particular Query's parent. This argument is used to look * into CTEs that may be present in the query. + * + * If skipOuterVars is true, then it doesn't check vars coming from outer queries. + * We probably don't need this skipOuterVars check but we wanted to be on the safe side + * and used it only in UNION path, we can separately work on verifying that it doesn't break + * anything existing. */ void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *query, - Oid *relationId, Var **column) + Var **column, RangeTblEntry **rteContainingReferencedColumn, + bool skipOuterVars) { Var *candidateColumn = NULL; - List *rangetableList = query->rtable; Expr *strippedColumnExpression = (Expr *) strip_implicit_coercions( (Node *) columnExpression); - *relationId = InvalidOid; + *rteContainingReferencedColumn = NULL; *column = NULL; if (IsA(strippedColumnExpression, Var)) @@ -4443,9 +4451,28 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * * subqueries in WHERE clause, we don't support use of partition keys * in the subquery that is referred from the outer query. 
*/ - if (candidateColumn->varlevelsup > 0) + if (candidateColumn->varlevelsup > 0 && !skipOuterVars) { - return; + int parentQueryIndex = list_length(parentQueryList) - + candidateColumn->varlevelsup; + if (!(IsIndexInRange(parentQueryList, parentQueryIndex))) + { + return; + } + + /* + * Before we recurse into the query tree, we should update the candidateColumn and we use copy of it. + * As we get the query from varlevelsup up, we reset the varlevelsup. + */ + candidateColumn = copyObject(candidateColumn); + candidateColumn->varlevelsup = 0; + + /* + * We should be careful about these fields because they need to + * be updated correctly based on ctelevelsup and varlevelsup. + */ + query = list_nth(parentQueryList, parentQueryIndex); + parentQueryList = list_truncate(parentQueryList, parentQueryIndex); } if (candidateColumn->varattno == InvalidAttrNumber) @@ -4457,12 +4484,13 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * return; } + List *rangetableList = query->rtable; int rangeTableEntryIndex = candidateColumn->varno - 1; RangeTblEntry *rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex); if (rangeTableEntry->rtekind == RTE_RELATION) { - *relationId = rangeTableEntry->relid; + *rteContainingReferencedColumn = rangeTableEntry; *column = candidateColumn; } else if (rangeTableEntry->rtekind == RTE_SUBQUERY) @@ -4476,7 +4504,8 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * /* append current query to parent query list */ parentQueryList = lappend(parentQueryList, query); FindReferencedTableColumn(subColumnExpression, parentQueryList, - subquery, relationId, column); + subquery, column, rteContainingReferencedColumn, + skipOuterVars); } else if (rangeTableEntry->rtekind == RTE_JOIN) { @@ -4485,11 +4514,17 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex); /* parent query list stays the 
same since still in the same query boundary */ - FindReferencedTableColumn(joinColumn, parentQueryList, query, - relationId, column); + FindReferencedTableColumn(joinColumn, parentQueryList, query, column, + rteContainingReferencedColumn, skipOuterVars); } else if (rangeTableEntry->rtekind == RTE_CTE) { + /* + * When outerVars are considered, we modify parentQueryList, so this + * logic might need to change when we support outervars in CTEs. + */ + Assert(skipOuterVars); + int cteParentListIndex = list_length(parentQueryList) - rangeTableEntry->ctelevelsup - 1; Query *cteParentQuery = NULL; @@ -4501,7 +4536,7 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * * moment due to usage from IsPartitionColumn. Callers of that function * do not have access to parent query list. */ - if (cteParentListIndex >= 0) + if (IsIndexInRange(parentQueryList, cteParentListIndex)) { cteParentQuery = list_nth(parentQueryList, cteParentListIndex); cteList = cteParentQuery->cteList; @@ -4526,12 +4561,24 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * parentQueryList = lappend(parentQueryList, query); FindReferencedTableColumn(targetEntry->expr, parentQueryList, - cteQuery, relationId, column); + cteQuery, column, rteContainingReferencedColumn, + skipOuterVars); } } } +/* + * IsIndexInRange returns true if the given index is within the + * range of the given list. + */ +static bool +IsIndexInRange(const List *list, int index) +{ + return index >= 0 && index < list_length(list); +} + + /* * ExtractQueryWalker walks over a query, and finds all queries in the query * tree and returns these queries. 
Note that the function also recurses into diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index e72150d5a..0691df7cb 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -215,11 +215,15 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); Expr *targetExpression = targetEntry->expr; - bool isPartitionColumn = IsPartitionColumn(targetExpression, query); - Oid relationId = InvalidOid; + bool skipOuterVars = true; + bool isPartitionColumn = IsPartitionColumn(targetExpression, query, + skipOuterVars); Var *column = NULL; + RangeTblEntry *rte = NULL; - FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column); + FindReferencedTableColumn(targetExpression, NIL, query, &column, &rte, + skipOuterVars); + Oid relationId = rte ? rte->relid : InvalidOid; /* * If the expression belongs to a non-distributed table continue searching for diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index ba1003c03..7b310e5be 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -358,7 +358,8 @@ AddPartitionKeyNotNullFilterToSelect(Query *subqery) { TargetEntry *targetEntry = lfirst(targetEntryCell); - if (IsPartitionColumn(targetEntry->expr, subqery) && + bool skipOuterVars = true; + if (IsPartitionColumn(targetEntry->expr, subqery, skipOuterVars) && IsA(targetEntry->expr, Var)) { targetPartitionColumnVar = (Var *) targetEntry->expr; diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 632b9dd72..e8452a357 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ 
b/src/backend/distributed/planner/query_pushdown_planning.c @@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, */ if (ContainsUnionSubquery(originalQuery)) { - if (!SafeToPushdownUnionSubquery(plannerRestrictionContext)) + if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot pushdown the subquery since not all subqueries " @@ -1836,7 +1836,9 @@ PartitionColumnForPushedDownSubquery(Query *query) Expr *targetExpression = targetEntry->expr; if (IsA(targetExpression, Var)) { - bool isPartitionColumn = IsPartitionColumn(targetExpression, query); + bool skipOuterVars = true; + bool isPartitionColumn = IsPartitionColumn(targetExpression, query, + skipOuterVars); if (isPartitionColumn) { Var *partitionColumn = copyObject((Var *) targetExpression); diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 209714f74..b2a8140da 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex PlannerRestrictionContext *filteredRestrictionContext = FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); - if (!SafeToPushdownUnionSubquery(filteredRestrictionContext)) + if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext)) { /* * The distribution column is not in the same place in all sides diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index fb6a6a306..5cbd2e53d 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -59,6 +59,12 @@ typedef struct AttributeEquivalenceClass Index 
unionQueryPartitionKeyIndex; } AttributeEquivalenceClass; +typedef struct FindQueryContainingRteIdentityContext +{ + int targetRTEIdentity; + Query *query; +}FindQueryContainingRteIdentityContext; + /* * AttributeEquivalenceClassMember - one member expression of an * AttributeEquivalenceClass. The important thing to consider is that @@ -142,8 +148,8 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass firstClass, AttributeEquivalenceClass * secondClass); -static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * - relationRestriction); +static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, + Index *partitionKeyIndex); static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); @@ -155,6 +161,12 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn queryRteIdentities); static Relids QueryRteIdentities(Query *queryTree); +static Query * FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex); +static bool FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext * + context); + + #if PG_VERSION_NUM >= PG_VERSION_13 static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); #endif @@ -194,7 +206,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery, if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery)) { - return SafeToPushdownUnionSubquery(plannerRestrictionContext); + return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext); } return false; @@ -244,7 +256,8 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) * safe to push down, the function would fail to return true. 
*/ bool -SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext) +SafeToPushdownUnionSubquery(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) { RelationRestrictionContext *restrictionContext = plannerRestrictionContext->relationRestrictionContext; @@ -267,50 +280,34 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); Index partitionKeyIndex = InvalidAttrNumber; PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - List *targetList = relationPlannerRoot->parse->targetList; - List *appendRelList = relationPlannerRoot->append_rel_list; - Var *varToBeAdded = NULL; - TargetEntry *targetEntryToAdd = NULL; + + int targetRTEIndex = GetRTEIdentity(relationRestriction->rte); + Var *varToBeAdded = + PartitionKeyForRTEIdentityInQuery(originalQuery, targetRTEIndex, + &partitionKeyIndex); + + /* union does not have partition key in the target list */ + if (partitionKeyIndex == 0) + { + continue; + } /* - * We first check whether UNION ALLs are pulled up or not. Note that Postgres - * planner creates AppendRelInfos per each UNION ALL query that is pulled up. - * Then, postgres stores the related information in the append_rel_list on the - * plannerInfo struct. 
+ * This should never happen but to be on the safe side, we have this */ - if (appendRelList != NULL) + if (relationPlannerRoot->simple_rel_array_size < relationRestriction->index) { - varToBeAdded = FindUnionAllVar(relationPlannerRoot, - relationRestriction->translatedVars, - relationRestriction->relationId, - relationRestriction->index, - &partitionKeyIndex); - - /* union does not have partition key in the target list */ - if (partitionKeyIndex == 0) - { - continue; - } + continue; } - else - { - partitionKeyIndex = - RelationRestrictionPartitionKeyIndex(relationRestriction); - /* union does not have partition key in the target list */ - if (partitionKeyIndex == 0) - { - continue; - } + /* + * We update the varno because we use the original parse tree for finding the + * var. However the rest of the code relies on a query tree that might be different + * than the original parse tree because of postgres optimizations. + * That's why we update the varno to reflect the rteIndex in the modified query tree. + */ + varToBeAdded->varno = relationRestriction->index; - targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1); - if (!IsA(targetEntryToAdd->expr, Var)) - { - continue; - } - - varToBeAdded = (Var *) targetEntryToAdd->expr; - } /* * The current relation does not have its partition key in the target list. @@ -1756,20 +1753,42 @@ ContainsUnionSubquery(Query *queryTree) /* - * RelationRestrictionPartitionKeyIndex gets a relation restriction and finds the - * index that the partition key of the relation exists in the query. The query is - * found in the planner info of the relation restriction. + * PartitionKeyForRTEIdentityInQuery finds the partition key var(if exists), + * in the given original query for the rte that has targetRTEIndex. 
*/ -static Index -RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) +static Var * +PartitionKeyForRTEIdentityInQuery(Query *originalQuery, int targetRTEIndex, + Index *partitionKeyIndex) { + Query *originalQueryContainingRTEIdentity = + FindQueryContainingRTEIdentity(originalQuery, targetRTEIndex); + if (!originalQueryContainingRTEIdentity) + { + /* + * We should always find the query but we have this check for sanity. + * This check makes sure that if there is a bug while finding the query, + * we don't get a crash etc. and the only downside will be we might be recursively + * planning a query that could be pushed down. + */ + return NULL; + } + + /* + * This approach fails to detect when + * the top level query might have the column indexes in different order: + * explain + * SELECT count(*) FROM + * ( + * SELECT user_id,value_2 FROM events_table + * UNION + * SELECT value_2, user_id FROM (SELECT user_id, value_2, random() FROM events_table) as foo + * ) foobar; + * So we hit https://github.com/citusdata/citus/issues/5093. 
+ */ + List *relationTargetList = originalQueryContainingRTEIdentity->targetList; + ListCell *targetEntryCell = NULL; Index partitionKeyTargetAttrIndex = 0; - - PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - Query *relationPlannerParseQuery = relationPlannerRoot->parse; - List *relationTargetList = relationPlannerParseQuery->targetList; - foreach(targetEntryCell, relationTargetList) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); @@ -1777,20 +1796,93 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) partitionKeyTargetAttrIndex++; + bool skipOuterVars = false; if (!targetEntry->resjunk && IsA(targetExpression, Var) && - IsPartitionColumn(targetExpression, relationPlannerParseQuery)) + IsPartitionColumn(targetExpression, originalQueryContainingRTEIdentity, + skipOuterVars)) { Var *targetColumn = (Var *) targetExpression; - if (targetColumn->varno == relationRestriction->index) + /* + * We find the referenced table column to support distribution + * columns that are correlated. + */ + RangeTblEntry *rteContainingPartitionKey = NULL; + FindReferencedTableColumn(targetExpression, NIL, + originalQueryContainingRTEIdentity, + &targetColumn, + &rteContainingPartitionKey, + skipOuterVars); + + if (rteContainingPartitionKey->rtekind == RTE_RELATION && + GetRTEIdentity(rteContainingPartitionKey) == targetRTEIndex) { - return partitionKeyTargetAttrIndex; + *partitionKeyIndex = partitionKeyTargetAttrIndex; + return (Var *) copyObject(targetColumn); } } } - return InvalidAttrNumber; + return NULL; +} + + +/* + * FindQueryContainingRTEIdentity finds the query/subquery that has an RTE + * with rteIndex in its rtable. 
+ */ +static Query * +FindQueryContainingRTEIdentity(Query *query, int rteIndex) +{ + FindQueryContainingRteIdentityContext *findRteIdentityContext = + palloc0(sizeof(FindQueryContainingRteIdentityContext)); + findRteIdentityContext->targetRTEIdentity = rteIndex; + FindQueryContainingRTEIdentityInternal((Node *) query, findRteIdentityContext); + return findRteIdentityContext->query; +} + + +/* + * FindQueryContainingRTEIdentityInternal walks on the given node to find a query + * which has an RTE that has a given rteIdentity. + */ +static bool +FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext *context) +{ + if (node == NULL) + { + return false; + } + if (IsA(node, Query)) + { + Query *query = (Query *) node; + Query *parentQuery = context->query; + context->query = query; + if (query_tree_walker(query, FindQueryContainingRTEIdentityInternal, context, + QTW_EXAMINE_RTES_BEFORE)) + { + return true; + } + context->query = parentQuery; + return false; + } + + if (!IsA(node, RangeTblEntry)) + { + return expression_tree_walker(node, FindQueryContainingRTEIdentityInternal, + context); + } + RangeTblEntry *rte = (RangeTblEntry *) node; + if (rte->rtekind == RTE_RELATION) + { + if (GetRTEIdentity(rte) == context->targetRTEIdentity) + { + return true; + } + } + return false; } diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 9e6167959..8f0c5c751 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -171,9 +171,11 @@ extern bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column extern List * SubqueryMultiTableList(MultiNode *multiNode); extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList); extern bool ExtractQueryWalker(Node *node, List **queryList); -extern bool IsPartitionColumn(Expr *columnExpression, Query *query); +extern bool 
IsPartitionColumn(Expr *columnExpression, Query *query, bool skipOuterVars); extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, - Query *query, Oid *relationId, Var **column); + Query *query, Var **column, + RangeTblEntry **rteContainingReferencedColumn, + bool skipOuterVars); extern char * WorkerColumnName(AttrNumber resno); extern bool IsGroupBySubsetOfDistinct(List *groupClauses, List *distinctClauses); extern bool TargetListHasAggregates(List *targetEntryList); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index e1ece87e6..ccd50a6db 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -20,7 +20,7 @@ extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); -extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext * +extern bool SafeToPushdownUnionSubquery(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 2baf2064d..0e3e0aad4 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true -DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM 
((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo -DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba -DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, 
NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar count --------------------------------------------------------------------- 202 diff --git a/src/test/regress/expected/multi_agg_approximate_distinct.out b/src/test/regress/expected/multi_agg_approximate_distinct.out index aaf4e0543..945a5da38 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct.out @@ -9,6 +9,22 @@ AS create_cmd FROM pg_available_extensions() WHERE name = 'hll' \gset :create_cmd; +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + $$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; diff --git 
a/src/test/regress/expected/multi_agg_approximate_distinct_0.out b/src/test/regress/expected/multi_agg_approximate_distinct_0.out index 3e55c0720..4b771461d 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct_0.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct_0.out @@ -14,6 +14,31 @@ WHERE name = 'hll' f (1 row) +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + $$); +ERROR: function hll_hash_bigint(bigint) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +QUERY: + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + +CONTEXT: PL/pgSQL function explain_has_distributed_subplan(text) line XX at FOR over EXECUTE statement SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out index ec56f3949..3d68bd8a5 100644 --- a/src/test/regress/expected/union_pushdown.out +++ b/src/test/regress/expected/union_pushdown.out @@ -860,8 +860,10 @@ $$); (1 row) -- #4781 -CREATE TABLE test_a (id int, k int); -CREATE TABLE test_b (id int, k int); +CREATE TABLE 
test_a (a int, b int, id int, k int); +CREATE TABLE test_b (a int, b int, id int, k int); +ALTER TABLE test_a DROP column a; +ALTER TABLE test_b DROP column a; SELECT create_distributed_table('test_a','id'); create_distributed_table --------------------------------------------------------------------- @@ -874,6 +876,123 @@ SELECT create_distributed_table('test_b','id'); (1 row) +-- try with composite types +CREATE TYPE comp_type AS ( + int_field_1 BIGINT, + int_field_2 BIGINT +); +CREATE TABLE range_dist_table_2 (dist_col comp_type); +SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('range_dist_table_2', + '{"(10,24)","(10,58)", + "(10,90)","(20,100)"}', + '{"(10,25)","(10,65)", + "(10,99)","(20,100)"}'); +INSERT INTO range_dist_table_2 VALUES ((10, 24)); +INSERT INTO range_dist_table_2 VALUES ((10, 60)); +INSERT INTO range_dist_table_2 VALUES ((10, 91)); +INSERT INTO range_dist_table_2 VALUES ((20, 100)); +-- the following can be pushed down +CREATE OR REPLACE VIEW v2 AS SELECT * from range_dist_table_2 UNION ALL SELECT * from range_dist_table_2; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(dist_col) FROM v2; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v2; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +DROP TABLE range_dist_table_2 cascade; +NOTICE: drop cascades to view v2 +-- these should be pushed down. 
+SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM users_table_part +UNION +SELECT user_id FROM users_table_part +) ; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as foo +UNION +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as bar +); +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION SELECT * from test_b where k<1; +-- tests with union +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION ALL SELECT * from test_b where k<1; -- the 
followings can be pushed down since dist_key is used in the aggregation SELECT public.explain_has_distributed_subplan($$ @@ -912,14 +1031,13 @@ $$); f (1 row) --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -928,7 +1046,7 @@ SELECT AVG(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -937,18 +1055,397 @@ SELECT SUM(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan 
+--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- we hit the following error hence can't pushdown: +-- Complex subqueries and CTEs are not supported within a UNION +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH cte AS ( +SELECT * FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM 
users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar ) +SELECT count(*) FROM +( +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as foo; $$); explain_has_distributed_subplan --------------------------------------------------------------------- t (1 row) +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT *,random() FROM (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as foo) as bar ON (true) +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH 
cte_1 AS not materialized +( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT user_id + FROM users_table_part + GROUP BY user_id) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) aa ) AS fooo + UNION + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) AS l2 ), cte_2 AS NOT materialized +( + SELECT * + FROM cte_1), cte_3 AS NOT materialized +( + SELECT * + FROM cte_2), cte_4 AS +( + SELECT DISTINCT user_id + FROM cte_3) +SELECT count(*) +FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_4 + UNION + SELECT user_id + FROM cte_3 ) AS foo +JOIN lateral + ( + SELECT * + FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + 
UNION + SELECT user_id + FROM ( + SELECT users_table_part.user_id, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_3 ) AS subqu) AS bar +using (user_id) +WHERE ( + foo.user_id) IN + ( + SELECT user_id + FROM ( + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) + UNION + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) ) AS bar + UNION + SELECT * + FROM cte_2 + WHERE user_id IN + ( + SELECT user_id + FROM users_table_part) + UNION + SELECT * + FROM cte_1 ) ; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- we hit https://github.com/citusdata/citus/blob/f00c63c33daf3d16f06462626ca14732b141ae7a/src/backend/distributed/planner/relation_restriction_equivalence.c#L235-L242 +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (value_1, user_id) IN +( +SELECT u1.user_id, user_id FROM users_table_part +UNION +SELECT u1.user_id, user_id FROM users_table_part +); +$$); +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +CONTEXT: PL/pgSQL function public.explain_has_distributed_subplan(text) line XX at FOR over EXECUTE statement +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo ON users_table_part.user_id = foo.user_id +UNION +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo on 
users_table_part.user_id = foo.user_id +); +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT test_a.* from test_a where k>1 UNION ALL SELECT test_b.* from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + -- order by prevents postgres from optimizing fields so can be pushed down SELECT public.explain_has_distributed_subplan($$ EXPLAIN @@ -966,16 +1463,17 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) RESET client_min_messages; DROP SCHEMA union_pushdown CASCADE; -NOTICE: drop cascades to 7 other objects +NOTICE: drop cascades to 8 other objects DETAIL: drop cascades to table users_table_part drop cascades to table events_table_part drop cascades to table events_table_ref drop cascades to table events_table_local drop cascades to table test_a drop cascades to table test_b +drop cascades to type comp_type drop cascades to view v diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index a43098f60..626f7fff5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -14,6 +14,7 @@ test: multi_shard_update_delete recursive_dml_with_different_planners_executors test: insert_select_repartition window_functions dml_recursive multi_insert_select_window test: 
multi_insert_select_conflict citus_table_triggers test: multi_row_insert insert_select_into_local_table multi_create_table_new_features +test: multi_agg_approximate_distinct # following should not run in parallel because it relies on connection counts to workers test: insert_select_connection_leak @@ -65,7 +66,7 @@ test: multi_subquery_in_where_reference_clause adaptive_executor propagate_set_c # this should be run alone as it gets too many clients test: join_pushdown test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message -test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction +test: multi_agg_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction test: multi_reference_table multi_select_for_update relation_access_tracking pg13_with_ties test: custom_aggregate_support aggregate_support tdigest_aggregate_support test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery diff --git a/src/test/regress/sql/multi_agg_approximate_distinct.sql b/src/test/regress/sql/multi_agg_approximate_distinct.sql index d5a535bc1..de799f81c 100644 --- a/src/test/regress/sql/multi_agg_approximate_distinct.sql +++ b/src/test/regress/sql/multi_agg_approximate_distinct.sql @@ -13,6 +13,18 @@ WHERE name = 'hll' :create_cmd; +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT symbol_id, + HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash, + HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users + FROM ( + SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events" + UNION ALL + SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events" + ) pushdown_events + GROUP BY symbol_id; + $$); + SET 
citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql index c40f7c6fd..884d93600 100644 --- a/src/test/regress/sql/union_pushdown.sql +++ b/src/test/regress/sql/union_pushdown.sql @@ -655,11 +655,92 @@ LIMIT 1; $$); -- #4781 -CREATE TABLE test_a (id int, k int); -CREATE TABLE test_b (id int, k int); +CREATE TABLE test_a (a int, b int, id int, k int); +CREATE TABLE test_b (a int, b int, id int, k int); +ALTER TABLE test_a DROP column a; +ALTER TABLE test_b DROP column a; SELECT create_distributed_table('test_a','id'); SELECT create_distributed_table('test_b','id'); +-- try with composite types +CREATE TYPE comp_type AS ( + int_field_1 BIGINT, + int_field_2 BIGINT +); + +CREATE TABLE range_dist_table_2 (dist_col comp_type); +SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range'); + +CALL public.create_range_partitioned_shards('range_dist_table_2', + '{"(10,24)","(10,58)", + "(10,90)","(20,100)"}', + '{"(10,25)","(10,65)", + "(10,99)","(20,100)"}'); +INSERT INTO range_dist_table_2 VALUES ((10, 24)); +INSERT INTO range_dist_table_2 VALUES ((10, 60)); +INSERT INTO range_dist_table_2 VALUES ((10, 91)); +INSERT INTO range_dist_table_2 VALUES ((20, 100)); +-- the following can be pushed down +CREATE OR REPLACE VIEW v2 AS SELECT * from range_dist_table_2 UNION ALL SELECT * from range_dist_table_2; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(dist_col) FROM v2; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v2; +$$); + +DROP TABLE range_dist_table_2 cascade; + +-- these should be pushed down. 
+SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM users_table_part +UNION +SELECT user_id FROM users_table_part +) ; +$$); + +SELECT public.explain_has_distributed_subplan($$ +explain SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as foo +UNION +SELECT user_id FROM (SELECT *, random() FROM users_table_part) as bar +); +$$); + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION SELECT * from test_b where k<1; +-- tests with union +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(k) FROM v; +$$); + + CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION ALL SELECT * from test_b where k<1; -- the followings can be pushed down since dist_key is used in the aggregation SELECT public.explain_has_distributed_subplan($$ @@ -682,7 +763,6 @@ EXPLAIN SELECT MAX(id) FROM v; $$); --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; @@ -703,6 +783,319 @@ EXPLAIN SELECT MAX(k) FROM v; $$); +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT 
public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + +-- we hit the following error hence can't pushdown: +-- Complex subqueries and CTEs are not supported within a UNION +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH cte AS ( +SELECT * FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar ) +SELECT count(*) FROM +( +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as foo; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = 
u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT count(*) FROM +( +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +UNION +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true) +) as bar; +$$); + +SELECT public.explain_has_distributed_subplan($$ +explain +SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT *,random() FROM (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as foo) as bar ON (true) +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN WITH cte_1 AS not materialized +( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT user_id + FROM users_table_part + GROUP BY user_id) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) aa ) AS fooo + UNION + SELECT user_id + FROM ( + SELECT DISTINCT user_id + FROM users_table_part) AS l2 ), cte_2 AS NOT materialized +( + SELECT * + FROM cte_1), cte_3 AS NOT materialized +( + SELECT * + FROM cte_2), cte_4 AS +( + SELECT DISTINCT user_id + FROM cte_3) +SELECT count(*) +FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + 
FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_4 + UNION + SELECT user_id + FROM cte_3 ) AS foo +JOIN lateral + ( + SELECT * + FROM ( + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT *, + random() + FROM users_table_part) AS l2 + UNION ALL + SELECT user_id + FROM ( + SELECT * + FROM users_table_part) AS l1 + UNION + SELECT user_id + FROM ( + SELECT user_id AS user_id + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT users_table_part.user_id, + random() + FROM users_table_part) AS l2 + UNION + SELECT user_id + FROM ( + SELECT * + FROM cte_1) AS bar + UNION ALL + SELECT user_id + FROM cte_2 + UNION + SELECT * + FROM ( + SELECT user_id + FROM cte_2 + GROUP BY user_id) AS bar + UNION + SELECT user_id + FROM cte_3 ) AS subqu) AS bar +using (user_id) +WHERE ( + foo.user_id) IN + ( + SELECT user_id + FROM ( + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) + UNION + SELECT foo.user_id + FROM users_table_part u1 + JOIN lateral + ( + SELECT u1.user_id + FROM users_table_part u2 + WHERE u2.user_id = u1.user_id) AS foo + ON ( + true) ) AS bar + UNION + SELECT * + FROM cte_2 + WHERE user_id IN + ( + SELECT user_id + FROM users_table_part) + UNION + SELECT * + FROM cte_1 ) ; +$$); + + +-- we hit 
https://github.com/citusdata/citus/blob/f00c63c33daf3d16f06462626ca14732b141ae7a/src/backend/distributed/planner/relation_restriction_equivalence.c#L235-L242 +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (value_1, user_id) IN +( +SELECT u1.user_id, user_id FROM users_table_part +UNION +SELECT u1.user_id, user_id FROM users_table_part +); +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM users_table_part u1 WHERE (user_id) IN +( +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo ON users_table_part.user_id = foo.user_id +UNION +SELECT foo.user_id FROM users_table_part JOIN users_table_part foo on users_table_part.user_id = foo.user_id +); +$$); + +CREATE OR REPLACE VIEW v AS SELECT test_a.* from test_a where k>1 UNION ALL SELECT test_b.* from test_b where k<1; + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) FROM v; +$$); + -- order by prevents postgres from optimizing fields so can be pushed down SELECT public.explain_has_distributed_subplan($$ EXPLAIN