mirror of https://github.com/citusdata/citus.git
Merge pull request #399 from citusdata/count_distinct_pushdown
Add support for count(distinct) on hash partitioned tablespull/460/head
commit
73bc20688c
|
@ -2309,7 +2309,9 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
|
|||
* if table is range partitioned.
|
||||
*/
|
||||
partitionMethod = PartitionMethod(relationId);
|
||||
if (partitionMethod == DISTRIBUTE_BY_RANGE)
|
||||
|
||||
if (partitionMethod == DISTRIBUTE_BY_RANGE
|
||||
|| partitionMethod == DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
Var *tablePartitionColumn = tableNode->partitionColumn;
|
||||
bool groupedByPartitionColumn = false;
|
||||
|
|
|
@ -23,8 +23,10 @@ CREATE TABLE lineitem_range (
|
|||
l_comment varchar(44) not null );
|
||||
SELECT master_create_distributed_table('lineitem_range', 'l_orderkey', 'range');
|
||||
|
||||
SET citus.shard_max_size TO "500MB";
|
||||
\STAGE lineitem_range FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||
\STAGE lineitem_range FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||
RESET citus.shard_max_size;
|
||||
|
||||
-- Run aggregate(distinct) on partition column for range partitioned table
|
||||
|
||||
|
@ -60,3 +62,47 @@ SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr;
|
|||
-- has multiple shards
|
||||
|
||||
SELECT count(distinct o_orderkey) FROM orders;
|
||||
|
||||
-- Hash partitioned tables:
|
||||
|
||||
CREATE TABLE lineitem_hash (
|
||||
l_orderkey bigint not null,
|
||||
l_partkey integer not null,
|
||||
l_suppkey integer not null,
|
||||
l_linenumber integer not null,
|
||||
l_quantity decimal(15, 2) not null,
|
||||
l_extendedprice decimal(15, 2) not null,
|
||||
l_discount decimal(15, 2) not null,
|
||||
l_tax decimal(15, 2) not null,
|
||||
l_returnflag char(1) not null,
|
||||
l_linestatus char(1) not null,
|
||||
l_shipdate date not null,
|
||||
l_commitdate date not null,
|
||||
l_receiptdate date not null,
|
||||
l_shipinstruct char(25) not null,
|
||||
l_shipmode char(10) not null,
|
||||
l_comment varchar(44) not null );
|
||||
SELECT master_create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
|
||||
SELECT master_create_worker_shards('lineitem_hash', 4, 1);
|
||||
|
||||
\COPY lineitem_hash FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||
\COPY lineitem_hash FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||
|
||||
-- aggregate(distinct) on partition column is allowed
|
||||
|
||||
SELECT count(distinct l_orderkey) FROM lineitem_hash;
|
||||
SELECT avg(distinct l_orderkey) FROM lineitem_hash;
|
||||
|
||||
-- count(distinct) on non-partition column or expression is not allowed
|
||||
|
||||
SELECT count(distinct l_partkey) FROM lineitem_hash;
|
||||
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
|
||||
|
||||
-- agg(distinct) is allowed if we group by partition column
|
||||
SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey;
|
||||
SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_range GROUP BY l_orderkey;
|
||||
|
||||
-- they should return the same results
|
||||
SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey AND h.count != r.count;
|
||||
|
||||
DROP TABLE lineitem_hash;
|
||||
|
|
|
@ -25,19 +25,21 @@ SELECT master_create_distributed_table('lineitem_range', 'l_orderkey', 'range');
|
|||
|
||||
(1 row)
|
||||
|
||||
SET citus.shard_max_size TO "500MB";
|
||||
\STAGE lineitem_range FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||
\STAGE lineitem_range FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||
RESET citus.shard_max_size;
|
||||
-- Run aggregate(distinct) on partition column for range partitioned table
|
||||
SELECT count(distinct l_orderkey) FROM lineitem_range;
|
||||
count
|
||||
-------
|
||||
2986
|
||||
2985
|
||||
(1 row)
|
||||
|
||||
SELECT avg(distinct l_orderkey) FROM lineitem_range;
|
||||
avg
|
||||
-----------------------
|
||||
7465.3171466845277964
|
||||
7463.9474036850921273
|
||||
(1 row)
|
||||
|
||||
-- Run count(distinct) on join between a range partitioned table and a single
|
||||
|
@ -97,3 +99,67 @@ SELECT count(distinct o_orderkey) FROM orders;
|
|||
ERROR: cannot compute aggregate (distinct)
|
||||
DETAIL: table partitioning is unsuitable for aggregate (distinct)
|
||||
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
|
||||
-- Hash partitioned tables:
|
||||
CREATE TABLE lineitem_hash (
|
||||
l_orderkey bigint not null,
|
||||
l_partkey integer not null,
|
||||
l_suppkey integer not null,
|
||||
l_linenumber integer not null,
|
||||
l_quantity decimal(15, 2) not null,
|
||||
l_extendedprice decimal(15, 2) not null,
|
||||
l_discount decimal(15, 2) not null,
|
||||
l_tax decimal(15, 2) not null,
|
||||
l_returnflag char(1) not null,
|
||||
l_linestatus char(1) not null,
|
||||
l_shipdate date not null,
|
||||
l_commitdate date not null,
|
||||
l_receiptdate date not null,
|
||||
l_shipinstruct char(25) not null,
|
||||
l_shipmode char(10) not null,
|
||||
l_comment varchar(44) not null );
|
||||
SELECT master_create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('lineitem_hash', 4, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\COPY lineitem_hash FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||
\COPY lineitem_hash FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
|
||||
-- aggregate(distinct) on partition column is allowed
|
||||
SELECT count(distinct l_orderkey) FROM lineitem_hash;
|
||||
count
|
||||
-------
|
||||
2985
|
||||
(1 row)
|
||||
|
||||
SELECT avg(distinct l_orderkey) FROM lineitem_hash;
|
||||
avg
|
||||
-----------------------
|
||||
7463.9474036850921273
|
||||
(1 row)
|
||||
|
||||
-- count(distinct) on non-partition column or expression is not allowed
|
||||
SELECT count(distinct l_partkey) FROM lineitem_hash;
|
||||
ERROR: cannot compute aggregate (distinct)
|
||||
DETAIL: table partitioning is unsuitable for aggregate (distinct)
|
||||
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
|
||||
SELECT count(distinct (l_orderkey + 1)) FROM lineitem_hash;
|
||||
ERROR: cannot compute aggregate (distinct)
|
||||
DETAIL: aggregate (distinct) on complex expressions is unsupported
|
||||
HINT: You can load the hll extension from contrib packages and enable distinct approximations.
|
||||
-- agg(distinct) is allowed if we group by partition column
|
||||
SELECT l_orderkey, count(distinct l_partkey) INTO hash_results FROM lineitem_hash GROUP BY l_orderkey;
|
||||
SELECT l_orderkey, count(distinct l_partkey) INTO range_results FROM lineitem_range GROUP BY l_orderkey;
|
||||
-- they should return the same results
|
||||
SELECT * FROM hash_results h, range_results r WHERE h.l_orderkey = r.l_orderkey AND h.count != r.count;
|
||||
l_orderkey | count | l_orderkey | count
|
||||
------------+-------+------------+-------
|
||||
(0 rows)
|
||||
|
||||
DROP TABLE lineitem_hash;
|
||||
|
|
Loading…
Reference in New Issue