From e16805215da247e8573cbadb1486efa3d61f5cd5 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Mon, 30 Oct 2017 13:12:24 +0200 Subject: [PATCH] Support count(distinct) for non-partition columns (#1692) Expands count distinct coverage by allowing more cases. We used to support count distinct only if we can push down distinct aggregate to worker query i.e. the count distinct clause was on the partition column of the table, or there was a grouping on the partition column. Now we can support - non-partition columns, with or without grouping on partition column - partition, and non partition column in the same query - having clause - single table subqueries - insert into select queries - join queries where count distinct is on partition, or non-partition column - filters on count distinct clauses (extends existing support) We first try to push down aggregate to worker query (original case), if we can't then we modify worker query to return distinct columns to coordinator node. We do that by adding distinct column targets to group by clauses. Then we perform count distinct operation on the coordinator node. This work should reduce the cases where HLL is used as it can address anything that HLL can. However, if we start having performance issues due to very large number rows, then we can recommend hll use. --- .../planner/multi_logical_optimizer.c | 347 +++++++++--- .../planner/multi_master_planner.c | 74 ++- .../multi_agg_approximate_distinct.out | 16 +- .../multi_agg_approximate_distinct_0.out | 16 +- src/test/regress/expected/multi_view.out | 17 +- .../regress/input/multi_agg_distinct.source | 32 +- .../input/multi_complex_count_distinct.source | 170 +++++- .../regress/output/multi_agg_distinct.source | 54 +- .../multi_complex_count_distinct.source | 495 +++++++++++++++++- src/test/regress/sql/multi_view.sql | 6 +- 10 files changed, 1074 insertions(+), 153 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 03bd54cbd..2b349c0cd 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -62,15 +62,15 @@ double CountDistinctErrorRate = 0.0; /* precision of count(distinct) approximate typedef struct MasterAggregateWalkerContext { - bool repartitionSubquery; AttrNumber columnId; + bool pullDistinctColumns; } MasterAggregateWalkerContext; typedef struct WorkerAggregateWalkerContext { - bool repartitionSubquery; List *expressionList; bool createGroupByClause; + bool pullDistinctColumns; } WorkerAggregateWalkerContext; @@ -117,7 +117,9 @@ static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode, MultiExtendedOp *workerNode); static void TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList); -static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode); +static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn, + List *tableNodeList); static Node * MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerContext); static Expr * MasterAggregateExpression(Aggref *originalAggregate, @@ -126,7 +128,14 @@ static Expr * MasterAverageExpression(Oid sumAggregateType, Oid countAggregateTy AttrNumber *columnId); static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression); static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn); + bool groupedByDisjointPartitionColumn, + List *tableNodeList); +static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, + List *tableNodeList); +static bool ShouldPullDistinctColumn(bool repartitionSubquery, + bool groupedByDisjointPartitionColumn, + bool hasNonPartitionColumnDistinctAgg); +static bool PartitionColumnInTableList(Var *column, List *tableNodeList); static bool WorkerAggregateWalker(Node *node, WorkerAggregateWalkerContext *walkerContext); static List * WorkerAggregateExpressionList(Aggref *originalAggregate, @@ -135,6 +144,7 @@ static AggregateType GetAggregateType(Oid aggFunctionId); static Oid AggregateArgumentType(Aggref *aggregate); static Oid AggregateFunctionOid(const char *functionName, Oid inputType); static Oid TypeOid(Oid schemaId, const char *typeName); +static SortGroupClause * CreateSortGroupClause(Var *column); /* Local functions forward declarations for count(distinct) approximations */ static char * CountDistinctHashFunctionName(Oid argumentType); @@ -261,9 +271,12 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode); - masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode); + masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode, + groupedByDisjointPartitionColumn, + tableNodeList); workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode, - groupedByDisjointPartitionColumn); + groupedByDisjointPartitionColumn, + tableNodeList); ApplyExtendedOpNodes(extendedOpNode, masterExtendedOpNode, workerExtendedOpNode); @@ -1168,9 +1181,12 @@ TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList) MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode); bool groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode); - MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode); + MultiExtendedOp *masterExtendedOpNode = + MasterExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn, + tableNodeList); MultiExtendedOp *workerExtendedOpNode = - WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn); + WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn, + tableNodeList); MultiPartition *partitionNode = CitusMakeNode(MultiPartition); List *groupClauseList = extendedOpNode->groupClauseList; List *targetEntryList = extendedOpNode->targetList; @@ -1232,7 +1248,9 @@ TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList) * worker nodes' results. */ static MultiExtendedOp * -MasterExtendedOpNode(MultiExtendedOp *originalOpNode) +MasterExtendedOpNode(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn, + List *tableNodeList) { MultiExtendedOp *masterExtendedOpNode = NULL; List *targetEntryList = originalOpNode->targetList; @@ -1244,15 +1262,23 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode) MultiNode *childNode = ChildNode((MultiUnaryNode *) originalOpNode); MasterAggregateWalkerContext *walkerContext = palloc0( sizeof(MasterAggregateWalkerContext)); + bool hasNonPartitionColumnDistinctAgg = false; + bool repartitionSubquery = false; walkerContext->columnId = 1; - walkerContext->repartitionSubquery = false; if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect)) { - walkerContext->repartitionSubquery = true; + repartitionSubquery = true; } + hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList, + originalHavingQual, + tableNodeList); + walkerContext->pullDistinctColumns = + ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn, + hasNonPartitionColumnDistinctAgg); + /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) { @@ -1266,7 +1292,6 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode) { Node *newNode = MasterAggregateMutator((Node *) originalExpression, walkerContext); - newExpression = (Expr *) newNode; } else @@ -1379,7 +1404,7 @@ MasterAggregateExpression(Aggref *originalAggregate, if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct && CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION && - walkerContext->repartitionSubquery) + walkerContext->pullDistinctColumns) { Aggref *aggregate = (Aggref *) copyObject(originalAggregate); List *varList = pull_var_clause_default((Node *) aggregate); @@ -1769,13 +1794,14 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression) * function to create aggregates for the worker nodes. Also, the function checks * if we can push down the limit to worker nodes; and if we can, sets the limit * count and sort clause list fields in the new operator node. It provides special - * treatment for count distinct operator if it is used in repartition subqueries. - * Each column in count distinct aggregate is added to target list, and group by - * list of worker extended operator. + * treatment for count distinct operator if it is used in repartition subqueries + * or on non-partition columns. Each column in count distinct aggregate is added + * to target list, and group by list of worker extended operator. */ static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn) + bool groupedByDisjointPartitionColumn, + List *tableNodeList) { MultiExtendedOp *workerExtendedOpNode = NULL; MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode); @@ -1790,28 +1816,37 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, palloc0(sizeof(WorkerAggregateWalkerContext)); Index nextSortGroupRefIndex = 0; bool queryHasAggregates = false; + bool enableLimitPushdown = true; + bool hasNonPartitionColumnDistinctAgg = false; + bool repartitionSubquery = false; - walkerContext->repartitionSubquery = false; walkerContext->expressionList = NIL; + /* find max of sort group ref index */ + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + if (targetEntry->ressortgroupref > nextSortGroupRefIndex) + { + nextSortGroupRefIndex = targetEntry->ressortgroupref; + } + } + + /* next group ref index starts from max group ref index + 1 */ + nextSortGroupRefIndex++; + if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect)) { - walkerContext->repartitionSubquery = true; - - /* find max of sort group ref index */ - foreach(targetEntryCell, targetEntryList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - if (targetEntry->ressortgroupref > nextSortGroupRefIndex) - { - nextSortGroupRefIndex = targetEntry->ressortgroupref; - } - } - - /* next group ref index starts from max group ref index + 1 */ - nextSortGroupRefIndex++; + repartitionSubquery = true; } + hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList, + havingQual, + tableNodeList); + walkerContext->pullDistinctColumns = + ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn, + hasNonPartitionColumnDistinctAgg); + /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) { @@ -1821,6 +1856,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, ListCell *newExpressionCell = NULL; bool hasAggregates = contain_agg_clause((Node *) originalExpression); + /* reset walker context */ walkerContext->expressionList = NIL; walkerContext->createGroupByClause = false; @@ -1852,26 +1888,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, */ if (IsA(newExpression, Var) && walkerContext->createGroupByClause) { - Var *column = (Var *) newExpression; - Oid lessThanOperator = InvalidOid; - Oid equalsOperator = InvalidOid; - bool hashable = false; - SortGroupClause *groupByClause = makeNode(SortGroupClause); - - get_sort_group_operators(column->vartype, true, true, true, - &lessThanOperator, &equalsOperator, NULL, - &hashable); - groupByClause->eqop = equalsOperator; - groupByClause->hashable = hashable; - groupByClause->nulls_first = false; - groupByClause->sortop = lessThanOperator; + Var *column = (Var *) newTargetEntry->expr; + SortGroupClause *groupByClause = CreateSortGroupClause(column); + newTargetEntry->ressortgroupref = nextSortGroupRefIndex; groupByClause->tleSortGroupRef = nextSortGroupRefIndex; groupClauseList = lappend(groupClauseList, groupByClause); - - newTargetEntry->ressortgroupref = nextSortGroupRefIndex; - nextSortGroupRefIndex++; + + /* + * If we introduce new columns accompanied by a new group by clause, + * than pushing down limits will cause incorrect results. + */ + enableLimitPushdown = false; } if (newTargetEntry->resname == NULL) @@ -1921,6 +1950,23 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, newTargetEntry->resjunk = false; newTargetEntry->resno = targetProjectionNumber; + if (IsA(newExpression, Var) && walkerContext->createGroupByClause) + { + Var *column = (Var *) newTargetEntry->expr; + SortGroupClause *groupByClause = CreateSortGroupClause(column); + newTargetEntry->ressortgroupref = nextSortGroupRefIndex; + groupByClause->tleSortGroupRef = nextSortGroupRefIndex; + + groupClauseList = lappend(groupClauseList, groupByClause); + nextSortGroupRefIndex++; + + /* + * If we introduce new columns accompanied by a new group by clause, + * than pushing down limits will cause incorrect results. + */ + enableLimitPushdown = false; + } + newTargetEntryList = lappend(newTargetEntryList, newTargetEntry); targetProjectionNumber++; } @@ -1939,11 +1985,14 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, workerExtendedOpNode->groupClauseList = groupClauseList; - /* if we can push down the limit, also set related fields */ - workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode, - groupedByDisjointPartitionColumn); - workerExtendedOpNode->sortClauseList = - WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn); + if (enableLimitPushdown) + { + /* if we can push down the limit, also set related fields */ + workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode, + groupedByDisjointPartitionColumn); + workerExtendedOpNode->sortClauseList = + WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn); + } /* * If grouped by a partition column whose values are shards have disjoint sets @@ -1958,6 +2007,127 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, } +/* + * HasNonPartitionColumnDistinctAgg returns true if target entry or having qualifier + * has non-partition column reference in aggregate (distinct) definition. Note that, + * it only checks aggs subfield of Aggref, it does not check FILTER or SORT clauses. + */ +static bool +HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, + List *tableNodeList) +{ + List *targetVarList = pull_var_clause((Node *) targetEntryList, + PVC_INCLUDE_AGGREGATES); + List *havingVarList = pull_var_clause((Node *) havingQual, PVC_INCLUDE_AGGREGATES); + List *aggregateCheckList = list_concat(targetVarList, havingVarList); + + ListCell *aggregateCheckCell = NULL; + foreach(aggregateCheckCell, aggregateCheckList) + { + Node *targetNode = lfirst(aggregateCheckCell); + Aggref *targetAgg = NULL; + List *varList = NIL; + ListCell *varCell = NULL; + bool isPartitionColumn = false; + + if (IsA(targetNode, Var)) + { + continue; + } + + Assert(IsA(targetNode, Aggref)); + targetAgg = (Aggref *) targetNode; + if (targetAgg->aggdistinct == NIL) + { + continue; + } + + varList = pull_var_clause_default((Node *) targetAgg->args); + foreach(varCell, varList) + { + Node *targetVar = (Node *) lfirst(varCell); + + Assert(IsA(targetVar, Var)); + + isPartitionColumn = + PartitionColumnInTableList((Var *) targetVar, tableNodeList); + + if (!isPartitionColumn) + { + return true; + } + } + } + + return false; +} + + +/* + * ShouldPullDistinctColumn returns true if distinct aggregate should pull + * individual columns from worker to master and evaluate aggregate operation + * at master. + * + * Pull cases are: + * - repartition subqueries + * - query has count distinct on a non-partition column on at least one target + * - count distinct is on a non-partition column and query is not + * grouped on partition column + */ +static bool +ShouldPullDistinctColumn(bool repartitionSubquery, + bool groupedByDisjointPartitionColumn, + bool hasNonPartitionColumnDistinctAgg) +{ + if (repartitionSubquery) + { + return true; + } + + if (groupedByDisjointPartitionColumn) + { + return false; + } + else if (!groupedByDisjointPartitionColumn && hasNonPartitionColumnDistinctAgg) + { + return true; + } + + return false; +} + + +/* + * PartitionColumnInTableList returns true if provided column is a partition + * column from provided table node list. It also returns false if a column is + * partition column of an append distributed table. + */ +static bool +PartitionColumnInTableList(Var *column, List *tableNodeList) +{ + ListCell *tableNodeCell = NULL; + foreach(tableNodeCell, tableNodeList) + { + MultiTable *tableNode = lfirst(tableNodeCell); + Var *partitionColumn = tableNode->partitionColumn; + + if (partitionColumn != NULL && + partitionColumn->varno == column->varno && + partitionColumn->varattno == column->varattno) + { + Assert(partitionColumn->varno == tableNode->rangeTableId); + + if (PartitionMethod(tableNode->relationId) != DISTRIBUTE_BY_APPEND) + { + return true; + } + } + } + + return false; +} + + /* * WorkerAggregateWalker walks over the original target entry expression, and * creates the list of expression trees (potentially more than one) to execute @@ -2016,7 +2186,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate, if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct && CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION && - walkerContext->repartitionSubquery) + walkerContext->pullDistinctColumns) { Aggref *aggregate = (Aggref *) copyObject(originalAggregate); List *columnList = pull_var_clause_default((Node *) aggregate); @@ -2355,6 +2525,31 @@ GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode) } +/* + * CreateSortGroupClause creates SortGroupClause for a given column Var. + * The caller should set tleSortGroupRef field and respective + * TargetEntry->ressortgroupref fields to appropriate SortGroupRefIndex. + */ +static SortGroupClause * +CreateSortGroupClause(Var *column) +{ + Oid lessThanOperator = InvalidOid; + Oid equalsOperator = InvalidOid; + bool hashable = false; + SortGroupClause *groupByClause = makeNode(SortGroupClause); + + get_sort_group_operators(column->vartype, true, true, true, + &lessThanOperator, &equalsOperator, NULL, + &hashable); + groupByClause->eqop = equalsOperator; + groupByClause->hashable = hashable; + groupByClause->nulls_first = false; + groupByClause->sortop = lessThanOperator; + + return groupByClause; +} + + /* * CountDistinctHashFunctionName resolves the hll_hash function name to use for * the given input type, and returns this function name. @@ -2645,28 +2840,34 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); distinctColumn = AggregateDistinctColumn(aggregateExpression); - if (distinctSupported && distinctColumn == NULL) + if (distinctSupported) { - /* - * If the query has a single table, and table is grouped by partition column, - * then we support count distincts even distinct column can not be identified. - */ - distinctSupported = TablePartitioningSupportsDistinct(tableNodeList, + if (distinctColumn == NULL) + { + /* + * If the query has a single table, and table is grouped by partition + * column, then we support count distincts even distinct column can + * not be identified. + */ + distinctSupported = TablePartitioningSupportsDistinct(tableNodeList, + extendedOpNode, + distinctColumn); + if (!distinctSupported) + { + errorDetail = "aggregate (distinct) on complex expressions is" + " unsupported"; + } + } + else if (aggregateType != AGGREGATE_COUNT) + { + bool supports = TablePartitioningSupportsDistinct(tableNodeList, extendedOpNode, distinctColumn); - if (!distinctSupported) - { - errorDetail = "aggregate (distinct) on complex expressions is unsupported"; - } - } - else if (distinctSupported) - { - bool supports = TablePartitioningSupportsDistinct(tableNodeList, extendedOpNode, - distinctColumn); - if (!supports) - { - distinctSupported = false; - errorDetail = "table partitioning is unsuitable for aggregate (distinct)"; + if (!supports) + { + distinctSupported = false; + errorDetail = "table partitioning is unsuitable for aggregate (distinct)"; + } } } diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 29d321955..2d7cb672c 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -34,6 +34,7 @@ static List * MasterTargetList(List *workerTargetList); static PlannedStmt * BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan); static Agg * BuildAggregatePlan(Query *masterQuery, Plan *subPlan); +static bool HasDistinctAggregate(Query *masterQuery); static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan); @@ -287,13 +288,37 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) /* if we have grouping, then initialize appropriate information */ if (groupColumnCount > 0) { - if (!grouping_is_hashable(groupColumnList)) + bool groupingIsHashable = grouping_is_hashable(groupColumnList); + bool groupingIsSortable = grouping_is_hashable(groupColumnList); + bool hasDistinctAggregate = HasDistinctAggregate(masterQuery); + + if (!groupingIsHashable && !groupingIsSortable) { - ereport(ERROR, (errmsg("grouped column list cannot be hashed"))); + ereport(ERROR, (errmsg("grouped column list cannot be hashed or sorted"))); } - /* switch to hashed aggregate strategy to allow grouping */ - aggregateStrategy = AGG_HASHED; + /* + * Postgres hash aggregate strategy does not support distinct aggregates + * in group and order by with aggregate operations. + * see nodeAgg.c:build_pertrans_for_aggref(). In that case we use + * sorted agg strategy, otherwise we use hash strategy. + */ + if (!groupingIsHashable || hasDistinctAggregate) + { + if (!groupingIsSortable) + { + ereport(ERROR, (errmsg("grouped column list must cannot be sorted"), + errdetail("Having a distinct aggregate requires " + "grouped column list to be sortable."))); + } + + aggregateStrategy = AGG_SORTED; + subPlan = (Plan *) make_sort_from_sortclauses(groupColumnList, subPlan); + } + else + { + aggregateStrategy = AGG_HASHED; + } /* get column indexes that are being grouped */ groupColumnIdArray = extract_grouping_cols(groupColumnList, subPlan->targetlist); @@ -315,6 +340,40 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) } +/* + * HasDistinctAggregate returns true if the query has a distinct + * aggregate in its target list or in having clause. + */ +static bool +HasDistinctAggregate(Query *masterQuery) +{ + List *targetVarList = NIL; + List *havingVarList = NIL; + List *allColumnList = NIL; + ListCell *allColumnCell = NULL; + + targetVarList = pull_var_clause((Node *) masterQuery->targetList, + PVC_INCLUDE_AGGREGATES); + havingVarList = pull_var_clause(masterQuery->havingQual, PVC_INCLUDE_AGGREGATES); + + allColumnList = list_concat(targetVarList, havingVarList); + foreach(allColumnCell, allColumnList) + { + Node *columnNode = lfirst(allColumnCell); + if (IsA(columnNode, Aggref)) + { + Aggref *aggref = (Aggref *) columnNode; + if (aggref->aggdistinct != NIL) + { + return true; + } + } + } + + return false; +} + + /* * BuildDistinctPlan creates an returns a plan for distinct. Depending on * availability of hash function it chooses HashAgg over Sort/Unique @@ -332,6 +391,7 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan) List *targetList = copyObject(masterQuery->targetList); List *columnList = pull_var_clause_default((Node *) targetList); ListCell *columnCell = NULL; + bool hasDistinctAggregate = false; if (IsA(subPlan, Agg)) { @@ -353,10 +413,12 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan) /* * Create group by plan with HashAggregate if all distinct - * members are hashable, Otherwise create sort+unique plan. + * members are hashable, and not containing distinct aggregate. + * Otherwise create sort+unique plan. */ distinctClausesHashable = grouping_is_hashable(distinctClauseList); - if (distinctClausesHashable) + hasDistinctAggregate = HasDistinctAggregate(masterQuery); + if (distinctClausesHashable && !hasDistinctAggregate) { const long rowEstimate = 10; /* using the same value as BuildAggregatePlan() */ AttrNumber *distinctColumnIdArray = extract_grouping_cols(distinctClauseList, diff --git a/src/test/regress/expected/multi_agg_approximate_distinct.out b/src/test/regress/expected/multi_agg_approximate_distinct.out index 5c426802e..d448060ff 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct.out @@ -16,9 +16,11 @@ WHERE name = 'hll' \c - - - :master_port -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + -- Check approximate count(distinct) at different precisions / error rates SET citus.count_distinct_error_rate = 0.1; SELECT count(distinct l_orderkey) FROM lineitem; @@ -200,6 +202,8 @@ SELECT -- Check that we can revert config and disable count(distinct) approximations SET citus.count_distinct_error_rate = 0.0; SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + diff --git a/src/test/regress/expected/multi_agg_approximate_distinct_0.out b/src/test/regress/expected/multi_agg_approximate_distinct_0.out index da6ba329e..6879f5a99 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct_0.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct_0.out @@ -31,9 +31,11 @@ WHERE name = 'hll' \c - - - :master_port -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + -- Check approximate count(distinct) at different precisions / error rates SET citus.count_distinct_error_rate = 0.1; SELECT count(distinct l_orderkey) FROM lineitem; @@ -147,6 +149,8 @@ HINT: You need to have the hll extension loaded. -- Check that we can revert config and disable count(distinct) approximations SET citus.count_distinct_error_rate = 0.0; SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index 2438fabf7..50c0a5039 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -107,11 +107,20 @@ SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = 700 (1 row) --- count distinct on partition column is not supported +-- count distinct on partition column is supported SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 551 +(1 row) + +-- count distinct on non-partition column is supported +SELECT count(distinct o_orderpriority) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); + count +------- + 2 +(1 row) + -- count distinct on partition column is supported on router queries SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey) diff --git a/src/test/regress/input/multi_agg_distinct.source b/src/test/regress/input/multi_agg_distinct.source index 0c36c0284..764f5cc2e 100644 --- a/src/test/regress/input/multi_agg_distinct.source +++ b/src/test/regress/input/multi_agg_distinct.source @@ -56,22 +56,20 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part RESET citus.large_table_shard_count; --- Check that we don't support count(distinct) on non-partition column, and --- complex expressions. - +-- Check that we support count(distinct) on non-partition column. SELECT count(distinct l_partkey) FROM lineitem_range; + +-- Check that we don't support complex expressions. SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; -- Now test append partitioned tables. First run count(distinct) on a single -- sharded table. - SELECT count(distinct p_mfgr) FROM part; SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr; --- We don't support count(distinct) queries if table is append partitioned and --- has multiple shards - -SELECT count(distinct o_orderkey) FROM orders; +-- We support count(distinct) queries on append partitioned tables +-- both on partition column, and non-partition column. +SELECT count(distinct o_orderkey), count(distinct o_custkey) FROM orders; -- Hash partitioned tables: @@ -99,13 +97,13 @@ SELECT master_create_worker_shards('lineitem_hash', 4, 1); \copy lineitem_hash FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' -- aggregate(distinct) on partition column is allowed - SELECT count(distinct l_orderkey) FROM lineitem_hash; SELECT avg(distinct l_orderkey) FROM lineitem_hash; --- count(distinct) on non-partition column or expression is not allowed - +-- count(distinct) on non-partition column is allowed SELECT count(distinct l_partkey) FROM lineitem_hash; + +-- count(distinct) on column expression is not allowed SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; -- agg(distinct) is allowed if we group by partition column @@ -115,4 +113,16 @@ SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_ra -- they should return the same results SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey AND h.count != r.count; +-- count(distinct) is allowed if we group by non-partition column +SELECT l_partkey, count(distinct l_orderkey) INTO hash_results_np FROM lineitem_hash GROUP BY l_partkey; +SELECT l_partkey, count(distinct l_orderkey) INTO range_results_np FROM lineitem_range GROUP BY l_partkey; + +-- they should return the same results +SELECT * FROM hash_results_np h, range_results_np r WHERE h.l_partkey = r.l_partkey AND h.count != r.count; + +-- other agg(distinct) are not allowed on non-partition columns even they are grouped +-- on non-partition columns +SELECT SUM(distinct l_partkey) FROM lineitem_hash; +SELECT l_shipmode, sum(distinct l_partkey) FROM lineitem_hash GROUP BY l_shipmode; + DROP TABLE lineitem_hash; diff --git a/src/test/regress/input/multi_complex_count_distinct.source b/src/test/regress/input/multi_complex_count_distinct.source index 1ca2cc617..4fdd47611 100644 --- a/src/test/regress/input/multi_complex_count_distinct.source +++ b/src/test/regress/input/multi_complex_count_distinct.source @@ -35,6 +35,13 @@ SET citus.task_executor_type to "task-tracker"; -- count(distinct) is supported on top level query if there -- is a grouping on the partition key +SELECT + l_orderkey, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; +EXPLAIN (COSTS false, VERBOSE true) SELECT l_orderkey, count(DISTINCT l_partkey) FROM lineitem_hash @@ -42,7 +49,14 @@ SELECT ORDER BY 2 DESC, 1 DESC LIMIT 10; --- it is not supported if there is no grouping or grouping is on non-partition field +-- it is also supported if there is no grouping or grouping is on non-partition field +SELECT + count(DISTINCT l_partkey) + FROM lineitem_hash + ORDER BY 1 DESC + LIMIT 10; + +EXPLAIN (COSTS false, VERBOSE true) SELECT count(DISTINCT l_partkey) FROM lineitem_hash @@ -56,6 +70,96 @@ SELECT ORDER BY 2 DESC, 1 DESC LIMIT 10; +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + +-- mixed mode count distinct, grouped by partition column +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + +-- partition/non-partition column count distinct no grouping +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + +-- distinct/non-distinct on partition and non-partition columns +SELECT + count(distinct l_orderkey), count(l_orderkey), + count(distinct l_partkey), count(l_partkey), + count(distinct l_shipmode), count(l_shipmode) + FROM lineitem_hash; + +-- mixed mode count distinct, grouped by non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 1, 2 DESC, 3 DESC; + +-- mixed mode count distinct, grouped by non-partition column +-- having on partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + +-- mixed mode count distinct, grouped by non-partition column +-- having on non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_suppkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + -- count distinct is supported on single table subqueries SELECT * FROM ( @@ -75,14 +179,68 @@ SELECT * ORDER BY 2 DESC, 1 DESC LIMIT 10; --- count distinct with filters -SELECT - l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash - GROUP BY l_orderkey +EXPLAIN (COSTS false, VERBOSE true) +SELECT * + FROM ( + SELECT + l_partkey, count(DISTINCT l_orderkey) + FROM lineitem_hash + GROUP BY l_partkey) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; +-- count distinct with filters +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + +-- group by on non-partition column +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; +-- explaining the same query fails +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + +-- without group by, on partition column +SELECT + count(DISTINCT l_orderkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + +-- without group by, on non-partition column +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_partkey), + count(DISTINCT l_shipdate) + FROM lineitem_hash; + -- filter column already exists in target list SELECT * FROM ( diff --git a/src/test/regress/output/multi_agg_distinct.source b/src/test/regress/output/multi_agg_distinct.source index a62e4bd56..e73501541 100644 --- a/src/test/regress/output/multi_agg_distinct.source +++ b/src/test/regress/output/multi_agg_distinct.source @@ -73,12 +73,14 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part (10 rows) RESET citus.large_table_shard_count; --- Check that we don't support count(distinct) on non-partition column, and --- complex expressions. +-- Check that we support count(distinct) on non-partition column. SELECT count(distinct l_partkey) FROM lineitem_range; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 11661 +(1 row) + +-- Check that we don't support complex expressions. SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; ERROR: cannot compute aggregate (distinct) DETAIL: aggregate (distinct) on complex expressions is unsupported @@ -101,12 +103,14 @@ SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mf Manufacturer#5 | 185 (5 rows) --- We don't support count(distinct) queries if table is append partitioned and --- has multiple shards -SELECT count(distinct o_orderkey) FROM orders; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. +-- We support count(distinct) queries on append partitioned tables +-- both on partition column, and non-partition column. +SELECT count(distinct o_orderkey), count(distinct o_custkey) FROM orders; + count | count +-------+------- + 2984 | 923 +(1 row) + -- Hash partitioned tables: CREATE TABLE lineitem_hash ( l_orderkey bigint not null, @@ -152,11 +156,14 @@ SELECT avg(distinct l_orderkey) FROM lineitem_hash; 7463.9474036850921273 (1 row) --- count(distinct) on non-partition column or expression is not allowed +-- count(distinct) on non-partition column is allowed SELECT count(distinct l_partkey) FROM lineitem_hash; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 11661 +(1 row) + +-- count(distinct) on column expression is not allowed SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; ERROR: cannot compute aggregate (distinct) DETAIL: aggregate (distinct) on complex expressions is unsupported @@ -170,4 +177,21 @@ SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey ------------+-------+------------+------- (0 rows) +-- count(distinct) is allowed if we group by non-partition column +SELECT l_partkey, count(distinct l_orderkey) INTO hash_results_np FROM lineitem_hash GROUP BY l_partkey; +SELECT l_partkey, count(distinct l_orderkey) INTO range_results_np FROM lineitem_range GROUP BY l_partkey; +-- they should return the same results +SELECT * FROM hash_results_np h, range_results_np r WHERE h.l_partkey = r.l_partkey AND h.count != r.count; + l_partkey | count | l_partkey | count +-----------+-------+-----------+------- +(0 rows) + +-- other agg(distinct) are not allowed on non-partition columns even they are grouped +-- on non-partition columns +SELECT SUM(distinct l_partkey) FROM lineitem_hash; +ERROR: cannot compute aggregate (distinct) +DETAIL: table partitioning is unsuitable for aggregate (distinct) +SELECT l_shipmode, sum(distinct l_partkey) FROM lineitem_hash GROUP BY l_shipmode; +ERROR: cannot compute aggregate (distinct) +DETAIL: table partitioning is unsuitable for aggregate (distinct) DROP TABLE lineitem_hash; diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 59b4045ad..c102b1643 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -58,24 +58,346 @@ SELECT 14624 | 7 (10 rows) --- it is not supported if there is no grouping or grouping is on non-partition field +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Limit + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) + -> Sort + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey DESC + -> HashAggregate + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) + Group Key: remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_partkey)) + -> Sort + Output: l_orderkey, (count(DISTINCT l_partkey)) + Sort Key: (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey DESC + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_partkey) + Group Key: lineitem_hash.l_orderkey + -> Sort + Output: l_orderkey, l_partkey + Sort Key: lineitem_hash.l_orderkey DESC + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey +(27 rows) + +-- it is also supported if there is no grouping or grouping is on non-partition field SELECT count(DISTINCT l_partkey) FROM lineitem_hash ORDER BY 1 DESC LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 11661 +(1 row) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + count(DISTINCT l_partkey) + FROM lineitem_hash + ORDER BY 1 DESC + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count))))) + -> Sort + Output: count(DISTINCT (count(DISTINCT remote_scan.count))) + Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC + -> Aggregate + Output: count(DISTINCT remote_scan.count) + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.count + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_partkey + Group Key: lineitem_hash.l_partkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(18 rows) + SELECT l_shipmode, count(DISTINCT l_partkey) FROM lineitem_hash GROUP BY l_shipmode ORDER BY 2 DESC, 1 DESC LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + l_shipmode | count +------------+------- + TRUCK | 1757 + MAIL | 1730 + AIR | 1702 + FOB | 1700 + RAIL | 1696 + SHIP | 1684 + REG AIR | 1676 +(7 rows) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count))))) + -> Sort + Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) + Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC, remote_scan.l_shipmode DESC + -> GroupAggregate + Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count) + Group Key: remote_scan.l_shipmode + -> Sort + Output: remote_scan.l_shipmode, remote_scan.count + Sort Key: remote_scan.l_shipmode DESC + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_shipmode, remote_scan.count + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_shipmode, l_partkey + Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(22 rows) + +-- mixed mode count distinct, grouped by partition column +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + l_orderkey | count | count +------------+-------+------- + 226 | 7 | 7 + 1316 | 7 | 7 + 1477 | 7 | 7 + 3555 | 7 | 7 + 12258 | 7 | 7 + 12835 | 7 | 7 + 768 | 7 | 6 + 1121 | 7 | 6 + 1153 | 7 | 6 + 1281 | 7 | 6 +(10 rows) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) + -> Sort + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey + -> HashAggregate + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) + Group Key: remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) + -> Sort + Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) + Sort Key: (count(DISTINCT lineitem_hash.l_shipmode)) DESC, (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_partkey), count(DISTINCT l_shipmode) + Group Key: lineitem_hash.l_orderkey + -> Sort + Output: l_orderkey, l_partkey, l_shipmode + Sort Key: lineitem_hash.l_orderkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_shipmode +(27 rows) + +-- partition/non-partition column count distinct no grouping +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + count | count | count +-------+-------+------- + 2985 | 11661 | 7 +(1 row) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Aggregate + Output: count(DISTINCT remote_scan.count), count(DISTINCT remote_scan.count_1), count(DISTINCT remote_scan.count_2) + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.count, remote_scan.count_1, remote_scan.count_2 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_orderkey, l_partkey, l_shipmode + Group Key: lineitem_hash.l_orderkey, lineitem_hash.l_partkey, lineitem_hash.l_shipmode + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(13 rows) + +-- distinct/non-distinct on partition and non-partition columns +SELECT + count(distinct l_orderkey), count(l_orderkey), + count(distinct l_partkey), count(l_partkey), + count(distinct l_shipmode), count(l_shipmode) + FROM lineitem_hash; + count | count | count | count | count | count +-------+-------+-------+-------+-------+------- + 2985 | 12000 | 11661 | 12000 | 7 | 12000 +(1 row) + +-- mixed mode count distinct, grouped by non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 1, 2 DESC, 3 DESC; + l_shipmode | count | count +------------+-------+------- + AIR | 1702 | 1327 + FOB | 1700 | 1276 + MAIL | 1730 | 1299 + RAIL | 1696 | 1265 + REG AIR | 1676 | 1275 + SHIP | 1684 | 1289 + TRUCK | 1757 | 1333 +(7 rows) + +-- mixed mode count distinct, grouped by non-partition column +-- having on partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + l_shipmode | count | count +------------+-------+------- + AIR | 1702 | 1327 + TRUCK | 1757 | 1333 +(2 rows) + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + l_shipmode | count +------------+------- + AIR | 1702 + TRUCK | 1757 +(2 rows) + +-- mixed mode count distinct, grouped by non-partition column +-- having on non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_suppkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + l_shipmode | count | count +------------+-------+------- + AIR | 1702 | 1564 + FOB | 1700 | 1571 + MAIL | 1730 | 1573 + RAIL | 1696 | 1581 + REG AIR | 1676 | 1557 + SHIP | 1684 | 1554 + TRUCK | 1757 | 1602 +(7 rows) + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + l_shipmode | count +------------+------- + AIR | 1702 + FOB | 1700 + MAIL | 1730 + RAIL | 1696 + REG AIR | 1676 + SHIP | 1684 + TRUCK | 1757 +(7 rows) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) + Sort Key: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) DESC + -> GroupAggregate + Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count) + Group Key: remote_scan.l_shipmode + Filter: (count(DISTINCT remote_scan.worker_column_3) > 1550) + -> Sort + Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 + Sort Key: remote_scan.l_shipmode + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_shipmode, l_partkey, l_suppkey + Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey, lineitem_hash.l_suppkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(21 rows) + -- count distinct is supported on single table subqueries SELECT * FROM ( @@ -121,27 +443,152 @@ SELECT * 1927 | 3 (10 rows) --- count distinct with filters -SELECT - l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash - GROUP BY l_orderkey +EXPLAIN (COSTS false, VERBOSE true) +SELECT * + FROM ( + SELECT + l_partkey, count(DISTINCT l_orderkey) + FROM lineitem_hash + GROUP BY l_partkey) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; - l_orderkey | count -------------+------- - 12005 | 4 - 5409 | 4 - 4964 | 4 - 14848 | 3 - 14496 | 3 - 13473 | 3 - 13122 | 3 - 12929 | 3 - 12645 | 3 - 12417 | 3 + QUERY PLAN +------------------------------------------------------------------------- + Limit + Output: remote_scan.l_partkey, remote_scan.count + -> Sort + Output: remote_scan.l_partkey, remote_scan.count + Sort Key: remote_scan.count DESC, remote_scan.l_partkey DESC + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_partkey, remote_scan.count + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 8 + Merge Task Count: 4 +(12 rows) + +-- count distinct with filters +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + l_orderkey | count | count +------------+-------+------- + 4964 | 4 | 7 + 12005 | 4 | 7 + 5409 | 4 | 6 + 164 | 3 | 7 + 322 | 3 | 7 + 871 | 3 | 7 + 1156 | 3 | 7 + 1574 | 3 | 7 + 2054 | 3 | 7 + 2309 | 3 | 7 (10 rows) +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) + -> Sort + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey + -> HashAggregate + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) + Group Key: remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) + -> Sort + Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) + Sort Key: (count(DISTINCT lineitem_hash.l_suppkey) FILTER (WHERE (lineitem_hash.l_shipmode = 'AIR'::bpchar))) DESC, (count(DISTINCT lineitem_hash.l_suppkey)) DESC, lineitem_hash.l_orderkey + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar)), count(DISTINCT l_suppkey) + Group Key: lineitem_hash.l_orderkey + -> Sort + Output: l_orderkey, l_suppkey, l_shipmode + Sort Key: lineitem_hash.l_orderkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_suppkey, l_shipmode +(27 rows) + +-- group by on non-partition column +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + l_suppkey | count +-----------+------- + 7680 | 4 + 7703 | 3 + 7542 | 3 + 7072 | 3 + 6335 | 3 + 5873 | 3 + 1318 | 3 + 1042 | 3 + 160 | 3 + 9872 | 2 +(10 rows) + +-- explaining the same query fails +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; +ERROR: bogus varattno for OUTER_VAR var: 3 +-- without group by, on partition column +SELECT + count(DISTINCT l_orderkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + count +------- + 1327 +(1 row) + +-- without group by, on non-partition column +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + count +------- + 1702 +(1 row) + +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_partkey), + count(DISTINCT l_shipdate) + FROM lineitem_hash; + count | count | count +-------+-------+------- + 1702 | 11661 | 2470 +(1 row) + -- filter column already exists in target list SELECT * FROM ( diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql index 347c2ee37..1a976d6d2 100644 --- a/src/test/regress/sql/multi_view.sql +++ b/src/test/regress/sql/multi_view.sql @@ -45,9 +45,12 @@ SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey -- join between views SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); --- count distinct on partition column is not supported +-- count distinct on partition column is supported SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); +-- count distinct on non-partition column is supported +SELECT count(distinct o_orderpriority) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); + -- count distinct on partition column is supported on router queries SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey) @@ -71,7 +74,6 @@ SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderke -- but view at the outer side is. This is essentially the same as a left join with arguments reversed. SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; - -- left join on router query is supported SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey) WHERE o_orderkey = 2;