Expand count distinct support

We can now support more complex count distinct operations by
pulling necessary columns to coordinator and evalutating the
aggreage at coordinator.

It supports broad range of expression with the restriction that
the expression must contain a column.
pull/2255/head
Murat Tuncer 2018-07-05 16:38:43 +03:00
parent 80132b5481
commit f20258ef10
7 changed files with 265 additions and 1075 deletions

View File

@ -15,6 +15,7 @@
#include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_partition.h"
#include "optimizer/var.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
@ -146,6 +147,10 @@ ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode)
* HasNonPartitionColumnDistinctAgg returns true if target entry or having qualifier
* has non-partition column reference in aggregate (distinct) definition. Note that,
* it only checks aggs subfield of Aggref, it does not check FILTER or SORT clauses.
* Having any non-column reference like operator expression, function call, or const
* is considered as a non-partition column. Even if the expression contains partition column
* like (column + 1), it needs to be evaluated at coordinator, since we can't reliably verify
* the distinctness of the expression result like (column % 5) or (column + column).
*/
static bool
HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
@ -167,6 +172,8 @@ HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
List *varList = NIL;
ListCell *varCell = NULL;
bool isPartitionColumn = false;
TargetEntry *firstTargetEntry = NULL;
Node *firstTargetExprNode = NULL;
if (IsA(targetNode, Var))
{
@ -180,6 +187,22 @@ HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
continue;
}
/*
* We are dealing with a more complex count distinct, it needs to be
* evaluated at coordinator level.
*/
if (list_length(targetAgg->args) > 1 || list_length(targetAgg->aggdistinct) > 1)
{
return true;
}
firstTargetEntry = linitial_node(TargetEntry, targetAgg->args);
firstTargetExprNode = strip_implicit_coercions((Node *) firstTargetEntry->expr);
if (!IsA(firstTargetExprNode, Var))
{
return true;
}
varList = pull_var_clause_default((Node *) targetAgg->args);
foreach(varCell, varList)
{

View File

@ -274,7 +274,8 @@ static void ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
static Var * AggregateDistinctColumn(Aggref *aggregateExpression);
static bool TablePartitioningSupportsDistinct(List *tableNodeList,
MultiExtendedOp *opNode,
Var *distinctColumn);
Var *distinctColumn,
AggregateType aggregateType);
/* Local functions forward declarations for limit clauses */
static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
@ -3388,7 +3389,8 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
*/
distinctSupported = TablePartitioningSupportsDistinct(tableNodeList,
extendedOpNode,
distinctColumn);
distinctColumn,
aggregateType);
if (!distinctSupported)
{
errorDetail = "aggregate (distinct) on complex expressions is"
@ -3399,7 +3401,8 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
{
bool supports = TablePartitioningSupportsDistinct(tableNodeList,
extendedOpNode,
distinctColumn);
distinctColumn,
aggregateType);
if (!supports)
{
distinctSupported = false;
@ -3475,7 +3478,7 @@ AggregateDistinctColumn(Aggref *aggregateExpression)
*/
static bool
TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
Var *distinctColumn)
Var *distinctColumn, AggregateType aggregateType)
{
bool distinctSupported = true;
ListCell *tableNodeCell = NULL;
@ -3513,6 +3516,11 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
Var *tablePartitionColumn = tableNode->partitionColumn;
bool groupedByPartitionColumn = false;
if (aggregateType == AGGREGATE_COUNT)
{
tableDistinctSupported = true;
}
/* if distinct is on table partition column, we can push it down */
if (distinctColumn != NULL &&
tablePartitionColumn->varno == distinctColumn->varno &&

File diff suppressed because it is too large Load Diff

View File

@ -53,12 +53,15 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part
GROUP BY p_partkey
ORDER BY p_partkey LIMIT 10;
-- Check that we support count(distinct) on non-partition column.
SELECT count(distinct l_partkey) FROM lineitem_range;
-- Check that we don't support complex expressions.
-- Check that we support more complex expressions.
SELECT count(distinct (l_orderkey)) FROM lineitem_range;
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_range;
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_range;
SELECT count(distinct (l_partkey + 1)) FROM lineitem_range;
SELECT count(distinct (l_partkey % 5)) FROM lineitem_range;
-- Now test append partitioned tables. First run count(distinct) on a single
-- sharded table.
@ -98,11 +101,16 @@ SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
SELECT count(distinct l_orderkey) FROM lineitem_hash;
SELECT avg(distinct l_orderkey) FROM lineitem_hash;
-- Check that we support more complex expressions.
SELECT count(distinct (l_orderkey)) FROM lineitem_hash;
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_hash;
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_hash;
SELECT count(distinct (l_partkey + 1)) FROM lineitem_hash;
SELECT count(distinct (l_partkey % 5)) FROM lineitem_hash;
-- count(distinct) on column expression is not allowed
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
-- agg(distinct) is allowed if we group by partition column
SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey;

View File

@ -384,7 +384,7 @@ SELECT *
2 DESC, 1 DESC
LIMIT 10;
-- count DISTINCT is part of an expression which inclues another aggregate
-- count DISTINCT is part of an expression which includes another aggregate
SELECT *
FROM (
SELECT
@ -403,7 +403,7 @@ SELECT *
1 DESC, 2 DESC
LIMIT 10;
--- count DISTINCT CASE WHEN expression
-- count DISTINCT CASE WHEN expression
SELECT *
FROM (
SELECT
@ -532,5 +532,60 @@ SELECT *
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
RESET citus.task_executor_type;
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
DROP TABLE lineitem_hash;

View File

@ -71,18 +71,44 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part
222 | 1
(10 rows)
-- Check that we support count(distinct) on non-partition column.
-- Check that we support more complex expressions.
SELECT count(distinct (l_orderkey)) FROM lineitem_range;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_range;
count
-------
5
(1 row)
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_range;
count
-------
11661
(1 row)
-- Check that we don't support complex expressions.
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
ERROR: cannot compute aggregate (distinct)
DETAIL: aggregate (distinct) on complex expressions is unsupported
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
SELECT count(distinct (l_partkey + 1)) FROM lineitem_range;
count
-------
11661
(1 row)
SELECT count(distinct (l_partkey % 5)) FROM lineitem_range;
count
-------
5
(1 row)
-- Now test append partitioned tables. First run count(distinct) on a single
-- sharded table.
SELECT count(distinct p_mfgr) FROM part;
@ -149,6 +175,25 @@ SELECT avg(distinct l_orderkey) FROM lineitem_hash;
7463.9474036850921273
(1 row)
-- Check that we support more complex expressions.
SELECT count(distinct (l_orderkey)) FROM lineitem_hash;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_hash;
count
-------
5
(1 row)
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_hash;
count
@ -156,11 +201,18 @@ SELECT count(distinct l_partkey) FROM lineitem_hash;
11661
(1 row)
-- count(distinct) on column expression is not allowed
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
ERROR: cannot compute aggregate (distinct)
DETAIL: aggregate (distinct) on complex expressions is unsupported
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
SELECT count(distinct (l_partkey + 1)) FROM lineitem_hash;
count
-------
11661
(1 row)
SELECT count(distinct (l_partkey % 5)) FROM lineitem_hash;
count
-------
5
(1 row)
-- agg(distinct) is allowed if we group by partition column
SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey;
SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_range GROUP BY l_orderkey;

View File

@ -813,7 +813,7 @@ SELECT *
12.00 | 13
(10 rows)
-- count DISTINCT is part of an expression which inclues another aggregate
-- count DISTINCT is part of an expression which includes another aggregate
SELECT *
FROM (
SELECT
@ -842,7 +842,7 @@ SELECT *
40.50298377916903813318 | TRUCK
(7 rows)
--- count DISTINCT CASE WHEN expression
-- count DISTINCT CASE WHEN expression
SELECT *
FROM (
SELECT
@ -1034,4 +1034,97 @@ SELECT *
LIMIT 10;
ERROR: cannot compute count (distinct)
DETAIL: Non-column references are not supported yet
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
l_shipdate | distinct_part | year
------------+---------------+------
11-29-1995 | 5 | 1995
03-24-1995 | 4 | 1995
09-18-1995 | 4 | 1995
01-17-1995 | 3 | 1995
04-02-1995 | 3 | 1995
05-23-1995 | 3 | 1995
08-11-1995 | 3 | 1995
09-27-1995 | 3 | 1995
10-27-1995 | 3 | 1995
10-30-1995 | 3 | 1995
(10 rows)
RESET citus.task_executor_type;
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
l_shipdate | distinct_part | year
------------+---------------+------
11-29-1995 | 5 | 1995
03-24-1995 | 4 | 1995
09-18-1995 | 4 | 1995
01-17-1995 | 3 | 1995
04-02-1995 | 3 | 1995
05-23-1995 | 3 | 1995
08-11-1995 | 3 | 1995
09-27-1995 | 3 | 1995
10-27-1995 | 3 | 1995
10-30-1995 | 3 | 1995
(10 rows)
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
l_shipdate | distinct_part | year
------------+---------------+------
11-29-1995 | 5 | 1995
03-24-1995 | 4 | 1995
09-18-1995 | 4 | 1995
01-17-1995 | 3 | 1995
04-02-1995 | 3 | 1995
05-23-1995 | 3 | 1995
08-11-1995 | 3 | 1995
09-27-1995 | 3 | 1995
10-27-1995 | 3 | 1995
10-30-1995 | 3 | 1995
(10 rows)
DROP TABLE lineitem_hash;