Support count(distinct) for non-partition columns (#1692)

Expands count distinct coverage by allowing more cases. We used to support
count distinct only if we can push down distinct aggregate to worker query
i.e. the count distinct clause was on the partition column of the table,
or there was a grouping on the partition column.

Now we can support
- non-partition columns, with or without grouping on partition column
- partition, and non partition column in the same query
- having clause
- single table subqueries
- insert into select queries
- join queries where count distinct is on partition, or non-partition column
- filters on count distinct clauses (extends existing support)

We first try to push down aggregate to worker query (original case), if we
can't then we modify worker query to return distinct columns to coordinator
node. We do that by adding distinct column targets to group by clauses. Then
we perform count distinct operation on the coordinator node.

This work should reduce the cases where HLL is used, as it can address anything
that HLL can. However, if we start having performance issues due to a very large
number of rows, then we can recommend using HLL.
pull/1687/head
Murat Tuncer 2017-10-30 13:12:24 +02:00 committed by GitHub
parent 6bb4cf998e
commit e16805215d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1074 additions and 153 deletions

View File

@ -62,15 +62,15 @@ double CountDistinctErrorRate = 0.0; /* precision of count(distinct) approximate
typedef struct MasterAggregateWalkerContext
{
bool repartitionSubquery;
AttrNumber columnId;
bool pullDistinctColumns;
} MasterAggregateWalkerContext;
typedef struct WorkerAggregateWalkerContext
{
bool repartitionSubquery;
List *expressionList;
bool createGroupByClause;
bool pullDistinctColumns;
} WorkerAggregateWalkerContext;
@ -117,7 +117,9 @@ static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode,
MultiExtendedOp *masterNode,
MultiExtendedOp *workerNode);
static void TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList);
static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode);
static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn,
List *tableNodeList);
static Node * MasterAggregateMutator(Node *originalNode,
MasterAggregateWalkerContext *walkerContext);
static Expr * MasterAggregateExpression(Aggref *originalAggregate,
@ -126,7 +128,14 @@ static Expr * MasterAverageExpression(Oid sumAggregateType, Oid countAggregateTy
AttrNumber *columnId);
static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression);
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn);
bool groupedByDisjointPartitionColumn,
List *tableNodeList);
static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
List *tableNodeList);
static bool ShouldPullDistinctColumn(bool repartitionSubquery,
bool groupedByDisjointPartitionColumn,
bool hasNonPartitionColumnDistinctAgg);
static bool PartitionColumnInTableList(Var *column, List *tableNodeList);
static bool WorkerAggregateWalker(Node *node,
WorkerAggregateWalkerContext *walkerContext);
static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
@ -135,6 +144,7 @@ static AggregateType GetAggregateType(Oid aggFunctionId);
static Oid AggregateArgumentType(Aggref *aggregate);
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
static Oid TypeOid(Oid schemaId, const char *typeName);
static SortGroupClause * CreateSortGroupClause(Var *column);
/* Local functions forward declarations for count(distinct) approximations */
static char * CountDistinctHashFunctionName(Oid argumentType);
@ -261,9 +271,12 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList,
extendedOpNode);
masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode);
masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode,
groupedByDisjointPartitionColumn,
tableNodeList);
workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode,
groupedByDisjointPartitionColumn);
groupedByDisjointPartitionColumn,
tableNodeList);
ApplyExtendedOpNodes(extendedOpNode, masterExtendedOpNode, workerExtendedOpNode);
@ -1168,9 +1181,12 @@ TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList)
MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode);
bool groupedByDisjointPartitionColumn =
GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode);
MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode);
MultiExtendedOp *masterExtendedOpNode =
MasterExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn,
tableNodeList);
MultiExtendedOp *workerExtendedOpNode =
WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn);
WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn,
tableNodeList);
MultiPartition *partitionNode = CitusMakeNode(MultiPartition);
List *groupClauseList = extendedOpNode->groupClauseList;
List *targetEntryList = extendedOpNode->targetList;
@ -1232,7 +1248,9 @@ TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList)
* worker nodes' results.
*/
static MultiExtendedOp *
MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn,
List *tableNodeList)
{
MultiExtendedOp *masterExtendedOpNode = NULL;
List *targetEntryList = originalOpNode->targetList;
@ -1244,15 +1262,23 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
MultiNode *childNode = ChildNode((MultiUnaryNode *) originalOpNode);
MasterAggregateWalkerContext *walkerContext = palloc0(
sizeof(MasterAggregateWalkerContext));
bool hasNonPartitionColumnDistinctAgg = false;
bool repartitionSubquery = false;
walkerContext->columnId = 1;
walkerContext->repartitionSubquery = false;
if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect))
{
walkerContext->repartitionSubquery = true;
repartitionSubquery = true;
}
hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList,
originalHavingQual,
tableNodeList);
walkerContext->pullDistinctColumns =
ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn,
hasNonPartitionColumnDistinctAgg);
/* iterate over original target entries */
foreach(targetEntryCell, targetEntryList)
{
@ -1266,7 +1292,6 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
{
Node *newNode = MasterAggregateMutator((Node *) originalExpression,
walkerContext);
newExpression = (Expr *) newNode;
}
else
@ -1379,7 +1404,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
walkerContext->repartitionSubquery)
walkerContext->pullDistinctColumns)
{
Aggref *aggregate = (Aggref *) copyObject(originalAggregate);
List *varList = pull_var_clause_default((Node *) aggregate);
@ -1769,13 +1794,14 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression)
* function to create aggregates for the worker nodes. Also, the function checks
* if we can push down the limit to worker nodes; and if we can, sets the limit
* count and sort clause list fields in the new operator node. It provides special
* treatment for count distinct operator if it is used in repartition subqueries.
* Each column in count distinct aggregate is added to target list, and group by
* list of worker extended operator.
* treatment for count distinct operator if it is used in repartition subqueries
* or on non-partition columns. Each column in count distinct aggregate is added
* to target list, and group by list of worker extended operator.
*/
static MultiExtendedOp *
WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn)
bool groupedByDisjointPartitionColumn,
List *tableNodeList)
{
MultiExtendedOp *workerExtendedOpNode = NULL;
MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode);
@ -1790,28 +1816,37 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
palloc0(sizeof(WorkerAggregateWalkerContext));
Index nextSortGroupRefIndex = 0;
bool queryHasAggregates = false;
bool enableLimitPushdown = true;
bool hasNonPartitionColumnDistinctAgg = false;
bool repartitionSubquery = false;
walkerContext->repartitionSubquery = false;
walkerContext->expressionList = NIL;
/* find max of sort group ref index */
foreach(targetEntryCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
if (targetEntry->ressortgroupref > nextSortGroupRefIndex)
{
nextSortGroupRefIndex = targetEntry->ressortgroupref;
}
}
/* next group ref index starts from max group ref index + 1 */
nextSortGroupRefIndex++;
if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect))
{
walkerContext->repartitionSubquery = true;
/* find max of sort group ref index */
foreach(targetEntryCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
if (targetEntry->ressortgroupref > nextSortGroupRefIndex)
{
nextSortGroupRefIndex = targetEntry->ressortgroupref;
}
}
/* next group ref index starts from max group ref index + 1 */
nextSortGroupRefIndex++;
repartitionSubquery = true;
}
hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList,
havingQual,
tableNodeList);
walkerContext->pullDistinctColumns =
ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn,
hasNonPartitionColumnDistinctAgg);
/* iterate over original target entries */
foreach(targetEntryCell, targetEntryList)
{
@ -1821,6 +1856,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
ListCell *newExpressionCell = NULL;
bool hasAggregates = contain_agg_clause((Node *) originalExpression);
/* reset walker context */
walkerContext->expressionList = NIL;
walkerContext->createGroupByClause = false;
@ -1852,26 +1888,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
*/
if (IsA(newExpression, Var) && walkerContext->createGroupByClause)
{
Var *column = (Var *) newExpression;
Oid lessThanOperator = InvalidOid;
Oid equalsOperator = InvalidOid;
bool hashable = false;
SortGroupClause *groupByClause = makeNode(SortGroupClause);
get_sort_group_operators(column->vartype, true, true, true,
&lessThanOperator, &equalsOperator, NULL,
&hashable);
groupByClause->eqop = equalsOperator;
groupByClause->hashable = hashable;
groupByClause->nulls_first = false;
groupByClause->sortop = lessThanOperator;
Var *column = (Var *) newTargetEntry->expr;
SortGroupClause *groupByClause = CreateSortGroupClause(column);
newTargetEntry->ressortgroupref = nextSortGroupRefIndex;
groupByClause->tleSortGroupRef = nextSortGroupRefIndex;
groupClauseList = lappend(groupClauseList, groupByClause);
newTargetEntry->ressortgroupref = nextSortGroupRefIndex;
nextSortGroupRefIndex++;
/*
* If we introduce new columns accompanied by a new group by clause,
* then pushing down limits will cause incorrect results.
*/
enableLimitPushdown = false;
}
if (newTargetEntry->resname == NULL)
@ -1921,6 +1950,23 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
newTargetEntry->resjunk = false;
newTargetEntry->resno = targetProjectionNumber;
if (IsA(newExpression, Var) && walkerContext->createGroupByClause)
{
Var *column = (Var *) newTargetEntry->expr;
SortGroupClause *groupByClause = CreateSortGroupClause(column);
newTargetEntry->ressortgroupref = nextSortGroupRefIndex;
groupByClause->tleSortGroupRef = nextSortGroupRefIndex;
groupClauseList = lappend(groupClauseList, groupByClause);
nextSortGroupRefIndex++;
/*
* If we introduce new columns accompanied by a new group by clause,
* then pushing down limits will cause incorrect results.
*/
enableLimitPushdown = false;
}
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
targetProjectionNumber++;
}
@ -1939,11 +1985,14 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
workerExtendedOpNode->groupClauseList = groupClauseList;
/* if we can push down the limit, also set related fields */
workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode,
groupedByDisjointPartitionColumn);
workerExtendedOpNode->sortClauseList =
WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn);
if (enableLimitPushdown)
{
/* if we can push down the limit, also set related fields */
workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode,
groupedByDisjointPartitionColumn);
workerExtendedOpNode->sortClauseList =
WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn);
}
/*
* If grouped by a partition column whose values are shards have disjoint sets
@ -1958,6 +2007,127 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
}
/*
 * HasNonPartitionColumnDistinctAgg walks the aggregates referenced by the target
 * list and by the having qualifier, and returns true as soon as it finds a
 * distinct aggregate with an argument column that is not a partition column of
 * one of the given tables. Only the args field of each Aggref is inspected;
 * FILTER and SORT clauses are not examined.
 */
static bool
HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
								 List *tableNodeList)
{
	List *targetNodeList = pull_var_clause((Node *) targetEntryList,
										   PVC_INCLUDE_AGGREGATES);
	List *havingNodeList = pull_var_clause((Node *) havingQual, PVC_INCLUDE_AGGREGATES);
	List *candidateList = list_concat(targetNodeList, havingNodeList);
	ListCell *candidateCell = NULL;

	foreach(candidateCell, candidateList)
	{
		Node *candidate = lfirst(candidateCell);

		/* bare columns (outside of any aggregate) are of no interest here */
		if (!IsA(candidate, Var))
		{
			Aggref *aggregate = NULL;

			Assert(IsA(candidate, Aggref));
			aggregate = (Aggref *) candidate;

			/* only distinct aggregates need their argument columns checked */
			if (aggregate->aggdistinct != NIL)
			{
				List *argumentColumnList =
					pull_var_clause_default((Node *) aggregate->args);
				ListCell *argumentColumnCell = NULL;

				foreach(argumentColumnCell, argumentColumnList)
				{
					Node *argumentColumn = (Node *) lfirst(argumentColumnCell);

					Assert(IsA(argumentColumn, Var));

					if (!PartitionColumnInTableList((Var *) argumentColumn,
													tableNodeList))
					{
						return true;
					}
				}
			}
		}
	}

	return false;
}
/*
 * ShouldPullDistinctColumn decides whether a distinct aggregate has to be
 * evaluated at the master, which requires the workers to return the individual
 * distinct columns instead of a pushed down aggregate.
 *
 * Columns are pulled when:
 * - the query is a repartition subquery, regardless of the other two flags
 * - otherwise, when the query is not grouped by a disjoint partition column
 *   and it has a distinct aggregate on a non-partition column
 *
 * In every other case the function returns false.
 */
static bool
ShouldPullDistinctColumn(bool repartitionSubquery,
						 bool groupedByDisjointPartitionColumn,
						 bool hasNonPartitionColumnDistinctAgg)
{
	bool pullForNonPartitionColumn = !groupedByDisjointPartitionColumn &&
									 hasNonPartitionColumnDistinctAgg;

	return repartitionSubquery || pullForNonPartitionColumn;
}
/*
 * PartitionColumnInTableList returns true when the given column matches, by
 * varno and varattno, the partition column of a table node in the list and
 * that table is not append distributed. Table nodes without a partition column
 * are skipped. A column that only matches the partition column of append
 * distributed tables yields false.
 */
static bool
PartitionColumnInTableList(Var *column, List *tableNodeList)
{
	ListCell *tableCell = NULL;

	foreach(tableCell, tableNodeList)
	{
		MultiTable *table = lfirst(tableCell);
		Var *tablePartitionColumn = table->partitionColumn;

		if (tablePartitionColumn == NULL)
		{
			continue;
		}

		if (tablePartitionColumn->varno != column->varno ||
			tablePartitionColumn->varattno != column->varattno)
		{
			continue;
		}

		Assert(tablePartitionColumn->varno == table->rangeTableId);

		/* a match on an append distributed table does not count */
		if (PartitionMethod(table->relationId) == DISTRIBUTE_BY_APPEND)
		{
			continue;
		}

		return true;
	}

	return false;
}
/*
* WorkerAggregateWalker walks over the original target entry expression, and
* creates the list of expression trees (potentially more than one) to execute
@ -2016,7 +2186,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
walkerContext->repartitionSubquery)
walkerContext->pullDistinctColumns)
{
Aggref *aggregate = (Aggref *) copyObject(originalAggregate);
List *columnList = pull_var_clause_default((Node *) aggregate);
@ -2355,6 +2525,31 @@ GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode)
}
/*
 * CreateSortGroupClause builds a SortGroupClause for the type of the given
 * column Var. The less-than operator, the equality operator and the hashable
 * flag are looked up with get_sort_group_operators, and nulls_first is set to
 * false. The tleSortGroupRef field is left for the caller to fill in, together
 * with the ressortgroupref of the matching TargetEntry.
 */
static SortGroupClause *
CreateSortGroupClause(Var *column)
{
	SortGroupClause *sortGroupClause = makeNode(SortGroupClause);

	/* let the lookup write the operators directly into the new clause */
	get_sort_group_operators(column->vartype, true, true, true,
							 &sortGroupClause->sortop, &sortGroupClause->eqop, NULL,
							 &sortGroupClause->hashable);

	sortGroupClause->nulls_first = false;

	return sortGroupClause;
}
/*
* CountDistinctHashFunctionName resolves the hll_hash function name to use for
* the given input type, and returns this function name.
@ -2645,28 +2840,34 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList);
distinctColumn = AggregateDistinctColumn(aggregateExpression);
if (distinctSupported && distinctColumn == NULL)
if (distinctSupported)
{
/*
* If the query has a single table, and table is grouped by partition column,
* then we support count distincts even distinct column can not be identified.
*/
distinctSupported = TablePartitioningSupportsDistinct(tableNodeList,
if (distinctColumn == NULL)
{
/*
* If the query has a single table, and table is grouped by partition
* column, then we support count distincts even distinct column can
* not be identified.
*/
distinctSupported = TablePartitioningSupportsDistinct(tableNodeList,
extendedOpNode,
distinctColumn);
if (!distinctSupported)
{
errorDetail = "aggregate (distinct) on complex expressions is"
" unsupported";
}
}
else if (aggregateType != AGGREGATE_COUNT)
{
bool supports = TablePartitioningSupportsDistinct(tableNodeList,
extendedOpNode,
distinctColumn);
if (!distinctSupported)
{
errorDetail = "aggregate (distinct) on complex expressions is unsupported";
}
}
else if (distinctSupported)
{
bool supports = TablePartitioningSupportsDistinct(tableNodeList, extendedOpNode,
distinctColumn);
if (!supports)
{
distinctSupported = false;
errorDetail = "table partitioning is unsuitable for aggregate (distinct)";
if (!supports)
{
distinctSupported = false;
errorDetail = "table partitioning is unsuitable for aggregate (distinct)";
}
}
}

View File

@ -34,6 +34,7 @@ static List * MasterTargetList(List *workerTargetList);
static PlannedStmt * BuildSelectStatement(Query *masterQuery, List *masterTargetList,
CustomScan *remoteScan);
static Agg * BuildAggregatePlan(Query *masterQuery, Plan *subPlan);
static bool HasDistinctAggregate(Query *masterQuery);
static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan);
@ -287,13 +288,37 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
/* if we have grouping, then initialize appropriate information */
if (groupColumnCount > 0)
{
if (!grouping_is_hashable(groupColumnList))
bool groupingIsHashable = grouping_is_hashable(groupColumnList);
bool groupingIsSortable = grouping_is_hashable(groupColumnList);
bool hasDistinctAggregate = HasDistinctAggregate(masterQuery);
if (!groupingIsHashable && !groupingIsSortable)
{
ereport(ERROR, (errmsg("grouped column list cannot be hashed")));
ereport(ERROR, (errmsg("grouped column list cannot be hashed or sorted")));
}
/* switch to hashed aggregate strategy to allow grouping */
aggregateStrategy = AGG_HASHED;
/*
* Postgres hash aggregate strategy does not support distinct aggregates
* in group and order by with aggregate operations.
* see nodeAgg.c:build_pertrans_for_aggref(). In that case we use
* sorted agg strategy, otherwise we use hash strategy.
*/
if (!groupingIsHashable || hasDistinctAggregate)
{
if (!groupingIsSortable)
{
ereport(ERROR, (errmsg("grouped column list must cannot be sorted"),
errdetail("Having a distinct aggregate requires "
"grouped column list to be sortable.")));
}
aggregateStrategy = AGG_SORTED;
subPlan = (Plan *) make_sort_from_sortclauses(groupColumnList, subPlan);
}
else
{
aggregateStrategy = AGG_HASHED;
}
/* get column indexes that are being grouped */
groupColumnIdArray = extract_grouping_cols(groupColumnList, subPlan->targetlist);
@ -315,6 +340,40 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
}
/*
 * HasDistinctAggregate returns true when at least one aggregate referenced in
 * the target list or in the having clause of the given query carries a
 * non-empty aggdistinct list, and false otherwise.
 */
static bool
HasDistinctAggregate(Query *masterQuery)
{
	List *targetNodeList = pull_var_clause((Node *) masterQuery->targetList,
										   PVC_INCLUDE_AGGREGATES);
	List *havingNodeList = pull_var_clause(masterQuery->havingQual,
										   PVC_INCLUDE_AGGREGATES);
	List *candidateList = list_concat(targetNodeList, havingNodeList);
	ListCell *candidateCell = NULL;

	foreach(candidateCell, candidateList)
	{
		Node *candidate = lfirst(candidateCell);
		Aggref *aggregate = NULL;

		/* the pulled list mixes plain columns and aggregates; skip the former */
		if (!IsA(candidate, Aggref))
		{
			continue;
		}

		aggregate = (Aggref *) candidate;
		if (aggregate->aggdistinct != NIL)
		{
			return true;
		}
	}

	return false;
}
/*
* BuildDistinctPlan creates and returns a plan for distinct. Depending on
* availability of hash function it chooses HashAgg over Sort/Unique
@ -332,6 +391,7 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan)
List *targetList = copyObject(masterQuery->targetList);
List *columnList = pull_var_clause_default((Node *) targetList);
ListCell *columnCell = NULL;
bool hasDistinctAggregate = false;
if (IsA(subPlan, Agg))
{
@ -353,10 +413,12 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan)
/*
* Create group by plan with HashAggregate if all distinct
* members are hashable, Otherwise create sort+unique plan.
* members are hashable, and not containing distinct aggregate.
* Otherwise create sort+unique plan.
*/
distinctClausesHashable = grouping_is_hashable(distinctClauseList);
if (distinctClausesHashable)
hasDistinctAggregate = HasDistinctAggregate(masterQuery);
if (distinctClausesHashable && !hasDistinctAggregate)
{
const long rowEstimate = 10; /* using the same value as BuildAggregatePlan() */
AttrNumber *distinctColumnIdArray = extract_grouping_cols(distinctClauseList,

View File

@ -16,9 +16,11 @@ WHERE name = 'hll'
\c - - - :master_port
-- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
2985
(1 row)
-- Check approximate count(distinct) at different precisions / error rates
SET citus.count_distinct_error_rate = 0.1;
SELECT count(distinct l_orderkey) FROM lineitem;
@ -200,6 +202,8 @@ SELECT
-- Check that we can revert config and disable count(distinct) approximations
SET citus.count_distinct_error_rate = 0.0;
SELECT count(distinct l_orderkey) FROM lineitem;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
2985
(1 row)

View File

@ -31,9 +31,11 @@ WHERE name = 'hll'
\c - - - :master_port
-- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
2985
(1 row)
-- Check approximate count(distinct) at different precisions / error rates
SET citus.count_distinct_error_rate = 0.1;
SELECT count(distinct l_orderkey) FROM lineitem;
@ -147,6 +149,8 @@ HINT: You need to have the hll extension loaded.
-- Check that we can revert config and disable count(distinct) approximations
SET citus.count_distinct_error_rate = 0.0;
SELECT count(distinct l_orderkey) FROM lineitem;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
2985
(1 row)

View File

@ -107,11 +107,20 @@ SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey =
700
(1 row)
-- count distinct on partition column is not supported
-- count distinct on partition column is supported
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
551
(1 row)
-- count distinct on non-partition column is supported
SELECT count(distinct o_orderpriority) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
count
-------
2
(1 row)
-- count distinct on partition column is supported on router queries
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)

View File

@ -56,22 +56,20 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part
RESET citus.large_table_shard_count;
-- Check that we don't support count(distinct) on non-partition column, and
-- complex expressions.
-- Check that we support count(distinct) on non-partition column.
SELECT count(distinct l_partkey) FROM lineitem_range;
-- Check that we don't support complex expressions.
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
-- Now test append partitioned tables. First run count(distinct) on a single
-- sharded table.
SELECT count(distinct p_mfgr) FROM part;
SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr;
-- We don't support count(distinct) queries if table is append partitioned and
-- has multiple shards
SELECT count(distinct o_orderkey) FROM orders;
-- We support count(distinct) queries on append partitioned tables
-- both on partition column, and non-partition column.
SELECT count(distinct o_orderkey), count(distinct o_custkey) FROM orders;
-- Hash partitioned tables:
@ -99,13 +97,13 @@ SELECT master_create_worker_shards('lineitem_hash', 4, 1);
\copy lineitem_hash FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
-- aggregate(distinct) on partition column is allowed
SELECT count(distinct l_orderkey) FROM lineitem_hash;
SELECT avg(distinct l_orderkey) FROM lineitem_hash;
-- count(distinct) on non-partition column or expression is not allowed
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_hash;
-- count(distinct) on column expression is not allowed
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
-- agg(distinct) is allowed if we group by partition column
@ -115,4 +113,16 @@ SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_ra
-- they should return the same results
SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey AND h.count != r.count;
-- count(distinct) is allowed if we group by non-partition column
SELECT l_partkey, count(distinct l_orderkey) INTO hash_results_np FROM lineitem_hash GROUP BY l_partkey;
SELECT l_partkey, count(distinct l_orderkey) INTO range_results_np FROM lineitem_range GROUP BY l_partkey;
-- they should return the same results
SELECT * FROM hash_results_np h, range_results_np r WHERE h.l_partkey = r.l_partkey AND h.count != r.count;
-- other agg(distinct) are not allowed on non-partition columns even if they are grouped
-- on non-partition columns
SELECT SUM(distinct l_partkey) FROM lineitem_hash;
SELECT l_shipmode, sum(distinct l_partkey) FROM lineitem_hash GROUP BY l_shipmode;
DROP TABLE lineitem_hash;

View File

@ -35,6 +35,13 @@ SET citus.task_executor_type to "task-tracker";
-- count(distinct) is supported on top level query if there
-- is a grouping on the partition key
SELECT
l_orderkey, count(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_orderkey, count(DISTINCT l_partkey)
FROM lineitem_hash
@ -42,7 +49,14 @@ SELECT
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- it is not supported if there is no grouping or grouping is on non-partition field
-- it is also supported if there is no grouping or grouping is on non-partition field
SELECT
count(DISTINCT l_partkey)
FROM lineitem_hash
ORDER BY 1 DESC
LIMIT 10;
EXPLAIN (COSTS false, VERBOSE true)
SELECT
count(DISTINCT l_partkey)
FROM lineitem_hash
@ -56,6 +70,96 @@ SELECT
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_shipmode, count(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- mixed mode count distinct, grouped by partition column
SELECT
l_orderkey, count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 3 DESC, 2 DESC, 1
LIMIT 10;
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_orderkey, count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 3 DESC, 2 DESC, 1
LIMIT 10;
-- partition/non-partition column count distinct no grouping
SELECT
count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash;
EXPLAIN (COSTS false, VERBOSE true)
SELECT
count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash;
-- distinct/non-distinct on partition and non-partition columns
SELECT
count(distinct l_orderkey), count(l_orderkey),
count(distinct l_partkey), count(l_partkey),
count(distinct l_shipmode), count(l_shipmode)
FROM lineitem_hash;
-- mixed mode count distinct, grouped by non-partition column
SELECT
l_shipmode, count(distinct l_partkey), count(distinct l_orderkey)
FROM lineitem_hash
GROUP BY l_shipmode
ORDER BY 1, 2 DESC, 3 DESC;
-- mixed mode count distinct, grouped by non-partition column
-- having on partition column
SELECT
l_shipmode, count(distinct l_partkey), count(distinct l_orderkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_orderkey) > 1300
ORDER BY 1, 2 DESC;
-- same but having clause is not on target list
SELECT
l_shipmode, count(distinct l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_orderkey) > 1300
ORDER BY 1, 2 DESC;
-- mixed mode count distinct, grouped by non-partition column
-- having on non-partition column
SELECT
l_shipmode, count(distinct l_partkey), count(distinct l_suppkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_suppkey) > 1550
ORDER BY 1, 2 DESC;
-- same but having clause is not on target list
SELECT
l_shipmode, count(distinct l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_suppkey) > 1550
ORDER BY 1, 2 DESC;
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_shipmode, count(distinct l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_suppkey) > 1550
ORDER BY 1, 2 DESC;
-- count distinct is supported on single table subqueries
SELECT *
FROM (
@ -75,14 +179,68 @@ SELECT *
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- count distinct with filters
SELECT
l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash
GROUP BY l_orderkey
EXPLAIN (COSTS false, VERBOSE true)
SELECT *
FROM (
SELECT
l_partkey, count(DISTINCT l_orderkey)
FROM lineitem_hash
GROUP BY l_partkey) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- count distinct with filters
SELECT
l_orderkey,
count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_suppkey)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 2 DESC, 3 DESC, 1
LIMIT 10;
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_orderkey,
count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_suppkey)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 2 DESC, 3 DESC, 1
LIMIT 10;
-- group by on non-partition column
SELECT
l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash
GROUP BY l_suppkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- explaining the same query fails
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash
GROUP BY l_suppkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- without group by, on partition column
SELECT
count(DISTINCT l_orderkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash;
-- without group by, on non-partition column
SELECT
count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash;
SELECT
count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_partkey),
count(DISTINCT l_shipdate)
FROM lineitem_hash;
-- filter column already exists in target list
SELECT *
FROM (

View File

@ -73,12 +73,14 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part
(10 rows)
RESET citus.large_table_shard_count;
-- Check that we don't support count(distinct) on non-partition column, and
-- complex expressions.
-- Check that we support count(distinct) on non-partition column.
SELECT count(distinct l_partkey) FROM lineitem_range;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
11661
(1 row)
-- Check that we don't support complex expressions.
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
ERROR: cannot compute aggregate (distinct)
DETAIL: aggregate (distinct) on complex expressions is unsupported
@ -101,12 +103,14 @@ SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mf
Manufacturer#5 | 185
(5 rows)
-- We don't support count(distinct) queries if table is append partitioned and
-- has multiple shards
SELECT count(distinct o_orderkey) FROM orders;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
-- We support count(distinct) queries on append partitioned tables
-- both on partition column, and non-partition column.
SELECT count(distinct o_orderkey), count(distinct o_custkey) FROM orders;
count | count
-------+-------
2984 | 923
(1 row)
-- Hash partitioned tables:
CREATE TABLE lineitem_hash (
l_orderkey bigint not null,
@ -152,11 +156,14 @@ SELECT avg(distinct l_orderkey) FROM lineitem_hash;
7463.9474036850921273
(1 row)
-- count(distinct) on non-partition column or expression is not allowed
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_hash;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
11661
(1 row)
-- count(distinct) on column expression is not allowed
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
ERROR: cannot compute aggregate (distinct)
DETAIL: aggregate (distinct) on complex expressions is unsupported
@ -170,4 +177,21 @@ SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey
------------+-------+------------+-------
(0 rows)
-- count(distinct) is allowed if we group by non-partition column
SELECT l_partkey, count(distinct l_orderkey) INTO hash_results_np FROM lineitem_hash GROUP BY l_partkey;
SELECT l_partkey, count(distinct l_orderkey) INTO range_results_np FROM lineitem_range GROUP BY l_partkey;
-- they should return the same results
SELECT * FROM hash_results_np h, range_results_np r WHERE h.l_partkey = r.l_partkey AND h.count != r.count;
l_partkey | count | l_partkey | count
-----------+-------+-----------+-------
(0 rows)
-- other agg(distinct) are not allowed on non-partition columns even they are grouped
-- on non-partition columns
SELECT SUM(distinct l_partkey) FROM lineitem_hash;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
SELECT l_shipmode, sum(distinct l_partkey) FROM lineitem_hash GROUP BY l_shipmode;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
DROP TABLE lineitem_hash;

View File

@ -58,24 +58,346 @@ SELECT
14624 | 7
(10 rows)
-- it is not supported if there is no grouping or grouping is on non-partition field
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_orderkey, count(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint)
-> Sort
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey DESC
-> HashAggregate
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
Group Key: remote_scan.l_orderkey
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.l_orderkey, remote_scan.count
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
Output: l_orderkey, (count(DISTINCT l_partkey))
-> Sort
Output: l_orderkey, (count(DISTINCT l_partkey))
Sort Key: (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey DESC
-> GroupAggregate
Output: l_orderkey, count(DISTINCT l_partkey)
Group Key: lineitem_hash.l_orderkey
-> Sort
Output: l_orderkey, l_partkey
Sort Key: lineitem_hash.l_orderkey DESC
-> Seq Scan on public.lineitem_hash_240000 lineitem_hash
Output: l_orderkey, l_partkey
(27 rows)
-- it is also supported if there is no grouping or grouping is on non-partition field
SELECT
count(DISTINCT l_partkey)
FROM lineitem_hash
ORDER BY 1 DESC
LIMIT 10;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
count
-------
11661
(1 row)
EXPLAIN (COSTS false, VERBOSE true)
SELECT
count(DISTINCT l_partkey)
FROM lineitem_hash
ORDER BY 1 DESC
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count)))))
-> Sort
Output: count(DISTINCT (count(DISTINCT remote_scan.count)))
Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC
-> Aggregate
Output: count(DISTINCT remote_scan.count)
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.count
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_partkey
Group Key: lineitem_hash.l_partkey
-> Seq Scan on public.lineitem_hash_240000 lineitem_hash
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
(18 rows)
SELECT
l_shipmode, count(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
l_shipmode | count
------------+-------
TRUCK | 1757
MAIL | 1730
AIR | 1702
FOB | 1700
RAIL | 1696
SHIP | 1684
REG AIR | 1676
(7 rows)
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_shipmode, count(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count)))))
-> Sort
Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count)))
Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC, remote_scan.l_shipmode DESC
-> GroupAggregate
Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count)
Group Key: remote_scan.l_shipmode
-> Sort
Output: remote_scan.l_shipmode, remote_scan.count
Sort Key: remote_scan.l_shipmode DESC
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.l_shipmode, remote_scan.count
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_shipmode, l_partkey
Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey
-> Seq Scan on public.lineitem_hash_240000 lineitem_hash
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
(22 rows)
-- mixed mode count distinct, grouped by partition column
SELECT
l_orderkey, count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 3 DESC, 2 DESC, 1
LIMIT 10;
l_orderkey | count | count
------------+-------+-------
226 | 7 | 7
1316 | 7 | 7
1477 | 7 | 7
3555 | 7 | 7
12258 | 7 | 7
12835 | 7 | 7
768 | 7 | 6
1121 | 7 | 6
1153 | 7 | 6
1281 | 7 | 6
(10 rows)
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_orderkey, count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 3 DESC, 2 DESC, 1
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint)
-> Sort
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey
-> HashAggregate
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)
Group Key: remote_scan.l_orderkey
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode))
-> Sort
Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode))
Sort Key: (count(DISTINCT lineitem_hash.l_shipmode)) DESC, (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey
-> GroupAggregate
Output: l_orderkey, count(DISTINCT l_partkey), count(DISTINCT l_shipmode)
Group Key: lineitem_hash.l_orderkey
-> Sort
Output: l_orderkey, l_partkey, l_shipmode
Sort Key: lineitem_hash.l_orderkey
-> Seq Scan on public.lineitem_hash_240000 lineitem_hash
Output: l_orderkey, l_partkey, l_shipmode
(27 rows)
-- partition/non-partition column count distinct no grouping
SELECT
count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash;
count | count | count
-------+-------+-------
2985 | 11661 | 7
(1 row)
EXPLAIN (COSTS false, VERBOSE true)
SELECT
count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode)
FROM lineitem_hash;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate
Output: count(DISTINCT remote_scan.count), count(DISTINCT remote_scan.count_1), count(DISTINCT remote_scan.count_2)
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.count, remote_scan.count_1, remote_scan.count_2
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_orderkey, l_partkey, l_shipmode
Group Key: lineitem_hash.l_orderkey, lineitem_hash.l_partkey, lineitem_hash.l_shipmode
-> Seq Scan on public.lineitem_hash_240000 lineitem_hash
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
(13 rows)
-- distinct/non-distinct on partition and non-partition columns
SELECT
count(distinct l_orderkey), count(l_orderkey),
count(distinct l_partkey), count(l_partkey),
count(distinct l_shipmode), count(l_shipmode)
FROM lineitem_hash;
count | count | count | count | count | count
-------+-------+-------+-------+-------+-------
2985 | 12000 | 11661 | 12000 | 7 | 12000
(1 row)
-- mixed mode count distinct, grouped by non-partition column
SELECT
l_shipmode, count(distinct l_partkey), count(distinct l_orderkey)
FROM lineitem_hash
GROUP BY l_shipmode
ORDER BY 1, 2 DESC, 3 DESC;
l_shipmode | count | count
------------+-------+-------
AIR | 1702 | 1327
FOB | 1700 | 1276
MAIL | 1730 | 1299
RAIL | 1696 | 1265
REG AIR | 1676 | 1275
SHIP | 1684 | 1289
TRUCK | 1757 | 1333
(7 rows)
-- mixed mode count distinct, grouped by non-partition column
-- having on partition column
SELECT
l_shipmode, count(distinct l_partkey), count(distinct l_orderkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_orderkey) > 1300
ORDER BY 1, 2 DESC;
l_shipmode | count | count
------------+-------+-------
AIR | 1702 | 1327
TRUCK | 1757 | 1333
(2 rows)
-- same but having clause is not on target list
SELECT
l_shipmode, count(distinct l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_orderkey) > 1300
ORDER BY 1, 2 DESC;
l_shipmode | count
------------+-------
AIR | 1702
TRUCK | 1757
(2 rows)
-- mixed mode count distinct, grouped by non-partition column
-- having on non-partition column
SELECT
l_shipmode, count(distinct l_partkey), count(distinct l_suppkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_suppkey) > 1550
ORDER BY 1, 2 DESC;
l_shipmode | count | count
------------+-------+-------
AIR | 1702 | 1564
FOB | 1700 | 1571
MAIL | 1730 | 1573
RAIL | 1696 | 1581
REG AIR | 1676 | 1557
SHIP | 1684 | 1554
TRUCK | 1757 | 1602
(7 rows)
-- same but having clause is not on target list
SELECT
l_shipmode, count(distinct l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_suppkey) > 1550
ORDER BY 1, 2 DESC;
l_shipmode | count
------------+-------
AIR | 1702
FOB | 1700
MAIL | 1730
RAIL | 1696
REG AIR | 1676
SHIP | 1684
TRUCK | 1757
(7 rows)
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_shipmode, count(distinct l_partkey)
FROM lineitem_hash
GROUP BY l_shipmode
HAVING count(distinct l_suppkey) > 1550
ORDER BY 1, 2 DESC;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count)))
Sort Key: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) DESC
-> GroupAggregate
Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count)
Group Key: remote_scan.l_shipmode
Filter: (count(DISTINCT remote_scan.worker_column_3) > 1550)
-> Sort
Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3
Sort Key: remote_scan.l_shipmode
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Output: l_shipmode, l_partkey, l_suppkey
Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey, lineitem_hash.l_suppkey
-> Seq Scan on public.lineitem_hash_240000 lineitem_hash
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
(21 rows)
-- count distinct is supported on single table subqueries
SELECT *
FROM (
@ -121,27 +443,152 @@ SELECT *
1927 | 3
(10 rows)
-- count distinct with filters
SELECT
l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash
GROUP BY l_orderkey
EXPLAIN (COSTS false, VERBOSE true)
SELECT *
FROM (
SELECT
l_partkey, count(DISTINCT l_orderkey)
FROM lineitem_hash
GROUP BY l_partkey) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
l_orderkey | count
------------+-------
12005 | 4
5409 | 4
4964 | 4
14848 | 3
14496 | 3
13473 | 3
13122 | 3
12929 | 3
12645 | 3
12417 | 3
QUERY PLAN
-------------------------------------------------------------------------
Limit
Output: remote_scan.l_partkey, remote_scan.count
-> Sort
Output: remote_scan.l_partkey, remote_scan.count
Sort Key: remote_scan.count DESC, remote_scan.l_partkey DESC
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.l_partkey, remote_scan.count
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 8
Merge Task Count: 4
(12 rows)
-- count distinct with filters
SELECT
l_orderkey,
count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_suppkey)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 2 DESC, 3 DESC, 1
LIMIT 10;
l_orderkey | count | count
------------+-------+-------
4964 | 4 | 7
12005 | 4 | 7
5409 | 4 | 6
164 | 3 | 7
322 | 3 | 7
871 | 3 | 7
1156 | 3 | 7
1574 | 3 | 7
2054 | 3 | 7
2309 | 3 | 7
(10 rows)
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_orderkey,
count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_suppkey)
FROM lineitem_hash
GROUP BY l_orderkey
ORDER BY 2 DESC, 3 DESC, 1
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint)
-> Sort
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint)
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey
-> HashAggregate
Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)
Group Key: remote_scan.l_orderkey
-> Custom Scan (Citus Task-Tracker)
Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1
Task Count: 8
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey))
-> Sort
Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey))
Sort Key: (count(DISTINCT lineitem_hash.l_suppkey) FILTER (WHERE (lineitem_hash.l_shipmode = 'AIR'::bpchar))) DESC, (count(DISTINCT lineitem_hash.l_suppkey)) DESC, lineitem_hash.l_orderkey
-> GroupAggregate
Output: l_orderkey, count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar)), count(DISTINCT l_suppkey)
Group Key: lineitem_hash.l_orderkey
-> Sort
Output: l_orderkey, l_suppkey, l_shipmode
Sort Key: lineitem_hash.l_orderkey
-> Seq Scan on public.lineitem_hash_240000 lineitem_hash
Output: l_orderkey, l_suppkey, l_shipmode
(27 rows)
-- group by on non-partition column
SELECT
l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash
GROUP BY l_suppkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
l_suppkey | count
-----------+-------
7680 | 4
7703 | 3
7542 | 3
7072 | 3
6335 | 3
5873 | 3
1318 | 3
1042 | 3
160 | 3
9872 | 2
(10 rows)
-- explaining the same query fails
EXPLAIN (COSTS false, VERBOSE true)
SELECT
l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash
GROUP BY l_suppkey
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: bogus varattno for OUTER_VAR var: 3
-- without group by, on partition column
SELECT
count(DISTINCT l_orderkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash;
count
-------
1327
(1 row)
-- without group by, on non-partition column
SELECT
count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR')
FROM lineitem_hash;
count
-------
1702
(1 row)
SELECT
count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'),
count(DISTINCT l_partkey),
count(DISTINCT l_shipdate)
FROM lineitem_hash;
count | count | count
-------+-------+-------
1702 | 11661 | 2470
(1 row)
-- filter column already exists in target list
SELECT *
FROM (

View File

@ -45,9 +45,12 @@ SELECT count(*) FROM orders_hash_part join air_shipped_lineitems ON (o_orderkey
-- join between views
SELECT count(*) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- count distinct on partition column is not supported
-- count distinct on partition column is supported
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- count distinct on non-partition column is supported
SELECT count(distinct o_orderpriority) FROM priority_orders join air_shipped_lineitems ON (o_orderkey = l_orderkey);
-- count distinct on partition column is supported on router queries
SELECT count(distinct o_orderkey) FROM priority_orders join air_shipped_lineitems
ON (o_orderkey = l_orderkey)
@ -71,7 +74,6 @@ SELECT count(*) FROM priority_orders right join lineitem_hash_part ON (o_orderke
-- but view at the outer side is. This is essentially the same as a left join with arguments reversed.
SELECT count(*) FROM lineitem_hash_part right join priority_orders ON (o_orderkey = l_orderkey) WHERE l_shipmode ='AIR';
-- left join on router query is supported
SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_lineitems ON (o_orderkey = l_orderkey)
WHERE o_orderkey = 2;