diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 03bd54cbd..2b349c0cd 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -62,15 +62,15 @@ double CountDistinctErrorRate = 0.0; /* precision of count(distinct) approximate typedef struct MasterAggregateWalkerContext { - bool repartitionSubquery; AttrNumber columnId; + bool pullDistinctColumns; } MasterAggregateWalkerContext; typedef struct WorkerAggregateWalkerContext { - bool repartitionSubquery; List *expressionList; bool createGroupByClause; + bool pullDistinctColumns; } WorkerAggregateWalkerContext; @@ -117,7 +117,9 @@ static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode, MultiExtendedOp *workerNode); static void TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList); -static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode); +static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn, + List *tableNodeList); static Node * MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerContext); static Expr * MasterAggregateExpression(Aggref *originalAggregate, @@ -126,7 +128,14 @@ static Expr * MasterAverageExpression(Oid sumAggregateType, Oid countAggregateTy AttrNumber *columnId); static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression); static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn); + bool groupedByDisjointPartitionColumn, + List *tableNodeList); +static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, + List *tableNodeList); +static bool ShouldPullDistinctColumn(bool repartitionSubquery, + bool groupedByDisjointPartitionColumn, + bool hasNonPartitionColumnDistinctAgg); +static bool PartitionColumnInTableList(Var *column, List *tableNodeList); static bool WorkerAggregateWalker(Node *node, WorkerAggregateWalkerContext *walkerContext); static List * WorkerAggregateExpressionList(Aggref *originalAggregate, @@ -135,6 +144,7 @@ static AggregateType GetAggregateType(Oid aggFunctionId); static Oid AggregateArgumentType(Aggref *aggregate); static Oid AggregateFunctionOid(const char *functionName, Oid inputType); static Oid TypeOid(Oid schemaId, const char *typeName); +static SortGroupClause * CreateSortGroupClause(Var *column); /* Local functions forward declarations for count(distinct) approximations */ static char * CountDistinctHashFunctionName(Oid argumentType); @@ -261,9 +271,12 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode); - masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode); + masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode, + groupedByDisjointPartitionColumn, + tableNodeList); workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode, - groupedByDisjointPartitionColumn); + groupedByDisjointPartitionColumn, + tableNodeList); ApplyExtendedOpNodes(extendedOpNode, masterExtendedOpNode, workerExtendedOpNode); @@ -1168,9 +1181,12 @@ TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList) MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode); bool groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode); - MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode); + MultiExtendedOp *masterExtendedOpNode = + MasterExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn, + tableNodeList); MultiExtendedOp *workerExtendedOpNode = - WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn); + WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn, + tableNodeList); MultiPartition *partitionNode = CitusMakeNode(MultiPartition); List *groupClauseList = extendedOpNode->groupClauseList; List *targetEntryList = extendedOpNode->targetList; @@ -1232,7 +1248,9 @@ TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList) * worker nodes' results. */ static MultiExtendedOp * -MasterExtendedOpNode(MultiExtendedOp *originalOpNode) +MasterExtendedOpNode(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn, + List *tableNodeList) { MultiExtendedOp *masterExtendedOpNode = NULL; List *targetEntryList = originalOpNode->targetList; @@ -1244,15 +1262,23 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode) MultiNode *childNode = ChildNode((MultiUnaryNode *) originalOpNode); MasterAggregateWalkerContext *walkerContext = palloc0( sizeof(MasterAggregateWalkerContext)); + bool hasNonPartitionColumnDistinctAgg = false; + bool repartitionSubquery = false; walkerContext->columnId = 1; - walkerContext->repartitionSubquery = false; if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect)) { - walkerContext->repartitionSubquery = true; + repartitionSubquery = true; } + hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList, + originalHavingQual, + tableNodeList); + walkerContext->pullDistinctColumns = + ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn, + hasNonPartitionColumnDistinctAgg); + /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) { @@ -1266,7 +1292,6 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode) { Node *newNode = MasterAggregateMutator((Node *) originalExpression, walkerContext); - newExpression = (Expr *) newNode; } else @@ -1379,7 +1404,7 @@ MasterAggregateExpression(Aggref *originalAggregate, if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct && CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION && - walkerContext->repartitionSubquery) + walkerContext->pullDistinctColumns) { Aggref *aggregate = (Aggref *) copyObject(originalAggregate); List *varList = pull_var_clause_default((Node *) aggregate); @@ -1769,13 +1794,14 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression) * function to create aggregates for the worker nodes. Also, the function checks * if we can push down the limit to worker nodes; and if we can, sets the limit * count and sort clause list fields in the new operator node. It provides special - * treatment for count distinct operator if it is used in repartition subqueries. - * Each column in count distinct aggregate is added to target list, and group by - * list of worker extended operator. + * treatment for count distinct operator if it is used in repartition subqueries + * or on non-partition columns. Each column in count distinct aggregate is added + * to target list, and group by list of worker extended operator. */ static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn) + bool groupedByDisjointPartitionColumn, + List *tableNodeList) { MultiExtendedOp *workerExtendedOpNode = NULL; MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode); @@ -1790,28 +1816,37 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, palloc0(sizeof(WorkerAggregateWalkerContext)); Index nextSortGroupRefIndex = 0; bool queryHasAggregates = false; + bool enableLimitPushdown = true; + bool hasNonPartitionColumnDistinctAgg = false; + bool repartitionSubquery = false; - walkerContext->repartitionSubquery = false; walkerContext->expressionList = NIL; + /* find max of sort group ref index */ + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + if (targetEntry->ressortgroupref > nextSortGroupRefIndex) + { + nextSortGroupRefIndex = targetEntry->ressortgroupref; + } + } + + /* next group ref index starts from max group ref index + 1 */ + nextSortGroupRefIndex++; + if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect)) { - walkerContext->repartitionSubquery = true; - - /* find max of sort group ref index */ - foreach(targetEntryCell, targetEntryList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - if (targetEntry->ressortgroupref > nextSortGroupRefIndex) - { - nextSortGroupRefIndex = targetEntry->ressortgroupref; - } - } - - /* next group ref index starts from max group ref index + 1 */ - nextSortGroupRefIndex++; + repartitionSubquery = true; } + hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList, + havingQual, + tableNodeList); + walkerContext->pullDistinctColumns = + ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn, + hasNonPartitionColumnDistinctAgg); + /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) { @@ -1821,6 +1856,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, ListCell *newExpressionCell = NULL; bool hasAggregates = contain_agg_clause((Node *) originalExpression); + /* reset walker context */ walkerContext->expressionList = NIL; walkerContext->createGroupByClause = false; @@ -1852,26 +1888,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, */ if (IsA(newExpression, Var) && walkerContext->createGroupByClause) { - Var *column = (Var *) newExpression; - Oid lessThanOperator = InvalidOid; - Oid equalsOperator = InvalidOid; - bool hashable = false; - SortGroupClause *groupByClause = makeNode(SortGroupClause); - - get_sort_group_operators(column->vartype, true, true, true, - &lessThanOperator, &equalsOperator, NULL, - &hashable); - groupByClause->eqop = equalsOperator; - groupByClause->hashable = hashable; - groupByClause->nulls_first = false; - groupByClause->sortop = lessThanOperator; + Var *column = (Var *) newTargetEntry->expr; + SortGroupClause *groupByClause = CreateSortGroupClause(column); + newTargetEntry->ressortgroupref = nextSortGroupRefIndex; groupByClause->tleSortGroupRef = nextSortGroupRefIndex; groupClauseList = lappend(groupClauseList, groupByClause); - - newTargetEntry->ressortgroupref = nextSortGroupRefIndex; - nextSortGroupRefIndex++; + + /* + * If we introduce new columns accompanied by a new group by clause, + * than pushing down limits will cause incorrect results. + */ + enableLimitPushdown = false; } if (newTargetEntry->resname == NULL) @@ -1921,6 +1950,23 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, newTargetEntry->resjunk = false; newTargetEntry->resno = targetProjectionNumber; + if (IsA(newExpression, Var) && walkerContext->createGroupByClause) + { + Var *column = (Var *) newTargetEntry->expr; + SortGroupClause *groupByClause = CreateSortGroupClause(column); + newTargetEntry->ressortgroupref = nextSortGroupRefIndex; + groupByClause->tleSortGroupRef = nextSortGroupRefIndex; + + groupClauseList = lappend(groupClauseList, groupByClause); + nextSortGroupRefIndex++; + + /* + * If we introduce new columns accompanied by a new group by clause, + * than pushing down limits will cause incorrect results. + */ + enableLimitPushdown = false; + } + newTargetEntryList = lappend(newTargetEntryList, newTargetEntry); targetProjectionNumber++; } @@ -1939,11 +1985,14 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, workerExtendedOpNode->groupClauseList = groupClauseList; - /* if we can push down the limit, also set related fields */ - workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode, - groupedByDisjointPartitionColumn); - workerExtendedOpNode->sortClauseList = - WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn); + if (enableLimitPushdown) + { + /* if we can push down the limit, also set related fields */ + workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode, + groupedByDisjointPartitionColumn); + workerExtendedOpNode->sortClauseList = + WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn); + } /* * If grouped by a partition column whose values are shards have disjoint sets @@ -1958,6 +2007,127 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, } +/* + * HasNonPartitionColumnDistinctAgg returns true if target entry or having qualifier + * has non-partition column reference in aggregate (distinct) definition. Note that, + * it only checks aggs subfield of Aggref, it does not check FILTER or SORT clauses. + */ +static bool +HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, + List *tableNodeList) +{ + List *targetVarList = pull_var_clause((Node *) targetEntryList, + PVC_INCLUDE_AGGREGATES); + List *havingVarList = pull_var_clause((Node *) havingQual, PVC_INCLUDE_AGGREGATES); + List *aggregateCheckList = list_concat(targetVarList, havingVarList); + + ListCell *aggregateCheckCell = NULL; + foreach(aggregateCheckCell, aggregateCheckList) + { + Node *targetNode = lfirst(aggregateCheckCell); + Aggref *targetAgg = NULL; + List *varList = NIL; + ListCell *varCell = NULL; + bool isPartitionColumn = false; + + if (IsA(targetNode, Var)) + { + continue; + } + + Assert(IsA(targetNode, Aggref)); + targetAgg = (Aggref *) targetNode; + if (targetAgg->aggdistinct == NIL) + { + continue; + } + + varList = pull_var_clause_default((Node *) targetAgg->args); + foreach(varCell, varList) + { + Node *targetVar = (Node *) lfirst(varCell); + + Assert(IsA(targetVar, Var)); + + isPartitionColumn = + PartitionColumnInTableList((Var *) targetVar, tableNodeList); + + if (!isPartitionColumn) + { + return true; + } + } + } + + return false; +} + + +/* + * ShouldPullDistinctColumn returns true if distinct aggregate should pull + * individual columns from worker to master and evaluate aggregate operation + * at master. + * + * Pull cases are: + * - repartition subqueries + * - query has count distinct on a non-partition column on at least one target + * - count distinct is on a non-partition column and query is not + * grouped on partition column + */ +static bool +ShouldPullDistinctColumn(bool repartitionSubquery, + bool groupedByDisjointPartitionColumn, + bool hasNonPartitionColumnDistinctAgg) +{ + if (repartitionSubquery) + { + return true; + } + + if (groupedByDisjointPartitionColumn) + { + return false; + } + else if (!groupedByDisjointPartitionColumn && hasNonPartitionColumnDistinctAgg) + { + return true; + } + + return false; +} + + +/* + * PartitionColumnInTableList returns true if provided column is a partition + * column from provided table node list. It also returns false if a column is + * partition column of an append distributed table. + */ +static bool +PartitionColumnInTableList(Var *column, List *tableNodeList) +{ + ListCell *tableNodeCell = NULL; + foreach(tableNodeCell, tableNodeList) + { + MultiTable *tableNode = lfirst(tableNodeCell); + Var *partitionColumn = tableNode->partitionColumn; + + if (partitionColumn != NULL && + partitionColumn->varno == column->varno && + partitionColumn->varattno == column->varattno) + { + Assert(partitionColumn->varno == tableNode->rangeTableId); + + if (PartitionMethod(tableNode->relationId) != DISTRIBUTE_BY_APPEND) + { + return true; + } + } + } + + return false; +} + + /* * WorkerAggregateWalker walks over the original target entry expression, and * creates the list of expression trees (potentially more than one) to execute @@ -2016,7 +2186,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate, if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct && CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION && - walkerContext->repartitionSubquery) + walkerContext->pullDistinctColumns) { Aggref *aggregate = (Aggref *) copyObject(originalAggregate); List *columnList = pull_var_clause_default((Node *) aggregate); @@ -2355,6 +2525,31 @@ GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode) } +/* + * CreateSortGroupClause creates SortGroupClause for a given column Var. + * The caller should set tleSortGroupRef field and respective + * TargetEntry->ressortgroupref fields to appropriate SortGroupRefIndex. + */ +static SortGroupClause * +CreateSortGroupClause(Var *column) +{ + Oid lessThanOperator = InvalidOid; + Oid equalsOperator = InvalidOid; + bool hashable = false; + SortGroupClause *groupByClause = makeNode(SortGroupClause); + + get_sort_group_operators(column->vartype, true, true, true, + &lessThanOperator, &equalsOperator, NULL, + &hashable); + groupByClause->eqop = equalsOperator; + groupByClause->hashable = hashable; + groupByClause->nulls_first = false; + groupByClause->sortop = lessThanOperator; + + return groupByClause; +} + + /* * CountDistinctHashFunctionName resolves the hll_hash function name to use for * the given input type, and returns this function name. @@ -2645,28 +2840,34 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); distinctColumn = AggregateDistinctColumn(aggregateExpression); - if (distinctSupported && distinctColumn == NULL) + if (distinctSupported) { - /* - * If the query has a single table, and table is grouped by partition column, - * then we support count distincts even distinct column can not be identified. - */ - distinctSupported = TablePartitioningSupportsDistinct(tableNodeList, + if (distinctColumn == NULL) + { + /* + * If the query has a single table, and table is grouped by partition + * column, then we support count distincts even distinct column can + * not be identified. + */ + distinctSupported = TablePartitioningSupportsDistinct(tableNodeList, + extendedOpNode, + distinctColumn); + if (!distinctSupported) + { + errorDetail = "aggregate (distinct) on complex expressions is" + " unsupported"; + } + } + else if (aggregateType != AGGREGATE_COUNT) + { + bool supports = TablePartitioningSupportsDistinct(tableNodeList, extendedOpNode, distinctColumn); - if (!distinctSupported) - { - errorDetail = "aggregate (distinct) on complex expressions is unsupported"; - } - } - else if (distinctSupported) - { - bool supports = TablePartitioningSupportsDistinct(tableNodeList, extendedOpNode, - distinctColumn); - if (!supports) - { - distinctSupported = false; - errorDetail = "table partitioning is unsuitable for aggregate (distinct)"; + if (!supports) + { + distinctSupported = false; + errorDetail = "table partitioning is unsuitable for aggregate (distinct)"; + } } } diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 29d321955..2d7cb672c 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -34,6 +34,7 @@ static List * MasterTargetList(List *workerTargetList); static PlannedStmt * BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan); static Agg * BuildAggregatePlan(Query *masterQuery, Plan *subPlan); +static bool HasDistinctAggregate(Query *masterQuery); static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan); @@ -287,13 +288,37 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) /* if we have grouping, then initialize appropriate information */ if (groupColumnCount > 0) { - if (!grouping_is_hashable(groupColumnList)) + bool groupingIsHashable = grouping_is_hashable(groupColumnList); + bool groupingIsSortable = grouping_is_hashable(groupColumnList); + bool hasDistinctAggregate = HasDistinctAggregate(masterQuery); + + if (!groupingIsHashable && !groupingIsSortable) { - ereport(ERROR, (errmsg("grouped column list cannot be hashed"))); + ereport(ERROR, (errmsg("grouped column list cannot be hashed or sorted"))); } - /* switch to hashed aggregate strategy to allow grouping */ - aggregateStrategy = AGG_HASHED; + /* + * Postgres hash aggregate strategy does not support distinct aggregates + * in group and order by with aggregate operations. + * see nodeAgg.c:build_pertrans_for_aggref(). In that case we use + * sorted agg strategy, otherwise we use hash strategy. + */ + if (!groupingIsHashable || hasDistinctAggregate) + { + if (!groupingIsSortable) + { + ereport(ERROR, (errmsg("grouped column list must cannot be sorted"), + errdetail("Having a distinct aggregate requires " + "grouped column list to be sortable."))); + } + + aggregateStrategy = AGG_SORTED; + subPlan = (Plan *) make_sort_from_sortclauses(groupColumnList, subPlan); + } + else + { + aggregateStrategy = AGG_HASHED; + } /* get column indexes that are being grouped */ groupColumnIdArray = extract_grouping_cols(groupColumnList, subPlan->targetlist); @@ -315,6 +340,40 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) } +/* + * HasDistinctAggregate returns true if the query has a distinct + * aggregate in its target list or in having clause. + */ +static bool +HasDistinctAggregate(Query *masterQuery) +{ + List *targetVarList = NIL; + List *havingVarList = NIL; + List *allColumnList = NIL; + ListCell *allColumnCell = NULL; + + targetVarList = pull_var_clause((Node *) masterQuery->targetList, + PVC_INCLUDE_AGGREGATES); + havingVarList = pull_var_clause(masterQuery->havingQual, PVC_INCLUDE_AGGREGATES); + + allColumnList = list_concat(targetVarList, havingVarList); + foreach(allColumnCell, allColumnList) + { + Node *columnNode = lfirst(allColumnCell); + if (IsA(columnNode, Aggref)) + { + Aggref *aggref = (Aggref *) columnNode; + if (aggref->aggdistinct != NIL) + { + return true; + } + } + } + + return false; +} + + /* * BuildDistinctPlan creates an returns a plan for distinct. Depending on * availability of hash function it chooses HashAgg over Sort/Unique @@ -332,6 +391,7 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan) List *targetList = copyObject(masterQuery->targetList); List *columnList = pull_var_clause_default((Node *) targetList); ListCell *columnCell = NULL; + bool hasDistinctAggregate = false; if (IsA(subPlan, Agg)) { @@ -353,10 +413,12 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan) /* * Create group by plan with HashAggregate if all distinct - * members are hashable, Otherwise create sort+unique plan. + * members are hashable, and not containing distinct aggregate. + * Otherwise create sort+unique plan. */ distinctClausesHashable = grouping_is_hashable(distinctClauseList); - if (distinctClausesHashable) + hasDistinctAggregate = HasDistinctAggregate(masterQuery); + if (distinctClausesHashable && !hasDistinctAggregate) { const long rowEstimate = 10; /* using the same value as BuildAggregatePlan() */ AttrNumber *distinctColumnIdArray = extract_grouping_cols(distinctClauseList, diff --git a/src/test/regress/expected/multi_agg_approximate_distinct.out b/src/test/regress/expected/multi_agg_approximate_distinct.out index 5c426802e..d448060ff 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct.out @@ -16,9 +16,11 @@ WHERE name = 'hll' \c - - - :master_port -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + -- Check approximate count(distinct) at different precisions / error rates SET citus.count_distinct_error_rate = 0.1; SELECT count(distinct l_orderkey) FROM lineitem; @@ -200,6 +202,8 @@ SELECT -- Check that we can revert config and disable count(distinct) approximations SET citus.count_distinct_error_rate = 0.0; SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + diff --git a/src/test/regress/expected/multi_agg_approximate_distinct_0.out b/src/test/regress/expected/multi_agg_approximate_distinct_0.out index da6ba329e..6879f5a99 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct_0.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct_0.out @@ -31,9 +31,11 @@ WHERE name = 'hll' \c - - - :master_port -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + -- Check approximate count(distinct) at different precisions / error rates SET citus.count_distinct_error_rate = 0.1; SELECT count(distinct l_orderkey) FROM lineitem; @@ -147,6 +149,8 @@ HINT: You need to have the hll extension loaded. -- Check that we can revert config and disable count(distinct) approximations SET citus.count_distinct_error_rate = 0.0; SELECT count(distinct l_orderkey) FROM lineitem; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 2985 +(1 row) + diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index 2438fabf7..50c0a5039 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -107,11 +107,20 @@ SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = 700 (1 row) --- count distinct on partition column is not supported +-- count distinct on partition column is supported SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 551 +(1 row) + +-- count distinct on non-partition column is supported +SELECT count(distinct o_orderpriority) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); + count +------- + 2 +(1 row) + -- count distinct on partition column is supported on router queries SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey) diff --git a/src/test/regress/input/multi_agg_distinct.source b/src/test/regress/input/multi_agg_distinct.source index 0c36c0284..764f5cc2e 100644 --- a/src/test/regress/input/multi_agg_distinct.source +++ b/src/test/regress/input/multi_agg_distinct.source @@ -56,22 +56,20 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part RESET citus.large_table_shard_count; --- Check that we don't support count(distinct) on non-partition column, and --- complex expressions. - +-- Check that we support count(distinct) on non-partition column. SELECT count(distinct l_partkey) FROM lineitem_range; + +-- Check that we don't support complex expressions. SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; -- Now test append partitioned tables. First run count(distinct) on a single -- sharded table. - SELECT count(distinct p_mfgr) FROM part; SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr; --- We don't support count(distinct) queries if table is append partitioned and --- has multiple shards - -SELECT count(distinct o_orderkey) FROM orders; +-- We support count(distinct) queries on append partitioned tables +-- both on partition column, and non-partition column. +SELECT count(distinct o_orderkey), count(distinct o_custkey) FROM orders; -- Hash partitioned tables: @@ -99,13 +97,13 @@ SELECT master_create_worker_shards('lineitem_hash', 4, 1); \copy lineitem_hash FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|' -- aggregate(distinct) on partition column is allowed - SELECT count(distinct l_orderkey) FROM lineitem_hash; SELECT avg(distinct l_orderkey) FROM lineitem_hash; --- count(distinct) on non-partition column or expression is not allowed - +-- count(distinct) on non-partition column is allowed SELECT count(distinct l_partkey) FROM lineitem_hash; + +-- count(distinct) on column expression is not allowed SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; -- agg(distinct) is allowed if we group by partition column @@ -115,4 +113,16 @@ SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_ra -- they should return the same results SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey AND h.count != r.count; +-- count(distinct) is allowed if we group by non-partition column +SELECT l_partkey, count(distinct l_orderkey) INTO hash_results_np FROM lineitem_hash GROUP BY l_partkey; +SELECT l_partkey, count(distinct l_orderkey) INTO range_results_np FROM lineitem_range GROUP BY l_partkey; + +-- they should return the same results +SELECT * FROM hash_results_np h, range_results_np r WHERE h.l_partkey = r.l_partkey AND h.count != r.count; + +-- other agg(distinct) are not allowed on non-partition columns even they are grouped +-- on non-partition columns +SELECT SUM(distinct l_partkey) FROM lineitem_hash; +SELECT l_shipmode, sum(distinct l_partkey) FROM lineitem_hash GROUP BY l_shipmode; + DROP TABLE lineitem_hash; diff --git a/src/test/regress/input/multi_complex_count_distinct.source b/src/test/regress/input/multi_complex_count_distinct.source index 1ca2cc617..4fdd47611 100644 --- a/src/test/regress/input/multi_complex_count_distinct.source +++ b/src/test/regress/input/multi_complex_count_distinct.source @@ -35,6 +35,13 @@ SET citus.task_executor_type to "task-tracker"; -- count(distinct) is supported on top level query if there -- is a grouping on the partition key +SELECT + l_orderkey, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; +EXPLAIN (COSTS false, VERBOSE true) SELECT l_orderkey, count(DISTINCT l_partkey) FROM lineitem_hash @@ -42,7 +49,14 @@ SELECT ORDER BY 2 DESC, 1 DESC LIMIT 10; --- it is not supported if there is no grouping or grouping is on non-partition field +-- it is also supported if there is no grouping or grouping is on non-partition field +SELECT + count(DISTINCT l_partkey) + FROM lineitem_hash + ORDER BY 1 DESC + LIMIT 10; + +EXPLAIN (COSTS false, VERBOSE true) SELECT count(DISTINCT l_partkey) FROM lineitem_hash @@ -56,6 +70,96 @@ SELECT ORDER BY 2 DESC, 1 DESC LIMIT 10; +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + +-- mixed mode count distinct, grouped by partition column +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + +-- partition/non-partition column count distinct no grouping +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + +-- distinct/non-distinct on partition and non-partition columns +SELECT + count(distinct l_orderkey), count(l_orderkey), + count(distinct l_partkey), count(l_partkey), + count(distinct l_shipmode), count(l_shipmode) + FROM lineitem_hash; + +-- mixed mode count distinct, grouped by non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 1, 2 DESC, 3 DESC; + +-- mixed mode count distinct, grouped by non-partition column +-- having on partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + +-- mixed mode count distinct, grouped by non-partition column +-- having on non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_suppkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + -- count distinct is supported on single table subqueries SELECT * FROM ( @@ -75,14 +179,68 @@ SELECT * ORDER BY 2 DESC, 1 DESC LIMIT 10; --- count distinct with filters -SELECT - l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash - GROUP BY l_orderkey +EXPLAIN (COSTS false, VERBOSE true) +SELECT * + FROM ( + SELECT + l_partkey, count(DISTINCT l_orderkey) + FROM lineitem_hash + GROUP BY l_partkey) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; +-- count distinct with filters +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + +-- group by on non-partition column +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; +-- explaining the same query fails +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + +-- without group by, on partition column +SELECT + count(DISTINCT l_orderkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + +-- without group by, on non-partition column +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_partkey), + count(DISTINCT l_shipdate) + FROM lineitem_hash; + -- filter column already exists in target list SELECT * FROM ( diff --git a/src/test/regress/output/multi_agg_distinct.source b/src/test/regress/output/multi_agg_distinct.source index a62e4bd56..e73501541 100644 --- a/src/test/regress/output/multi_agg_distinct.source +++ b/src/test/regress/output/multi_agg_distinct.source @@ -73,12 +73,14 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part (10 rows) RESET citus.large_table_shard_count; --- Check that we don't support count(distinct) on non-partition column, and --- complex expressions. +-- Check that we support count(distinct) on non-partition column. SELECT count(distinct l_partkey) FROM lineitem_range; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 11661 +(1 row) + +-- Check that we don't support complex expressions. SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; ERROR: cannot compute aggregate (distinct) DETAIL: aggregate (distinct) on complex expressions is unsupported @@ -101,12 +103,14 @@ SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mf Manufacturer#5 | 185 (5 rows) --- We don't support count(distinct) queries if table is append partitioned and --- has multiple shards -SELECT count(distinct o_orderkey) FROM orders; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. +-- We support count(distinct) queries on append partitioned tables +-- both on partition column, and non-partition column. +SELECT count(distinct o_orderkey), count(distinct o_custkey) FROM orders; + count | count +-------+------- + 2984 | 923 +(1 row) + -- Hash partitioned tables: CREATE TABLE lineitem_hash ( l_orderkey bigint not null, @@ -152,11 +156,14 @@ SELECT avg(distinct l_orderkey) FROM lineitem_hash; 7463.9474036850921273 (1 row) --- count(distinct) on non-partition column or expression is not allowed +-- count(distinct) on non-partition column is allowed SELECT count(distinct l_partkey) FROM lineitem_hash; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 11661 +(1 row) + +-- count(distinct) on column expression is not allowed SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; ERROR: cannot compute aggregate (distinct) DETAIL: aggregate (distinct) on complex expressions is unsupported @@ -170,4 +177,21 @@ SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey ------------+-------+------------+------- (0 rows) +-- count(distinct) is allowed if we group by non-partition column +SELECT l_partkey, count(distinct l_orderkey) INTO hash_results_np FROM lineitem_hash GROUP BY l_partkey; +SELECT l_partkey, count(distinct l_orderkey) INTO range_results_np FROM lineitem_range GROUP BY l_partkey; +-- they should return the same results +SELECT * FROM hash_results_np h, range_results_np r WHERE h.l_partkey = r.l_partkey AND h.count != r.count; + l_partkey | count | l_partkey | count +-----------+-------+-----------+------- +(0 rows) + +-- other agg(distinct) are not allowed on non-partition columns even they are grouped +-- on non-partition columns +SELECT SUM(distinct l_partkey) FROM lineitem_hash; +ERROR: cannot compute aggregate (distinct) +DETAIL: table partitioning is unsuitable for aggregate (distinct) +SELECT l_shipmode, sum(distinct l_partkey) FROM lineitem_hash GROUP BY l_shipmode; +ERROR: cannot compute aggregate (distinct) +DETAIL: table partitioning is unsuitable for aggregate (distinct) DROP TABLE lineitem_hash; diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 59b4045ad..c102b1643 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -58,24 +58,346 @@ SELECT 14624 | 7 (10 rows) --- it is not supported if there is no grouping or grouping is on non-partition field +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Limit + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) + -> Sort + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey DESC + -> HashAggregate + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) + Group Key: remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_partkey)) + -> Sort + Output: l_orderkey, (count(DISTINCT l_partkey)) + Sort Key: (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey DESC + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_partkey) + Group Key: lineitem_hash.l_orderkey + -> Sort + Output: l_orderkey, l_partkey + Sort Key: lineitem_hash.l_orderkey DESC + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey +(27 rows) + +-- it is also supported if there is no grouping or grouping is on non-partition field SELECT count(DISTINCT l_partkey) FROM lineitem_hash ORDER BY 1 DESC LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + count +------- + 11661 +(1 row) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + count(DISTINCT l_partkey) + FROM lineitem_hash + ORDER BY 1 DESC + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count))))) + -> Sort + Output: count(DISTINCT (count(DISTINCT remote_scan.count))) + Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC + -> Aggregate + Output: count(DISTINCT remote_scan.count) + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.count + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_partkey + Group Key: lineitem_hash.l_partkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(18 rows) + SELECT l_shipmode, count(DISTINCT l_partkey) FROM lineitem_hash GROUP BY l_shipmode ORDER BY 2 DESC, 1 DESC LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: table partitioning is unsuitable for aggregate (distinct) -HINT: You can load the hll extension from contrib packages and enable distinct approximations. + l_shipmode | count +------------+------- + TRUCK | 1757 + MAIL | 1730 + AIR | 1702 + FOB | 1700 + RAIL | 1696 + SHIP | 1684 + REG AIR | 1676 +(7 rows) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count))))) + -> Sort + Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) + Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC, remote_scan.l_shipmode DESC + -> GroupAggregate + Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count) + Group Key: remote_scan.l_shipmode + -> Sort + Output: remote_scan.l_shipmode, remote_scan.count + Sort Key: remote_scan.l_shipmode DESC + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_shipmode, remote_scan.count + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_shipmode, l_partkey + Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(22 rows) + +-- mixed mode count distinct, grouped by partition column +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + l_orderkey | count | count +------------+-------+------- + 226 | 7 | 7 + 1316 | 7 | 7 + 1477 | 7 | 7 + 3555 | 7 | 7 + 12258 | 7 | 7 + 12835 | 7 | 7 + 768 | 7 | 6 + 1121 | 7 | 6 + 1153 | 7 | 6 + 1281 | 7 | 6 +(10 rows) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 3 DESC, 2 DESC, 1 + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) + -> Sort + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey + -> HashAggregate + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) + Group Key: remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) + -> Sort + Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) + Sort Key: (count(DISTINCT lineitem_hash.l_shipmode)) DESC, (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_partkey), count(DISTINCT l_shipmode) + Group Key: lineitem_hash.l_orderkey + -> Sort + Output: l_orderkey, l_partkey, l_shipmode + Sort Key: lineitem_hash.l_orderkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_shipmode +(27 rows) + +-- partition/non-partition column count distinct no grouping +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + count | count | count +-------+-------+------- + 2985 | 11661 | 7 +(1 row) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) + FROM lineitem_hash; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Aggregate + Output: count(DISTINCT remote_scan.count), count(DISTINCT remote_scan.count_1), count(DISTINCT remote_scan.count_2) + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.count, remote_scan.count_1, remote_scan.count_2 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_orderkey, l_partkey, l_shipmode + Group Key: lineitem_hash.l_orderkey, lineitem_hash.l_partkey, lineitem_hash.l_shipmode + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(13 rows) + +-- distinct/non-distinct on partition and non-partition columns +SELECT + count(distinct l_orderkey), count(l_orderkey), + count(distinct l_partkey), count(l_partkey), + count(distinct l_shipmode), count(l_shipmode) + FROM lineitem_hash; + count | count | count | count | count | count +-------+-------+-------+-------+-------+------- + 2985 | 12000 | 11661 | 12000 | 7 | 12000 +(1 row) + +-- mixed mode count distinct, grouped by non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + ORDER BY 1, 2 DESC, 3 DESC; + l_shipmode | count | count +------------+-------+------- + AIR | 1702 | 1327 + FOB | 1700 | 1276 + MAIL | 1730 | 1299 + RAIL | 1696 | 1265 + REG AIR | 1676 | 1275 + SHIP | 1684 | 1289 + TRUCK | 1757 | 1333 +(7 rows) + +-- mixed mode count distinct, grouped by non-partition column +-- having on partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + l_shipmode | count | count +------------+-------+------- + AIR | 1702 | 1327 + TRUCK | 1757 | 1333 +(2 rows) + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_orderkey) > 1300 + ORDER BY 1, 2 DESC; + l_shipmode | count +------------+------- + AIR | 1702 + TRUCK | 1757 +(2 rows) + +-- mixed mode count distinct, grouped by non-partition column +-- having on non-partition column +SELECT + l_shipmode, count(distinct l_partkey), count(distinct l_suppkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + l_shipmode | count | count +------------+-------+------- + AIR | 1702 | 1564 + FOB | 1700 | 1571 + MAIL | 1730 | 1573 + RAIL | 1696 | 1581 + REG AIR | 1676 | 1557 + SHIP | 1684 | 1554 + TRUCK | 1757 | 1602 +(7 rows) + +-- same but having clause is not on target list +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + l_shipmode | count +------------+------- + AIR | 1702 + FOB | 1700 + MAIL | 1730 + RAIL | 1696 + REG AIR | 1676 + SHIP | 1684 + TRUCK | 1757 +(7 rows) + +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_shipmode, count(distinct l_partkey) + FROM lineitem_hash + GROUP BY l_shipmode + HAVING count(distinct l_suppkey) > 1550 + ORDER BY 1, 2 DESC; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) + Sort Key: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) DESC + -> GroupAggregate + Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count) + Group Key: remote_scan.l_shipmode + Filter: (count(DISTINCT remote_scan.worker_column_3) > 1550) + -> Sort + Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 + Sort Key: remote_scan.l_shipmode + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Output: l_shipmode, l_partkey, l_suppkey + Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey, lineitem_hash.l_suppkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(21 rows) + -- count distinct is supported on single table subqueries SELECT * FROM ( @@ -121,27 +443,152 @@ SELECT * 1927 | 3 (10 rows) --- count distinct with filters -SELECT - l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash - GROUP BY l_orderkey +EXPLAIN (COSTS false, VERBOSE true) +SELECT * + FROM ( + SELECT + l_partkey, count(DISTINCT l_orderkey) + FROM lineitem_hash + GROUP BY l_partkey) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; - l_orderkey | count -------------+------- - 12005 | 4 - 5409 | 4 - 4964 | 4 - 14848 | 3 - 14496 | 3 - 13473 | 3 - 13122 | 3 - 12929 | 3 - 12645 | 3 - 12417 | 3 + QUERY PLAN +------------------------------------------------------------------------- + Limit + Output: remote_scan.l_partkey, remote_scan.count + -> Sort + Output: remote_scan.l_partkey, remote_scan.count + Sort Key: remote_scan.count DESC, remote_scan.l_partkey DESC + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_partkey, remote_scan.count + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 8 + Merge Task Count: 4 +(12 rows) + +-- count distinct with filters +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + l_orderkey | count | count +------------+-------+------- + 4964 | 4 | 7 + 12005 | 4 | 7 + 5409 | 4 | 6 + 164 | 3 | 7 + 322 | 3 | 7 + 871 | 3 | 7 + 1156 | 3 | 7 + 1574 | 3 | 7 + 2054 | 3 | 7 + 2309 | 3 | 7 (10 rows) +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_orderkey, + count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_suppkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 3 DESC, 1 + LIMIT 10; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) + -> Sort + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) + Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey + -> HashAggregate + Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) + Group Key: remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) + -> Sort + Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) + Sort Key: (count(DISTINCT lineitem_hash.l_suppkey) FILTER (WHERE (lineitem_hash.l_shipmode = 'AIR'::bpchar))) DESC, (count(DISTINCT lineitem_hash.l_suppkey)) DESC, lineitem_hash.l_orderkey + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar)), count(DISTINCT l_suppkey) + Group Key: lineitem_hash.l_orderkey + -> Sort + Output: l_orderkey, l_suppkey, l_shipmode + Sort Key: lineitem_hash.l_orderkey + -> Seq Scan on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_suppkey, l_shipmode +(27 rows) + +-- group by on non-partition column +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + l_suppkey | count +-----------+------- + 7680 | 4 + 7703 | 3 + 7542 | 3 + 7072 | 3 + 6335 | 3 + 5873 | 3 + 1318 | 3 + 1042 | 3 + 160 | 3 + 9872 | 2 +(10 rows) + +-- explaining the same query fails +EXPLAIN (COSTS false, VERBOSE true) +SELECT + l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash + GROUP BY l_suppkey + ORDER BY 2 DESC, 1 DESC + LIMIT 10; +ERROR: bogus varattno for OUTER_VAR var: 3 +-- without group by, on partition column +SELECT + count(DISTINCT l_orderkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + count +------- + 1327 +(1 row) + +-- without group by, on non-partition column +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') + FROM lineitem_hash; + count +------- + 1702 +(1 row) + +SELECT + count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'), + count(DISTINCT l_partkey), + count(DISTINCT l_shipdate) + FROM lineitem_hash; + count | count | count +-------+-------+------- + 1702 | 11661 | 2470 +(1 row) + -- filter column already exists in target list SELECT * FROM ( diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql index 347c2ee37..1a976d6d2 100644 --- a/src/test/regress/sql/multi_view.sql +++ b/src/test/regress/sql/multi_view.sql @@ -45,9 +45,12 @@ SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey -- join between views SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); --- count distinct on partition column is not supported +-- count distinct on partition column is supported SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); +-- count distinct on non-partition column is supported +SELECT count(distinct o_orderpriority) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey); + -- count distinct on partition column is supported on router queries SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey) @@ -71,7 +74,6 @@ SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderke -- but view at the outer side is. This is essentially the same as a left join with arguments reversed. SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR'; - -- left join on router query is supported SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey) WHERE o_orderkey = 2;