From f20258ef10a0dcfbfa6cbbd4d951b6a5e48d5da3 Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Thu, 5 Jul 2018 16:38:43 +0300 Subject: [PATCH] Expand count distinct support We can now support more complex count distinct operations by pulling necessary columns to coordinator and evalutating the aggreage at coordinator. It supports broad range of expression with the restriction that the expression must contain a column. --- .../planner/extended_op_node_utils.c | 23 + .../planner/multi_logical_optimizer.c | 16 +- .../multi_complex_count_distinct_0.out | 1049 ----------------- .../regress/input/multi_agg_distinct.source | 22 +- .../input/multi_complex_count_distinct.source | 59 +- .../regress/output/multi_agg_distinct.source | 74 +- .../multi_complex_count_distinct.source | 97 +- 7 files changed, 265 insertions(+), 1075 deletions(-) delete mode 100644 src/test/regress/expected/multi_complex_count_distinct_0.out diff --git a/src/backend/distributed/planner/extended_op_node_utils.c b/src/backend/distributed/planner/extended_op_node_utils.c index 0a14b12bc..9ed16aea7 100644 --- a/src/backend/distributed/planner/extended_op_node_utils.c +++ b/src/backend/distributed/planner/extended_op_node_utils.c @@ -15,6 +15,7 @@ #include "distributed/multi_logical_optimizer.h" #include "distributed/pg_dist_partition.h" #include "optimizer/var.h" +#include "nodes/nodeFuncs.h" #include "nodes/pg_list.h" @@ -146,6 +147,10 @@ ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode) * HasNonPartitionColumnDistinctAgg returns true if target entry or having qualifier * has non-partition column reference in aggregate (distinct) definition. Note that, * it only checks aggs subfield of Aggref, it does not check FILTER or SORT clauses. + * Having any non-column reference like operator expression, function call, or const + * is considered as a non-partition column. Even if the expression contains partition column + * like (column + 1), it needs to be evaluated at coordinator, since we can't reliably verify + * the distinctness of the expression result like (column % 5) or (column + column). */ static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, @@ -167,6 +172,8 @@ HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, List *varList = NIL; ListCell *varCell = NULL; bool isPartitionColumn = false; + TargetEntry *firstTargetEntry = NULL; + Node *firstTargetExprNode = NULL; if (IsA(targetNode, Var)) { @@ -180,6 +187,22 @@ HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, continue; } + /* + * We are dealing with a more complex count distinct, it needs to be + * evaluated at coordinator level. + */ + if (list_length(targetAgg->args) > 1 || list_length(targetAgg->aggdistinct) > 1) + { + return true; + } + + firstTargetEntry = linitial_node(TargetEntry, targetAgg->args); + firstTargetExprNode = strip_implicit_coercions((Node *) firstTargetEntry->expr); + if (!IsA(firstTargetExprNode, Var)) + { + return true; + } + varList = pull_var_clause_default((Node *) targetAgg->args); foreach(varCell, varList) { diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index af14d87cc..d7c73827d 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -274,7 +274,8 @@ static void ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, static Var * AggregateDistinctColumn(Aggref *aggregateExpression); static bool TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, - Var *distinctColumn); + Var *distinctColumn, + AggregateType aggregateType); /* Local functions forward declarations for limit clauses */ static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference @@ -3388,7 +3389,8 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, */ distinctSupported = TablePartitioningSupportsDistinct(tableNodeList, extendedOpNode, - distinctColumn); + distinctColumn, + aggregateType); if (!distinctSupported) { errorDetail = "aggregate (distinct) on complex expressions is" @@ -3399,7 +3401,8 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, { bool supports = TablePartitioningSupportsDistinct(tableNodeList, extendedOpNode, - distinctColumn); + distinctColumn, + aggregateType); if (!supports) { distinctSupported = false; @@ -3475,7 +3478,7 @@ AggregateDistinctColumn(Aggref *aggregateExpression) */ static bool TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, - Var *distinctColumn) + Var *distinctColumn, AggregateType aggregateType) { bool distinctSupported = true; ListCell *tableNodeCell = NULL; @@ -3513,6 +3516,11 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, Var *tablePartitionColumn = tableNode->partitionColumn; bool groupedByPartitionColumn = false; + if (aggregateType == AGGREGATE_COUNT) + { + tableDistinctSupported = true; + } + /* if distinct is on table partition column, we can push it down */ if (distinctColumn != NULL && tablePartitionColumn->varno == distinctColumn->varno && diff --git a/src/test/regress/expected/multi_complex_count_distinct_0.out b/src/test/regress/expected/multi_complex_count_distinct_0.out deleted file mode 100644 index 1109fbeb4..000000000 --- a/src/test/regress/expected/multi_complex_count_distinct_0.out +++ /dev/null @@ -1,1049 +0,0 @@ --- --- COMPLEX_COUNT_DISTINCT --- --- print whether we're using version > 10 to make version-specific tests clear -SHOW server_version \gset -SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten; - version_above_ten -------------------- - f -(1 row) - -SET citus.next_shard_id TO 240000; -CREATE TABLE lineitem_hash ( - l_orderkey bigint not null, - l_partkey integer not null, - l_suppkey integer not null, - l_linenumber integer not null, - l_quantity decimal(15, 2) not null, - l_extendedprice decimal(15, 2) not null, - l_discount decimal(15, 2) not null, - l_tax decimal(15, 2) not null, - l_returnflag char(1) not null, - l_linestatus char(1) not null, - l_shipdate date not null, - l_commitdate date not null, - l_receiptdate date not null, - l_shipinstruct char(25) not null, - l_shipmode char(10) not null, - l_comment varchar(44) not null, - PRIMARY KEY(l_orderkey, l_linenumber) ); - -SELECT master_create_distributed_table('lineitem_hash', 'l_orderkey', 'hash'); - master_create_distributed_table ---------------------------------- - -(1 row) - -SELECT master_create_worker_shards('lineitem_hash', 8, 1); - master_create_worker_shards ------------------------------ - -(1 row) - -\copy lineitem_hash FROM '/Users/mtuncer/dev/citus/features/remove_poll/src/test/regress/data/lineitem.1.data' with delimiter '|' -\copy lineitem_hash FROM '/Users/mtuncer/dev/citus/features/remove_poll/src/test/regress/data/lineitem.2.data' with delimiter '|' -ANALYZE lineitem_hash; -SET citus.task_executor_type to "task-tracker"; --- count(distinct) is supported on top level query if there --- is a grouping on the partition key -SELECT - l_orderkey, count(DISTINCT l_partkey) - FROM lineitem_hash - GROUP BY l_orderkey - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_orderkey | count -------------+------- - 14885 | 7 - 14884 | 7 - 14821 | 7 - 14790 | 7 - 14785 | 7 - 14755 | 7 - 14725 | 7 - 14694 | 7 - 14627 | 7 - 14624 | 7 -(10 rows) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT - l_orderkey, count(DISTINCT l_partkey) - FROM lineitem_hash - GROUP BY l_orderkey - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Limit - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) - -> Sort - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) - Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey DESC - -> HashAggregate - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) - Group Key: remote_scan.l_orderkey - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_orderkey, remote_scan.count - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - Output: l_orderkey, (count(DISTINCT l_partkey)) - -> Sort - Output: l_orderkey, (count(DISTINCT l_partkey)) - Sort Key: (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey DESC - -> GroupAggregate - Output: l_orderkey, count(DISTINCT l_partkey) - Group Key: lineitem_hash.l_orderkey - -> Index Scan Backward using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(24 rows) - --- it is also supported if there is no grouping or grouping is on non-partition field -SELECT - count(DISTINCT l_partkey) - FROM lineitem_hash - ORDER BY 1 DESC - LIMIT 10; - count -------- - 11661 -(1 row) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT - count(DISTINCT l_partkey) - FROM lineitem_hash - ORDER BY 1 DESC - LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Limit - Output: count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count))))) - -> Sort - Output: count(DISTINCT (count(DISTINCT remote_scan.count))) - Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC - -> Aggregate - Output: count(DISTINCT remote_scan.count) - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.count - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Output: l_partkey - Group Key: lineitem_hash.l_partkey - -> Seq Scan on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(18 rows) - -SELECT - l_shipmode, count(DISTINCT l_partkey) - FROM lineitem_hash - GROUP BY l_shipmode - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_shipmode | count -------------+------- - TRUCK | 1757 - MAIL | 1730 - AIR | 1702 - FOB | 1700 - RAIL | 1696 - SHIP | 1684 - REG AIR | 1676 -(7 rows) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT - l_shipmode, count(DISTINCT l_partkey) - FROM lineitem_hash - GROUP BY l_shipmode - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Limit - Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT (count(DISTINCT remote_scan.count))))) - -> Sort - Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) - Sort Key: count(DISTINCT (count(DISTINCT remote_scan.count))) DESC, remote_scan.l_shipmode DESC - -> GroupAggregate - Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count) - Group Key: remote_scan.l_shipmode - -> Sort - Output: remote_scan.l_shipmode, remote_scan.count - Sort Key: remote_scan.l_shipmode DESC - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_shipmode, remote_scan.count - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Output: l_shipmode, l_partkey - Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey - -> Seq Scan on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(22 rows) - --- mixed mode count distinct, grouped by partition column -SELECT - l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) - FROM lineitem_hash - GROUP BY l_orderkey - ORDER BY 3 DESC, 2 DESC, 1 - LIMIT 10; - l_orderkey | count | count -------------+-------+------- - 226 | 7 | 7 - 1316 | 7 | 7 - 1477 | 7 | 7 - 3555 | 7 | 7 - 12258 | 7 | 7 - 12835 | 7 | 7 - 768 | 7 | 6 - 1121 | 7 | 6 - 1153 | 7 | 6 - 1281 | 7 | 6 -(10 rows) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT - l_orderkey, count(distinct l_partkey), count(distinct l_shipmode) - FROM lineitem_hash - GROUP BY l_orderkey - ORDER BY 3 DESC, 2 DESC, 1 - LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Limit - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) - -> Sort - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) - Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey - -> HashAggregate - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) - Group Key: remote_scan.l_orderkey - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) - -> Sort - Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) - Sort Key: (count(DISTINCT lineitem_hash.l_shipmode)) DESC, (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey - -> GroupAggregate - Output: l_orderkey, count(DISTINCT l_partkey), count(DISTINCT l_shipmode) - Group Key: lineitem_hash.l_orderkey - -> Index Scan using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(24 rows) - --- partition/non-partition column count distinct no grouping -SELECT - count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) - FROM lineitem_hash; - count | count | count --------+-------+------- - 2985 | 11661 | 7 -(1 row) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT - count(distinct l_orderkey), count(distinct l_partkey), count(distinct l_shipmode) - FROM lineitem_hash; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Aggregate - Output: count(DISTINCT remote_scan.count), count(DISTINCT remote_scan.count_1), count(DISTINCT remote_scan.count_2) - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.count, remote_scan.count_1, remote_scan.count_2 - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Output: l_orderkey, l_partkey, l_shipmode - Group Key: lineitem_hash.l_orderkey, lineitem_hash.l_partkey, lineitem_hash.l_shipmode - -> Seq Scan on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(13 rows) - --- distinct/non-distinct on partition and non-partition columns -SELECT - count(distinct l_orderkey), count(l_orderkey), - count(distinct l_partkey), count(l_partkey), - count(distinct l_shipmode), count(l_shipmode) - FROM lineitem_hash; - count | count | count | count | count | count --------+-------+-------+-------+-------+------- - 2985 | 12000 | 11661 | 12000 | 7 | 12000 -(1 row) - --- mixed mode count distinct, grouped by non-partition column -SELECT - l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) - FROM lineitem_hash - GROUP BY l_shipmode - ORDER BY 1, 2 DESC, 3 DESC; - l_shipmode | count | count -------------+-------+------- - AIR | 1702 | 1327 - FOB | 1700 | 1276 - MAIL | 1730 | 1299 - RAIL | 1696 | 1265 - REG AIR | 1676 | 1275 - SHIP | 1684 | 1289 - TRUCK | 1757 | 1333 -(7 rows) - --- mixed mode count distinct, grouped by non-partition column --- having on partition column -SELECT - l_shipmode, count(distinct l_partkey), count(distinct l_orderkey) - FROM lineitem_hash - GROUP BY l_shipmode - HAVING count(distinct l_orderkey) > 1300 - ORDER BY 1, 2 DESC; - l_shipmode | count | count -------------+-------+------- - AIR | 1702 | 1327 - TRUCK | 1757 | 1333 -(2 rows) - --- same but having clause is not on target list -SELECT - l_shipmode, count(distinct l_partkey) - FROM lineitem_hash - GROUP BY l_shipmode - HAVING count(distinct l_orderkey) > 1300 - ORDER BY 1, 2 DESC; - l_shipmode | count -------------+------- - AIR | 1702 - TRUCK | 1757 -(2 rows) - --- mixed mode count distinct, grouped by non-partition column --- having on non-partition column -SELECT - l_shipmode, count(distinct l_partkey), count(distinct l_suppkey) - FROM lineitem_hash - GROUP BY l_shipmode - HAVING count(distinct l_suppkey) > 1550 - ORDER BY 1, 2 DESC; - l_shipmode | count | count -------------+-------+------- - AIR | 1702 | 1564 - FOB | 1700 | 1571 - MAIL | 1730 | 1573 - RAIL | 1696 | 1581 - REG AIR | 1676 | 1557 - SHIP | 1684 | 1554 - TRUCK | 1757 | 1602 -(7 rows) - --- same but having clause is not on target list -SELECT - l_shipmode, count(distinct l_partkey) - FROM lineitem_hash - GROUP BY l_shipmode - HAVING count(distinct l_suppkey) > 1550 - ORDER BY 1, 2 DESC; - l_shipmode | count -------------+------- - AIR | 1702 - FOB | 1700 - MAIL | 1730 - RAIL | 1696 - REG AIR | 1676 - SHIP | 1684 - TRUCK | 1757 -(7 rows) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT - l_shipmode, count(distinct l_partkey) - FROM lineitem_hash - GROUP BY l_shipmode - HAVING count(distinct l_suppkey) > 1550 - ORDER BY 1, 2 DESC; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Sort - Output: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) - Sort Key: remote_scan.l_shipmode, count(DISTINCT (count(DISTINCT remote_scan.count))) DESC - -> GroupAggregate - Output: remote_scan.l_shipmode, count(DISTINCT remote_scan.count) - Group Key: remote_scan.l_shipmode - Filter: (count(DISTINCT remote_scan.worker_column_3) > 1550) - -> Sort - Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 - Sort Key: remote_scan.l_shipmode - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_shipmode, remote_scan.count, remote_scan.worker_column_3 - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Output: l_shipmode, l_partkey, l_suppkey - Group Key: lineitem_hash.l_shipmode, lineitem_hash.l_partkey, lineitem_hash.l_suppkey - -> Seq Scan on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(21 rows) - --- count distinct is supported on single table subqueries -SELECT * - FROM ( - SELECT - l_orderkey, count(DISTINCT l_partkey) - FROM lineitem_hash - GROUP BY l_orderkey) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_orderkey | count -------------+------- - 14885 | 7 - 14884 | 7 - 14821 | 7 - 14790 | 7 - 14785 | 7 - 14755 | 7 - 14725 | 7 - 14694 | 7 - 14627 | 7 - 14624 | 7 -(10 rows) - -SELECT * - FROM ( - SELECT - l_partkey, count(DISTINCT l_orderkey) - FROM lineitem_hash - GROUP BY l_partkey) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_partkey | count ------------+------- - 199146 | 3 - 188804 | 3 - 177771 | 3 - 160895 | 3 - 149926 | 3 - 136884 | 3 - 87761 | 3 - 15283 | 3 - 6983 | 3 - 1927 | 3 -(10 rows) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT * - FROM ( - SELECT - l_partkey, count(DISTINCT l_orderkey) - FROM lineitem_hash - GROUP BY l_partkey) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - QUERY PLAN -------------------------------------------------------------------------- - Limit - Output: remote_scan.l_partkey, remote_scan.count - -> Sort - Output: remote_scan.l_partkey, remote_scan.count - Sort Key: remote_scan.count DESC, remote_scan.l_partkey DESC - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_partkey, remote_scan.count - Task Count: 4 - Tasks Shown: None, not supported for re-partition queries - -> MapMergeJob - Map Task Count: 8 - Merge Task Count: 4 -(12 rows) - --- count distinct with filters -SELECT - l_orderkey, - count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), - count(DISTINCT l_suppkey) - FROM lineitem_hash - GROUP BY l_orderkey - ORDER BY 2 DESC, 3 DESC, 1 - LIMIT 10; - l_orderkey | count | count -------------+-------+------- - 4964 | 4 | 7 - 12005 | 4 | 7 - 5409 | 4 | 6 - 164 | 3 | 7 - 322 | 3 | 7 - 871 | 3 | 7 - 1156 | 3 | 7 - 1574 | 3 | 7 - 2054 | 3 | 7 - 2309 | 3 | 7 -(10 rows) - -EXPLAIN (COSTS false, VERBOSE true) -SELECT - l_orderkey, - count(DISTINCT l_suppkey) FILTER (WHERE l_shipmode = 'AIR'), - count(DISTINCT l_suppkey) - FROM lineitem_hash - GROUP BY l_orderkey - ORDER BY 2 DESC, 3 DESC, 1 - LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Limit - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint))))::bigint, '0'::bigint) - -> Sort - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint), COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) - Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey - -> HashAggregate - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) - Group Key: remote_scan.l_orderkey - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) - -> Sort - Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) - Sort Key: (count(DISTINCT lineitem_hash.l_suppkey) FILTER (WHERE (lineitem_hash.l_shipmode = 'AIR'::bpchar))) DESC, (count(DISTINCT lineitem_hash.l_suppkey)) DESC, lineitem_hash.l_orderkey - -> GroupAggregate - Output: l_orderkey, count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar)), count(DISTINCT l_suppkey) - Group Key: lineitem_hash.l_orderkey - -> Index Scan using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(24 rows) - --- group by on non-partition column -SELECT - l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash - GROUP BY l_suppkey - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_suppkey | count ------------+------- - 7680 | 4 - 7703 | 3 - 7542 | 3 - 7072 | 3 - 6335 | 3 - 5873 | 3 - 1318 | 3 - 1042 | 3 - 160 | 3 - 9872 | 2 -(10 rows) - --- explaining the same query fails -EXPLAIN (COSTS false, VERBOSE true) -SELECT - l_suppkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash - GROUP BY l_suppkey - ORDER BY 2 DESC, 1 DESC - LIMIT 10; -ERROR: bogus varattno for OUTER_VAR var: 3 --- without group by, on partition column -SELECT - count(DISTINCT l_orderkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash; - count -------- - 1327 -(1 row) - --- without group by, on non-partition column -SELECT - count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash; - count -------- - 1702 -(1 row) - -SELECT - count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR'), - count(DISTINCT l_partkey), - count(DISTINCT l_shipdate) - FROM lineitem_hash; - count | count | count --------+-------+------- - 1702 | 11661 | 2470 -(1 row) - --- filter column already exists in target list -SELECT * - FROM ( - SELECT - l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_orderkey > 100) - FROM lineitem_hash - GROUP BY l_orderkey) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_orderkey | count -------------+------- - 14885 | 7 - 14884 | 7 - 14821 | 7 - 14790 | 7 - 14785 | 7 - 14755 | 7 - 14725 | 7 - 14694 | 7 - 14627 | 7 - 14624 | 7 -(10 rows) - --- filter column does not exist in target list -SELECT * - FROM ( - SELECT - l_orderkey, count(DISTINCT l_partkey) FILTER (WHERE l_shipmode = 'AIR') - FROM lineitem_hash - GROUP BY l_orderkey) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_orderkey | count -------------+------- - 12005 | 4 - 5409 | 4 - 4964 | 4 - 14848 | 3 - 14496 | 3 - 13473 | 3 - 13122 | 3 - 12929 | 3 - 12645 | 3 - 12417 | 3 -(10 rows) - --- case expr in count distinct is supported. --- count orders partkeys if l_shipmode is air -SELECT * - FROM ( - SELECT - l_orderkey, count(DISTINCT CASE WHEN l_shipmode = 'AIR' THEN l_partkey ELSE NULL END) as count - FROM lineitem_hash - GROUP BY l_orderkey) sub - WHERE count > 0 - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_orderkey | count -------------+------- - 12005 | 4 - 5409 | 4 - 4964 | 4 - 14848 | 3 - 14496 | 3 - 13473 | 3 - 13122 | 3 - 12929 | 3 - 12645 | 3 - 12417 | 3 -(10 rows) - --- text like operator is also supported -SELECT * - FROM ( - SELECT - l_orderkey, count(DISTINCT CASE WHEN l_shipmode like '%A%' THEN l_partkey ELSE NULL END) as count - FROM lineitem_hash - GROUP BY l_orderkey) sub - WHERE count > 0 - ORDER BY 2 DESC, 1 DESC - LIMIT 10; - l_orderkey | count -------------+------- - 14275 | 7 - 14181 | 7 - 13605 | 7 - 12707 | 7 - 12384 | 7 - 11746 | 7 - 10727 | 7 - 10467 | 7 - 5636 | 7 - 4614 | 7 -(10 rows) - --- count distinct is rejected if it does not reference any columns -SELECT * - FROM ( - SELECT - l_linenumber, count(DISTINCT 1) - FROM lineitem_hash - GROUP BY l_linenumber) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: aggregate (distinct) with no columns is unsupported -HINT: You can load the hll extension from contrib packages and enable distinct approximations. --- count distinct is rejected if it does not reference any columns -SELECT * - FROM ( - SELECT - l_linenumber, count(DISTINCT (random() * 5)::int) - FROM lineitem_hash - GROUP BY l_linenumber) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: aggregate (distinct) with no columns is unsupported -HINT: You can load the hll extension from contrib packages and enable distinct approximations. --- even non-const function calls are supported within count distinct -SELECT * - FROM ( - SELECT - l_orderkey, count(DISTINCT (random() * 5)::int = l_linenumber) - FROM lineitem_hash - GROUP BY l_orderkey) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 0; - l_orderkey | count -------------+------- -(0 rows) - --- multiple nested subquery -SELECT - total, - avg(avg_count) as total_avg_count - FROM ( - SELECT - number_sum, - count(DISTINCT l_suppkey) as total, - avg(total_count) avg_count - FROM ( - SELECT - l_suppkey, - sum(l_linenumber) as number_sum, - count(DISTINCT l_shipmode) as total_count - FROM - lineitem_hash - WHERE - l_partkey > 100 and - l_quantity > 2 and - l_orderkey < 10000 - GROUP BY - l_suppkey) as distributed_table - WHERE - number_sum >= 10 - GROUP BY - number_sum) as distributed_table_2 - GROUP BY - total - ORDER BY - total_avg_count DESC; - total | total_avg_count --------+-------------------- - 1 | 3.6000000000000000 - 6 | 2.8333333333333333 - 10 | 2.6000000000000000 - 27 | 2.5555555555555556 - 32 | 2.4687500000000000 - 77 | 2.1948051948051948 - 57 | 2.1754385964912281 -(7 rows) - --- multiple cases query -SELECT * - FROM ( - SELECT - count(DISTINCT - CASE - WHEN l_shipmode = 'TRUCK' THEN l_partkey - WHEN l_shipmode = 'AIR' THEN l_quantity - WHEN l_shipmode = 'SHIP' THEN l_discount - ELSE l_suppkey - END) as count, - l_shipdate - FROM - lineitem_hash - GROUP BY - l_shipdate) sub - WHERE - count > 0 - ORDER BY - 1 DESC, 2 DESC - LIMIT 10; - count | l_shipdate --------+------------ - 14 | 07-30-1997 - 13 | 05-26-1998 - 13 | 08-08-1997 - 13 | 11-17-1995 - 13 | 01-09-1993 - 12 | 01-15-1998 - 12 | 10-15-1997 - 12 | 09-07-1997 - 12 | 06-02-1997 - 12 | 03-14-1997 -(10 rows) - --- count DISTINCT expression -SELECT * - FROM ( - SELECT - l_quantity, count(DISTINCT ((l_orderkey / 1000) * 1000 )) as count - FROM - lineitem_hash - GROUP BY - l_quantity) sub - WHERE - count > 0 - ORDER BY - 2 DESC, 1 DESC - LIMIT 10; - l_quantity | count -------------+------- - 48.00 | 13 - 47.00 | 13 - 37.00 | 13 - 33.00 | 13 - 26.00 | 13 - 25.00 | 13 - 23.00 | 13 - 21.00 | 13 - 15.00 | 13 - 12.00 | 13 -(10 rows) - --- count DISTINCT is part of an expression which inclues another aggregate -SELECT * - FROM ( - SELECT - sum(((l_partkey * l_tax) / 100)) / - count(DISTINCT - CASE - WHEN l_shipmode = 'TRUCK' THEN l_partkey - ELSE l_suppkey - END) as avg, - l_shipmode - FROM - lineitem_hash - GROUP BY - l_shipmode) sub - ORDER BY - 1 DESC, 2 DESC - LIMIT 10; - avg | l_shipmode --------------------------+------------ - 44.82904609027336300064 | MAIL - 44.80704536679536679537 | SHIP - 44.68891732736572890026 | AIR - 44.34106724470134874759 | REG AIR - 43.12739987269255251432 | FOB - 43.07299253636938646426 | RAIL - 40.50298377916903813318 | TRUCK -(7 rows) - ---- count DISTINCT CASE WHEN expression -SELECT * - FROM ( - SELECT - count(DISTINCT - CASE - WHEN l_shipmode = 'TRUCK' THEN l_linenumber - WHEN l_shipmode = 'AIR' THEN l_linenumber + 10 - ELSE 2 - END) as avg - FROM - lineitem_hash - GROUP BY l_shipdate) sub - ORDER BY 1 DESC - LIMIT 10; - avg ------ - 7 - 6 - 6 - 6 - 6 - 6 - 6 - 6 - 5 - 5 -(10 rows) - --- COUNT DISTINCT (c1, c2) -SELECT * - FROM - (SELECT - l_shipmode, - count(DISTINCT (l_shipdate, l_tax)) - FROM - lineitem_hash - GROUP BY - l_shipmode) t - ORDER BY - 2 DESC,1 DESC - LIMIT 10; - l_shipmode | count -------------+------- - TRUCK | 1689 - MAIL | 1683 - FOB | 1655 - AIR | 1650 - SHIP | 1644 - RAIL | 1636 - REG AIR | 1607 -(7 rows) - --- distinct on non-var (type cast/field select) columns are also --- supported if grouped on distribution column --- random is added to prevent flattening by postgresql -SELECT - l_orderkey, count(a::int), count(distinct a::int) - FROM ( - SELECT l_orderkey, l_orderkey * 1.5 a, random() b - FROM lineitem_hash) sub - GROUP BY 1 - ORDER BY 1 DESC - LIMIT 5; - l_orderkey | count | count -------------+-------+------- - 14947 | 2 | 1 - 14946 | 2 | 1 - 14945 | 6 | 1 - 14944 | 2 | 1 - 14919 | 1 | 1 -(5 rows) - -SELECT user_id, - count(sub.a::int), - count(DISTINCT sub.a::int), - count(DISTINCT (sub).a) -FROM - (SELECT user_id, - unnest(ARRAY[user_id * 1.5])a, - random() b - FROM users_table - ) sub -GROUP BY 1 -ORDER BY 1 DESC -LIMIT 5; - user_id | count | count | count ----------+-------+-------+------- - 6 | 11 | 1 | 1 - 5 | 27 | 1 | 1 - 4 | 24 | 1 | 1 - 3 | 18 | 1 | 1 - 2 | 19 | 1 | 1 -(5 rows) - -CREATE TYPE test_item AS -( - id INTEGER, - duration INTEGER -); -SELECT * FROM run_command_on_workers($$CREATE TYPE test_item AS -( - id INTEGER, - duration INTEGER -)$$) ORDER BY nodeport; - nodename | nodeport | success | result ------------+----------+---------+------------- - localhost | 57637 | t | CREATE TYPE - localhost | 57638 | t | CREATE TYPE -(2 rows) - -CREATE TABLE test_count_distinct_array (key int, value int , value_arr test_item[]); -SELECT create_distributed_table('test_count_distinct_array', 'key'); - create_distributed_table --------------------------- - -(1 row) - -INSERT INTO test_count_distinct_array SELECT i, i, ARRAY[(i,i)::test_item] FROM generate_Series(0, 1000) i; -SELECT - key, - count(DISTINCT value), - count(DISTINCT (item)."id"), - count(DISTINCT (item)."id" * 3) -FROM - ( - SELECT key, unnest(value_arr) as item, value FROM test_count_distinct_array - ) as sub -GROUP BY 1 -ORDER BY 1 DESC -LIMIT 5; - key | count | count | count -------+-------+-------+------- - 1000 | 1 | 1 | 1 - 999 | 1 | 1 | 1 - 998 | 1 | 1 | 1 - 997 | 1 | 1 | 1 - 996 | 1 | 1 | 1 -(5 rows) - -DROP TABLE test_count_distinct_array; -DROP TYPE test_item; -SELECT * FROM run_command_on_workers($$DROP TYPE test_item$$) ORDER BY nodeport; - nodename | nodeport | success | result ------------+----------+---------+----------- - localhost | 57637 | t | DROP TYPE - localhost | 57638 | t | DROP TYPE -(2 rows) - --- other distinct aggregate are not supported -SELECT * - FROM ( - SELECT - l_linenumber, sum(DISTINCT l_partkey) - FROM lineitem_hash - GROUP BY l_linenumber) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: Only count(distinct) aggregate is supported in subqueries -SELECT * - FROM ( - SELECT - l_linenumber, avg(DISTINCT l_partkey) - FROM lineitem_hash - GROUP BY l_linenumber) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; -ERROR: cannot compute aggregate (distinct) -DETAIL: Only count(distinct) aggregate is supported in subqueries --- whole row references, oid, and ctid are not supported in count distinct --- test table does not have oid or ctid enabled, so tests for them are skipped -SELECT * - FROM ( - SELECT - l_linenumber, count(DISTINCT lineitem_hash) - FROM lineitem_hash - GROUP BY l_linenumber) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; -ERROR: cannot compute count (distinct) -DETAIL: Non-column references are not supported yet -SELECT * - FROM ( - SELECT - l_linenumber, count(DISTINCT lineitem_hash.*) - FROM lineitem_hash - GROUP BY l_linenumber) sub - ORDER BY 2 DESC, 1 DESC - LIMIT 10; -ERROR: cannot compute count (distinct) -DETAIL: Non-column references are not supported yet -DROP TABLE lineitem_hash; diff --git a/src/test/regress/input/multi_agg_distinct.source b/src/test/regress/input/multi_agg_distinct.source index 4a628d2df..6bd652023 100644 --- a/src/test/regress/input/multi_agg_distinct.source +++ b/src/test/regress/input/multi_agg_distinct.source @@ -53,12 +53,15 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part GROUP BY p_partkey ORDER BY p_partkey LIMIT 10; - --- Check that we support count(distinct) on non-partition column. -SELECT count(distinct l_partkey) FROM lineitem_range; - --- Check that we don't support complex expressions. +-- Check that we support more complex expressions. +SELECT count(distinct (l_orderkey)) FROM lineitem_range; SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; +SELECT count(distinct (l_orderkey % 5)) FROM lineitem_range; + +-- count(distinct) on non-partition column is allowed +SELECT count(distinct l_partkey) FROM lineitem_range; +SELECT count(distinct (l_partkey + 1)) FROM lineitem_range; +SELECT count(distinct (l_partkey % 5)) FROM lineitem_range; -- Now test append partitioned tables. First run count(distinct) on a single -- sharded table. @@ -98,11 +101,16 @@ SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash'); SELECT count(distinct l_orderkey) FROM lineitem_hash; SELECT avg(distinct l_orderkey) FROM lineitem_hash; +-- Check that we support more complex expressions. +SELECT count(distinct (l_orderkey)) FROM lineitem_hash; +SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; +SELECT count(distinct (l_orderkey % 5)) FROM lineitem_hash; + -- count(distinct) on non-partition column is allowed SELECT count(distinct l_partkey) FROM lineitem_hash; +SELECT count(distinct (l_partkey + 1)) FROM lineitem_hash; +SELECT count(distinct (l_partkey % 5)) FROM lineitem_hash; --- count(distinct) on column expression is not allowed -SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; -- agg(distinct) is allowed if we group by partition column SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey; diff --git a/src/test/regress/input/multi_complex_count_distinct.source b/src/test/regress/input/multi_complex_count_distinct.source index d2c2a8f79..3a268c878 100644 --- a/src/test/regress/input/multi_complex_count_distinct.source +++ b/src/test/regress/input/multi_complex_count_distinct.source @@ -384,7 +384,7 @@ SELECT * 2 DESC, 1 DESC LIMIT 10; --- count DISTINCT is part of an expression which inclues another aggregate +-- count DISTINCT is part of an expression which includes another aggregate SELECT * FROM ( SELECT @@ -403,7 +403,7 @@ SELECT * 1 DESC, 2 DESC LIMIT 10; ---- count DISTINCT CASE WHEN expression +-- count DISTINCT CASE WHEN expression SELECT * FROM ( SELECT @@ -532,5 +532,60 @@ SELECT * ORDER BY 2 DESC, 1 DESC LIMIT 10; +-- count distinct pushdown is enabled +SELECT * + FROM ( + SELECT + l_shipdate, + count(DISTINCT + CASE + WHEN l_shipmode = 'TRUCK' THEN l_partkey + ELSE NULL + END) as distinct_part, + extract(year from l_shipdate) as year + FROM + lineitem_hash + GROUP BY l_shipdate, year) sub + WHERE year = 1995 + ORDER BY 2 DESC, 1 + LIMIT 10; + +RESET citus.task_executor_type; + +-- count distinct pushdown is enabled +SELECT * + FROM ( + SELECT + l_shipdate, + count(DISTINCT + CASE + WHEN l_shipmode = 'TRUCK' THEN l_partkey + ELSE NULL + END) as distinct_part, + extract(year from l_shipdate) as year + FROM + lineitem_hash + GROUP BY l_shipdate, year) sub + WHERE year = 1995 + ORDER BY 2 DESC, 1 + LIMIT 10; + +SELECT * + FROM ( + SELECT + l_shipdate, + count(DISTINCT + CASE + WHEN l_shipmode = 'TRUCK' THEN l_partkey + ELSE NULL + END) as distinct_part, + extract(year from l_shipdate) as year + FROM + lineitem_hash + GROUP BY l_shipdate) sub + WHERE year = 1995 + ORDER BY 2 DESC, 1 + LIMIT 10; + DROP TABLE lineitem_hash; diff --git a/src/test/regress/output/multi_agg_distinct.source b/src/test/regress/output/multi_agg_distinct.source index 0c5a57cf5..59de22ed4 100644 --- a/src/test/regress/output/multi_agg_distinct.source +++ b/src/test/regress/output/multi_agg_distinct.source @@ -71,18 +71,44 @@ SELECT p_partkey, count(distinct l_orderkey) FROM lineitem_range, part 222 | 1 (10 rows) --- Check that we support count(distinct) on non-partition column. +-- Check that we support more complex expressions. +SELECT count(distinct (l_orderkey)) FROM lineitem_range; + count +------- + 2985 +(1 row) + +SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; + count +------- + 2985 +(1 row) + +SELECT count(distinct (l_orderkey % 5)) FROM lineitem_range; + count +------- + 5 +(1 row) + +-- count(distinct) on non-partition column is allowed SELECT count(distinct l_partkey) FROM lineitem_range; count ------- 11661 (1 row) --- Check that we don't support complex expressions. -SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; -ERROR: cannot compute aggregate (distinct) -DETAIL: aggregate (distinct) on complex expressions is unsupported -HINT: You can load the hll extension from contrib packages and enable distinct approximations. +SELECT count(distinct (l_partkey + 1)) FROM lineitem_range; + count +------- + 11661 +(1 row) + +SELECT count(distinct (l_partkey % 5)) FROM lineitem_range; + count +------- + 5 +(1 row) + -- Now test append partitioned tables. First run count(distinct) on a single -- sharded table. SELECT count(distinct p_mfgr) FROM part; @@ -149,6 +175,25 @@ SELECT avg(distinct l_orderkey) FROM lineitem_hash; 7463.9474036850921273 (1 row) +-- Check that we support more complex expressions. +SELECT count(distinct (l_orderkey)) FROM lineitem_hash; + count +------- + 2985 +(1 row) + +SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; + count +------- + 2985 +(1 row) + +SELECT count(distinct (l_orderkey % 5)) FROM lineitem_hash; + count +------- + 5 +(1 row) + -- count(distinct) on non-partition column is allowed SELECT count(distinct l_partkey) FROM lineitem_hash; count @@ -156,11 +201,18 @@ SELECT count(distinct l_partkey) FROM lineitem_hash; 11661 (1 row) --- count(distinct) on column expression is not allowed -SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash; -ERROR: cannot compute aggregate (distinct) -DETAIL: aggregate (distinct) on complex expressions is unsupported -HINT: You can load the hll extension from contrib packages and enable distinct approximations. +SELECT count(distinct (l_partkey + 1)) FROM lineitem_hash; + count +------- + 11661 +(1 row) + +SELECT count(distinct (l_partkey % 5)) FROM lineitem_hash; + count +------- + 5 +(1 row) + -- agg(distinct) is allowed if we group by partition column SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey; SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_range GROUP BY l_orderkey; diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 1eff1b3ad..b63c92f66 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -813,7 +813,7 @@ SELECT * 12.00 | 13 (10 rows) --- count DISTINCT is part of an expression which inclues another aggregate +-- count DISTINCT is part of an expression which includes another aggregate SELECT * FROM ( SELECT @@ -842,7 +842,7 @@ SELECT * 40.50298377916903813318 | TRUCK (7 rows) ---- count DISTINCT CASE WHEN expression +-- count DISTINCT CASE WHEN expression SELECT * FROM ( SELECT @@ -1034,4 +1034,97 @@ SELECT * LIMIT 10; ERROR: cannot compute count (distinct) DETAIL: Non-column references are not supported yet +-- count distinct pushdown is enabled +SELECT * + FROM ( + SELECT + l_shipdate, + count(DISTINCT + CASE + WHEN l_shipmode = 'TRUCK' THEN l_partkey + ELSE NULL + END) as distinct_part, + extract(year from l_shipdate) as year + FROM + lineitem_hash + GROUP BY l_shipdate, year) sub + WHERE year = 1995 + ORDER BY 2 DESC, 1 + LIMIT 10; + l_shipdate | distinct_part | year +------------+---------------+------ + 11-29-1995 | 5 | 1995 + 03-24-1995 | 4 | 1995 + 09-18-1995 | 4 | 1995 + 01-17-1995 | 3 | 1995 + 04-02-1995 | 3 | 1995 + 05-23-1995 | 3 | 1995 + 08-11-1995 | 3 | 1995 + 09-27-1995 | 3 | 1995 + 10-27-1995 | 3 | 1995 + 10-30-1995 | 3 | 1995 +(10 rows) + +RESET citus.task_executor_type; +-- count distinct pushdown is enabled +SELECT * + FROM ( + SELECT + l_shipdate, + count(DISTINCT + CASE + WHEN l_shipmode = 'TRUCK' THEN l_partkey + ELSE NULL + END) as distinct_part, + extract(year from l_shipdate) as year + FROM + lineitem_hash + GROUP BY l_shipdate, year) sub + WHERE year = 1995 + ORDER BY 2 DESC, 1 + LIMIT 10; + l_shipdate | distinct_part | year +------------+---------------+------ + 11-29-1995 | 5 | 1995 + 03-24-1995 | 4 | 1995 + 09-18-1995 | 4 | 1995 + 01-17-1995 | 3 | 1995 + 04-02-1995 | 3 | 1995 + 05-23-1995 | 3 | 1995 + 08-11-1995 | 3 | 1995 + 09-27-1995 | 3 | 1995 + 10-27-1995 | 3 | 1995 + 10-30-1995 | 3 | 1995 +(10 rows) + +SELECT * + FROM ( + SELECT + l_shipdate, + count(DISTINCT + CASE + WHEN l_shipmode = 'TRUCK' THEN l_partkey + ELSE NULL + END) as distinct_part, + extract(year from l_shipdate) as year + FROM + lineitem_hash + GROUP BY l_shipdate) sub + WHERE year = 1995 + ORDER BY 2 DESC, 1 + LIMIT 10; + l_shipdate | distinct_part | year +------------+---------------+------ + 11-29-1995 | 5 | 1995 + 03-24-1995 | 4 | 1995 + 09-18-1995 | 4 | 1995 + 01-17-1995 | 3 | 1995 + 04-02-1995 | 3 | 1995 + 05-23-1995 | 3 | 1995 + 08-11-1995 | 3 | 1995 + 09-27-1995 | 3 | 1995 + 10-27-1995 | 3 | 1995 + 10-30-1995 | 3 | 1995 +(10 rows) + DROP TABLE lineitem_hash;