Merge pull request #2255 from citusdata/relax_count_distinct

Expand count distinct support
pull/2260/head
Murat Tuncer 2018-07-06 11:44:38 +03:00 committed by GitHub
commit cdbe752e2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 265 additions and 1075 deletions

View File

@ -15,6 +15,7 @@
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "optimizer/var.h" #include "optimizer/var.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
@ -146,6 +147,10 @@ ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode)
* HasNonPartitionColumnDistinctAgg returns true if target entry or having qualifier * HasNonPartitionColumnDistinctAgg returns true if target entry or having qualifier
* has non-partition column reference in aggregate (distinct) definition. Note that, * has non-partition column reference in aggregate (distinct) definition. Note that,
* it only checks aggs subfield of Aggref, it does not check FILTER or SORT clauses. * it only checks aggs subfield of Aggref, it does not check FILTER or SORT clauses.
* Having any non-column reference like operator expression, function call, or const
* is considered as a non-partition column. Even if the expression contains partition column
* like (column + 1), it needs to be evaluated at coordinator, since we can't reliably verify
* the distinctness of the expression result like (column % 5) or (column + column).
*/ */
static bool static bool
HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
@ -167,6 +172,8 @@ HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
List *varList = NIL; List *varList = NIL;
ListCell *varCell = NULL; ListCell *varCell = NULL;
bool isPartitionColumn = false; bool isPartitionColumn = false;
TargetEntry *firstTargetEntry = NULL;
Node *firstTargetExprNode = NULL;
if (IsA(targetNode, Var)) if (IsA(targetNode, Var))
{ {
@ -180,6 +187,22 @@ HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
continue; continue;
} }
/*
* We are dealing with a more complex count distinct, it needs to be
* evaluated at coordinator level.
*/
if (list_length(targetAgg->args) > 1 || list_length(targetAgg->aggdistinct) > 1)
{
return true;
}
firstTargetEntry = linitial_node(TargetEntry, targetAgg->args);
firstTargetExprNode = strip_implicit_coercions((Node *) firstTargetEntry->expr);
if (!IsA(firstTargetExprNode, Var))
{
return true;
}
varList = pull_var_clause_default((Node *) targetAgg->args); varList = pull_var_clause_default((Node *) targetAgg->args);
foreach(varCell, varList) foreach(varCell, varList)
{ {

View File

@ -274,7 +274,8 @@ static void ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
static Var * AggregateDistinctColumn(Aggref *aggregateExpression); static Var * AggregateDistinctColumn(Aggref *aggregateExpression);
static bool TablePartitioningSupportsDistinct(List *tableNodeList, static bool TablePartitioningSupportsDistinct(List *tableNodeList,
MultiExtendedOp *opNode, MultiExtendedOp *opNode,
Var *distinctColumn); Var *distinctColumn,
AggregateType aggregateType);
/* Local functions forward declarations for limit clauses */ /* Local functions forward declarations for limit clauses */
static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
@ -3388,7 +3389,8 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
*/ */
distinctSupported = TablePartitioningSupportsDistinct(tableNodeList, distinctSupported = TablePartitioningSupportsDistinct(tableNodeList,
extendedOpNode, extendedOpNode,
distinctColumn); distinctColumn,
aggregateType);
if (!distinctSupported) if (!distinctSupported)
{ {
errorDetail = "aggregate (distinct) on complex expressions is" errorDetail = "aggregate (distinct) on complex expressions is"
@ -3399,7 +3401,8 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
{ {
bool supports = TablePartitioningSupportsDistinct(tableNodeList, bool supports = TablePartitioningSupportsDistinct(tableNodeList,
extendedOpNode, extendedOpNode,
distinctColumn); distinctColumn,
aggregateType);
if (!supports) if (!supports)
{ {
distinctSupported = false; distinctSupported = false;
@ -3475,7 +3478,7 @@ AggregateDistinctColumn(Aggref *aggregateExpression)
*/ */
static bool static bool
TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
Var *distinctColumn) Var *distinctColumn, AggregateType aggregateType)
{ {
bool distinctSupported = true; bool distinctSupported = true;
ListCell *tableNodeCell = NULL; ListCell *tableNodeCell = NULL;
@ -3513,6 +3516,11 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
Var *tablePartitionColumn = tableNode->partitionColumn; Var *tablePartitionColumn = tableNode->partitionColumn;
bool groupedByPartitionColumn = false; bool groupedByPartitionColumn = false;
if (aggregateType == AGGREGATE_COUNT)
{
tableDistinctSupported = true;
}
/* if distinct is on table partition column, we can push it down */ /* if distinct is on table partition column, we can push it down */
if (distinctColumn != NULL && if (distinctColumn != NULL &&
tablePartitionColumn->varno == distinctColumn->varno && tablePartitionColumn->varno == distinctColumn->varno &&

File diff suppressed because it is too large Load Diff

View File

@ -53,12 +53,15 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part
GROUP BY p_partkey GROUP BY p_partkey
ORDER BY p_partkey LIMIT 10; ORDER BY p_partkey LIMIT 10;
-- Check that we support more complex expressions.
-- Check that we support count(distinct) on non-partition column. SELECT count(distinct (l_orderkey)) FROM lineitem_range;
SELECT count(distinct l_partkey) FROM lineitem_range;
-- Check that we don't support complex expressions.
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_range;
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_range;
SELECT count(distinct (l_partkey + 1)) FROM lineitem_range;
SELECT count(distinct (l_partkey % 5)) FROM lineitem_range;
-- Now test append partitioned tables. First run count(distinct) on a single -- Now test append partitioned tables. First run count(distinct) on a single
-- sharded table. -- sharded table.
@ -98,11 +101,16 @@ SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
SELECT count(distinct l_orderkey) FROM lineitem_hash; SELECT count(distinct l_orderkey) FROM lineitem_hash;
SELECT avg(distinct l_orderkey) FROM lineitem_hash; SELECT avg(distinct l_orderkey) FROM lineitem_hash;
-- Check that we support more complex expressions.
SELECT count(distinct (l_orderkey)) FROM lineitem_hash;
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_hash;
-- count(distinct) on non-partition column is allowed -- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_hash; SELECT count(distinct l_partkey) FROM lineitem_hash;
SELECT count(distinct (l_partkey + 1)) FROM lineitem_hash;
SELECT count(distinct (l_partkey % 5)) FROM lineitem_hash;
-- count(distinct) on column expression is not allowed
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
-- agg(distinct) is allowed if we group by partition column -- agg(distinct) is allowed if we group by partition column
SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey; SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey;

View File

@ -384,7 +384,7 @@ SELECT *
2 DESC, 1 DESC 2 DESC, 1 DESC
LIMIT 10; LIMIT 10;
-- count DISTINCT is part of an expression which inclues another aggregate -- count DISTINCT is part of an expression which includes another aggregate
SELECT * SELECT *
FROM ( FROM (
SELECT SELECT
@ -403,7 +403,7 @@ SELECT *
1 DESC, 2 DESC 1 DESC, 2 DESC
LIMIT 10; LIMIT 10;
--- count DISTINCT CASE WHEN expression -- count DISTINCT CASE WHEN expression
SELECT * SELECT *
FROM ( FROM (
SELECT SELECT
@ -532,5 +532,60 @@ SELECT *
ORDER BY 2 DESC, 1 DESC ORDER BY 2 DESC, 1 DESC
LIMIT 10; LIMIT 10;
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
RESET citus.task_executor_type;
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
DROP TABLE lineitem_hash; DROP TABLE lineitem_hash;

View File

@ -71,18 +71,44 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part
222 | 1 222 | 1
(10 rows) (10 rows)
-- Check that we support count(distinct) on non-partition column. -- Check that we support more complex expressions.
SELECT count(distinct (l_orderkey)) FROM lineitem_range;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_range;
count
-------
5
(1 row)
-- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_range; SELECT count(distinct l_partkey) FROM lineitem_range;
count count
------- -------
11661 11661
(1 row) (1 row)
-- Check that we don't support complex expressions. SELECT count(distinct (l_partkey + 1)) FROM lineitem_range;
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; count
ERROR: cannot compute aggregate (distinct) -------
DETAIL: aggregate (distinct) on complex expressions is unsupported 11661
HINT: You can load the hll extension from contrib packages and enable distinct approximations. (1 row)
SELECT count(distinct (l_partkey % 5)) FROM lineitem_range;
count
-------
5
(1 row)
-- Now test append partitioned tables. First run count(distinct) on a single -- Now test append partitioned tables. First run count(distinct) on a single
-- sharded table. -- sharded table.
SELECT count(distinct p_mfgr) FROM part; SELECT count(distinct p_mfgr) FROM part;
@ -149,6 +175,25 @@ SELECT avg(distinct l_orderkey) FROM lineitem_hash;
7463.9474036850921273 7463.9474036850921273
(1 row) (1 row)
-- Check that we support more complex expressions.
SELECT count(distinct (l_orderkey)) FROM lineitem_hash;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
count
-------
2985
(1 row)
SELECT count(distinct (l_orderkey % 5)) FROM lineitem_hash;
count
-------
5
(1 row)
-- count(distinct) on non-partition column is allowed -- count(distinct) on non-partition column is allowed
SELECT count(distinct l_partkey) FROM lineitem_hash; SELECT count(distinct l_partkey) FROM lineitem_hash;
count count
@ -156,11 +201,18 @@ SELECT count(distinct l_partkey) FROM lineitem_hash;
11661 11661
(1 row) (1 row)
-- count(distinct) on column expression is not allowed SELECT count(distinct (l_partkey + 1)) FROM lineitem_hash;
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; count
ERROR: cannot compute aggregate (distinct) -------
DETAIL: aggregate (distinct) on complex expressions is unsupported 11661
HINT: You can load the hll extension from contrib packages and enable distinct approximations. (1 row)
SELECT count(distinct (l_partkey % 5)) FROM lineitem_hash;
count
-------
5
(1 row)
-- agg(distinct) is allowed if we group by partition column -- agg(distinct) is allowed if we group by partition column
SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey; SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey;
SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_range GROUP BY l_orderkey; SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_range GROUP BY l_orderkey;

View File

@ -813,7 +813,7 @@ SELECT *
12.00 | 13 12.00 | 13
(10 rows) (10 rows)
-- count DISTINCT is part of an expression which inclues another aggregate -- count DISTINCT is part of an expression which includes another aggregate
SELECT * SELECT *
FROM ( FROM (
SELECT SELECT
@ -842,7 +842,7 @@ SELECT *
40.50298377916903813318 | TRUCK 40.50298377916903813318 | TRUCK
(7 rows) (7 rows)
--- count DISTINCT CASE WHEN expression -- count DISTINCT CASE WHEN expression
SELECT * SELECT *
FROM ( FROM (
SELECT SELECT
@ -1034,4 +1034,97 @@ SELECT *
LIMIT 10; LIMIT 10;
ERROR: cannot compute count (distinct) ERROR: cannot compute count (distinct)
DETAIL: Non-column references are not supported yet DETAIL: Non-column references are not supported yet
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
l_shipdate | distinct_part | year
------------+---------------+------
11-29-1995 | 5 | 1995
03-24-1995 | 4 | 1995
09-18-1995 | 4 | 1995
01-17-1995 | 3 | 1995
04-02-1995 | 3 | 1995
05-23-1995 | 3 | 1995
08-11-1995 | 3 | 1995
09-27-1995 | 3 | 1995
10-27-1995 | 3 | 1995
10-30-1995 | 3 | 1995
(10 rows)
RESET citus.task_executor_type;
-- count distinct pushdown is enabled
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate, year) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
l_shipdate | distinct_part | year
------------+---------------+------
11-29-1995 | 5 | 1995
03-24-1995 | 4 | 1995
09-18-1995 | 4 | 1995
01-17-1995 | 3 | 1995
04-02-1995 | 3 | 1995
05-23-1995 | 3 | 1995
08-11-1995 | 3 | 1995
09-27-1995 | 3 | 1995
10-27-1995 | 3 | 1995
10-30-1995 | 3 | 1995
(10 rows)
SELECT *
FROM (
SELECT
l_shipdate,
count(DISTINCT
CASE
WHEN l_shipmode = 'TRUCK' THEN l_partkey
ELSE NULL
END) as distinct_part,
extract(year from l_shipdate) as year
FROM
lineitem_hash
GROUP BY l_shipdate) sub
WHERE year = 1995
ORDER BY 2 DESC, 1
LIMIT 10;
l_shipdate | distinct_part | year
------------+---------------+------
11-29-1995 | 5 | 1995
03-24-1995 | 4 | 1995
09-18-1995 | 4 | 1995
01-17-1995 | 3 | 1995
04-02-1995 | 3 | 1995
05-23-1995 | 3 | 1995
08-11-1995 | 3 | 1995
09-27-1995 | 3 | 1995
10-27-1995 | 3 | 1995
10-30-1995 | 3 | 1995
(10 rows)
DROP TABLE lineitem_hash; DROP TABLE lineitem_hash;