mirror of https://github.com/citusdata/citus.git
Topn aggregates are supported
parent
54f5fb3b26
commit
3afa7f425d
|
@ -1490,8 +1490,8 @@ MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerC
|
|||
/*
|
||||
* MasterAggregateExpression creates the master aggregate expression using the
|
||||
* original aggregate and aggregate's type information. This function handles
|
||||
* the average, count, array_agg, and hll aggregates separately due to differences
|
||||
* in these aggregate functions' transformations.
|
||||
* the average, count, array_agg, hll and topn aggregates separately due to
|
||||
* differences in these aggregate functions' transformations.
|
||||
*
|
||||
* Note that this function has implicit knowledge of the transformations applied
|
||||
* for worker nodes on the original aggregate. The function uses this implicit
|
||||
|
@ -1798,6 +1798,45 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
|||
|
||||
newMasterExpression = (Expr *) unionAggregate;
|
||||
}
|
||||
else if (aggregateType == AGGREGATE_TOPN_UNION_AGG ||
|
||||
aggregateType == AGGREGATE_TOPN_ADD_AGG)
|
||||
{
|
||||
/*
|
||||
* Top-N aggregates are handled in two steps. First, we compute
|
||||
* topn_add_agg() or topn_union_agg() aggregates on the worker nodes.
|
||||
* Then, we gather the Top-Ns on the master and take the union of all
|
||||
* of them to get the final Top-N result.
|
||||
*/
|
||||
TargetEntry *topNTargetEntry = NULL;
|
||||
Aggref *unionAggregate = NULL;
|
||||
|
||||
/* worker aggregate and original aggregate have the same return type */
|
||||
Oid topnType = exprType((Node *) originalAggregate);
|
||||
Oid unionFunctionId = AggregateFunctionOid(TOPN_UNION_AGGREGATE_NAME,
|
||||
topnType);
|
||||
int32 topnReturnTypeMod = exprTypmod((Node *) originalAggregate);
|
||||
Oid topnTypeCollationId = exprCollation((Node *) originalAggregate);
|
||||
|
||||
/* create argument for the topn_union_agg() aggregate */
|
||||
Var *topnColumn = makeVar(masterTableId, walkerContext->columnId, topnType,
|
||||
topnReturnTypeMod, topnTypeCollationId, columnLevelsUp);
|
||||
walkerContext->columnId++;
|
||||
|
||||
topNTargetEntry = makeTargetEntry((Expr *) topnColumn, argumentId, NULL, false);
|
||||
|
||||
/* construct the master topn_union_agg() expression */
|
||||
unionAggregate = makeNode(Aggref);
|
||||
unionAggregate->aggfnoid = unionFunctionId;
|
||||
unionAggregate->aggtype = topnType;
|
||||
unionAggregate->args = list_make1(topNTargetEntry);
|
||||
unionAggregate->aggkind = AGGKIND_NORMAL;
|
||||
unionAggregate->aggfilter = NULL;
|
||||
unionAggregate->aggtranstype = InvalidOid;
|
||||
unionAggregate->aggargtypes = list_make1_oid(topnType);
|
||||
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||
|
||||
newMasterExpression = (Expr *) unionAggregate;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
|
|
|
@ -39,6 +39,10 @@
|
|||
#define HLL_UNION_AGGREGATE_NAME "hll_union_agg"
|
||||
#define HLL_CARDINALITY_FUNC_NAME "hll_cardinality"
|
||||
|
||||
/* Definitions related to Top-N approximations */
|
||||
#define TOPN_ADD_AGGREGATE_NAME "topn_add_agg"
|
||||
#define TOPN_UNION_AGGREGATE_NAME "topn_union_agg"
|
||||
|
||||
|
||||
/*
|
||||
* AggregateType represents an aggregate function's type, where the function is
|
||||
|
@ -68,7 +72,9 @@ typedef enum
|
|||
AGGREGATE_BOOL_OR = 14,
|
||||
AGGREGATE_EVERY = 15,
|
||||
AGGREGATE_HLL_ADD = 16,
|
||||
AGGREGATE_HLL_UNION = 17
|
||||
AGGREGATE_HLL_UNION = 17,
|
||||
AGGREGATE_TOPN_ADD_AGG = 18,
|
||||
AGGREGATE_TOPN_UNION_AGG = 19
|
||||
} AggregateType;
|
||||
|
||||
|
||||
|
@ -114,7 +120,8 @@ static const char *const AggregateNames[] = {
|
|||
"jsonb_agg", "jsonb_object_agg",
|
||||
"json_agg", "json_object_agg",
|
||||
"bit_and", "bit_or", "bool_and", "bool_or", "every",
|
||||
"hll_add_agg", "hll_union_agg"
|
||||
"hll_add_agg", "hll_union_agg",
|
||||
"topn_add_agg", "topn_union_agg"
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -108,3 +108,162 @@ ERROR: could not run distributed query because the window function that is used
|
|||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE daily_uniques;
|
||||
-- Check if TopN aggregates work as expected
|
||||
-- Create TopN extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION TOPN'
|
||||
ELSE 'SELECT false AS topn_present' END
|
||||
AS create_topn FROM pg_available_extensions()
|
||||
WHERE name = 'topn'
|
||||
\gset
|
||||
:create_topn;
|
||||
\c - - - :worker_1_port
|
||||
:create_topn;
|
||||
\c - - - :worker_2_port
|
||||
:create_topn;
|
||||
\c - - - :master_port
|
||||
CREATE TABLE customer_reviews (day date, user_id int, review int);
|
||||
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
|
||||
SELECT create_distributed_table('customer_reviews', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('popular_reviewer', 'day');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 7, review % 5
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 13, review % 3
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
-- Run topn on raw data
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_add_agg(user_id::text) AS agg
|
||||
FROM customer_reviews
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
item | frequency
|
||||
------+-----------
|
||||
1 | 7843
|
||||
2 | 7843
|
||||
3 | 6851
|
||||
4 | 6851
|
||||
0 | 5890
|
||||
5 | 5890
|
||||
6 | 5890
|
||||
7 | 1922
|
||||
8 | 1922
|
||||
9 | 1922
|
||||
(10 rows)
|
||||
|
||||
-- Aggregate the data into popular_reviewer
|
||||
INSERT INTO popular_reviewer
|
||||
SELECT day, topn_add_agg(user_id::text)
|
||||
FROM customer_reviews
|
||||
GROUP BY 1;
|
||||
-- Basic topn check on aggregated data
|
||||
SELECT day, (topn(reviewers, 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
day | item | frequency
|
||||
------------+------+-----------
|
||||
06-20-2018 | 1 | 248
|
||||
06-20-2018 | 2 | 248
|
||||
06-21-2018 | 1 | 248
|
||||
06-21-2018 | 2 | 248
|
||||
06-22-2018 | 1 | 248
|
||||
06-22-2018 | 2 | 248
|
||||
06-23-2018 | 1 | 248
|
||||
06-23-2018 | 2 | 248
|
||||
06-24-2018 | 1 | 248
|
||||
06-24-2018 | 2 | 248
|
||||
(10 rows)
|
||||
|
||||
-- Union aggregated data for one week
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
item | frequency
|
||||
------+-----------
|
||||
1 | 1240
|
||||
2 | 1240
|
||||
0 | 992
|
||||
3 | 992
|
||||
4 | 992
|
||||
5 | 992
|
||||
6 | 992
|
||||
(7 rows)
|
||||
|
||||
SELECT month, (topn(agg, 5)).*
|
||||
FROM (
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1
|
||||
)a
|
||||
ORDER BY 1, 3 DESC, 2;
|
||||
month | item | frequency
|
||||
-------+------+-----------
|
||||
6 | 1 | 1054
|
||||
6 | 2 | 1054
|
||||
6 | 3 | 992
|
||||
6 | 4 | 992
|
||||
6 | 0 | 744
|
||||
7 | 1 | 93
|
||||
7 | 2 | 93
|
||||
7 | 3 | 93
|
||||
7 | 4 | 93
|
||||
7 | 8 | 62
|
||||
(10 rows)
|
||||
|
||||
-- TODO the following queries will be supported after we fix #2265
|
||||
-- They work for PG9.6 but not for PG10
|
||||
SELECT (topn(topn_union_agg(reviewers), 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
ORDER BY 2 DESC, 1;
|
||||
ERROR: set-valued function called in context that cannot accept a set
|
||||
LINE 1: SELECT (topn(topn_union_agg(reviewers), 10)).*
|
||||
^
|
||||
SELECT (topn(topn_add_agg(user_id::text), 10)).*
|
||||
FROM customer_reviews
|
||||
ORDER BY 2 DESC, 1;
|
||||
ERROR: set-valued function called in context that cannot accept a set
|
||||
LINE 1: SELECT (topn(topn_add_agg(user_id::text), 10)).*
|
||||
^
|
||||
-- The following is going to be supported after window function support
|
||||
SELECT day, (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
|
||||
FROM popular_reviewer
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
)a
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
|
||||
FROM customer_reviews
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
DROP TABLE customer_reviews;
|
||||
DROP TABLE popular_reviewer;
|
||||
|
|
|
@ -110,3 +110,150 @@ LINE 2: FROM daily_uniques
|
|||
DROP TABLE raw_table;
|
||||
DROP TABLE daily_uniques;
|
||||
ERROR: table "daily_uniques" does not exist
|
||||
-- Check if TopN aggregates work as expected
|
||||
-- Create TopN extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION TOPN'
|
||||
ELSE 'SELECT false AS topn_present' END
|
||||
AS create_topn FROM pg_available_extensions()
|
||||
WHERE name = 'topn'
|
||||
\gset
|
||||
:create_topn;
|
||||
topn_present
|
||||
--------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
:create_topn;
|
||||
topn_present
|
||||
--------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
:create_topn;
|
||||
topn_present
|
||||
--------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
CREATE TABLE customer_reviews (day date, user_id int, review int);
|
||||
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
|
||||
SELECT create_distributed_table('customer_reviews', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('popular_reviewer', 'day');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 7, review % 5
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 13, review % 3
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
-- Run topn on raw data
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_add_agg(user_id::text) AS agg
|
||||
FROM customer_reviews
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
ERROR: function topn_add_agg(text) does not exist
|
||||
LINE 3: SELECT topn_add_agg(user_id::text) AS agg
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
-- Aggregate the data into popular_reviewer
|
||||
INSERT INTO popular_reviewer
|
||||
SELECT day, topn_add_agg(user_id::text)
|
||||
FROM customer_reviews
|
||||
GROUP BY 1;
|
||||
ERROR: function topn_add_agg(text) does not exist
|
||||
LINE 2: SELECT day, topn_add_agg(user_id::text)
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
-- Basic topn check on aggregated data
|
||||
SELECT day, (topn(reviewers, 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
ERROR: function topn(jsonb, integer) does not exist
|
||||
LINE 1: SELECT day, (topn(reviewers, 10)).*
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
-- Union aggregated data for one week
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
ERROR: function topn_union_agg(jsonb) does not exist
|
||||
LINE 3: SELECT topn_union_agg(reviewers) AS agg
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
SELECT month, (topn(agg, 5)).*
|
||||
FROM (
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1
|
||||
)a
|
||||
ORDER BY 1, 3 DESC, 2;
|
||||
ERROR: function topn_union_agg(jsonb) does not exist
|
||||
LINE 3: SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(rev...
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
-- TODO the following queries will be supported after we fix #2265
|
||||
-- They work for PG9.6 but not for PG10
|
||||
SELECT (topn(topn_union_agg(reviewers), 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
ORDER BY 2 DESC, 1;
|
||||
ERROR: function topn_union_agg(jsonb) does not exist
|
||||
LINE 1: SELECT (topn(topn_union_agg(reviewers), 10)).*
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
SELECT (topn(topn_add_agg(user_id::text), 10)).*
|
||||
FROM customer_reviews
|
||||
ORDER BY 2 DESC, 1;
|
||||
ERROR: function topn_add_agg(text) does not exist
|
||||
LINE 1: SELECT (topn(topn_add_agg(user_id::text), 10)).*
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
-- The following is going to be supported after window function support
|
||||
SELECT day, (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
|
||||
FROM popular_reviewer
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
)a
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
ERROR: function topn_union_agg(jsonb) does not exist
|
||||
LINE 3: SELECT day, topn_union_agg(reviewers) OVER seven_days AS ag...
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
|
||||
FROM customer_reviews
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
ERROR: function topn_add_agg(text) does not exist
|
||||
LINE 1: SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_day...
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
DROP TABLE customer_reviews;
|
||||
DROP TABLE popular_reviewer;
|
||||
|
|
|
@ -0,0 +1,288 @@
|
|||
--
|
||||
-- CUSTOM_AGGREGATE_SUPPORT
|
||||
--
|
||||
-- Create HLL extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION HLL'
|
||||
ELSE 'SELECT false AS hll_present' END
|
||||
AS create_cmd FROM pg_available_extensions()
|
||||
WHERE name = 'hll'
|
||||
\gset
|
||||
:create_cmd;
|
||||
ERROR: extension "hll" already exists
|
||||
\c - - - :worker_1_port
|
||||
:create_cmd;
|
||||
ERROR: extension "hll" already exists
|
||||
\c - - - :worker_2_port
|
||||
:create_cmd;
|
||||
ERROR: extension "hll" already exists
|
||||
\c - - - :master_port
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE raw_table (day date, user_id int);
|
||||
CREATE TABLE daily_uniques(day date, unique_users hll);
|
||||
SELECT create_distributed_table('raw_table', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('daily_uniques', 'day');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 19
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 13
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
-- Run hll on raw data
|
||||
SELECT hll_cardinality(hll_union_agg(agg))
|
||||
FROM (
|
||||
SELECT hll_add_agg(hll_hash_integer(user_id)) AS agg
|
||||
FROM raw_table)a;
|
||||
hll_cardinality
|
||||
-----------------
|
||||
19
|
||||
(1 row)
|
||||
|
||||
-- Aggregate the data into daily_uniques
|
||||
INSERT INTO daily_uniques
|
||||
SELECT day, hll_add_agg(hll_hash_integer(user_id))
|
||||
FROM raw_table
|
||||
GROUP BY 1;
|
||||
-- Basic hll_cardinality check on aggregated data
|
||||
SELECT day, hll_cardinality(unique_users)
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 2 DESC,1
|
||||
LIMIT 10;
|
||||
day | hll_cardinality
|
||||
------------+-----------------
|
||||
06-20-2018 | 19
|
||||
06-21-2018 | 19
|
||||
06-22-2018 | 19
|
||||
06-23-2018 | 19
|
||||
06-24-2018 | 19
|
||||
06-25-2018 | 13
|
||||
06-26-2018 | 13
|
||||
06-27-2018 | 13
|
||||
06-28-2018 | 13
|
||||
06-29-2018 | 13
|
||||
(10 rows)
|
||||
|
||||
-- Union aggregated data for one week
|
||||
SELECT hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date;
|
||||
hll_cardinality
|
||||
-----------------
|
||||
19
|
||||
(1 row)
|
||||
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1;
|
||||
month | hll_cardinality
|
||||
-------+-----------------
|
||||
6 | 19
|
||||
7 | 13
|
||||
(2 rows)
|
||||
|
||||
-- These are going to be supported after window function support
|
||||
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
|
||||
FROM daily_uniques
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
|
||||
FROM daily_uniques
|
||||
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE daily_uniques;
|
||||
-- Check if TopN aggregates work as expected
|
||||
-- Create TopN extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION TOPN'
|
||||
ELSE 'SELECT false AS topn_present' END
|
||||
AS create_topn FROM pg_available_extensions()
|
||||
WHERE name = 'topn'
|
||||
\gset
|
||||
:create_topn;
|
||||
\c - - - :worker_1_port
|
||||
:create_topn;
|
||||
\c - - - :worker_2_port
|
||||
:create_topn;
|
||||
\c - - - :master_port
|
||||
CREATE TABLE customer_reviews (day date, user_id int, review int);
|
||||
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
|
||||
SELECT create_distributed_table('customer_reviews', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('popular_reviewer', 'day');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 7, review % 5
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 13, review % 3
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
-- Run topn on raw data
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_add_agg(user_id::text) AS agg
|
||||
FROM customer_reviews
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
item | frequency
|
||||
------+-----------
|
||||
1 | 7843
|
||||
2 | 7843
|
||||
3 | 6851
|
||||
4 | 6851
|
||||
0 | 5890
|
||||
5 | 5890
|
||||
6 | 5890
|
||||
7 | 1922
|
||||
8 | 1922
|
||||
9 | 1922
|
||||
(10 rows)
|
||||
|
||||
-- Aggregate the data into popular_reviewer
|
||||
INSERT INTO popular_reviewer
|
||||
SELECT day, topn_add_agg(user_id::text)
|
||||
FROM customer_reviews
|
||||
GROUP BY 1;
|
||||
-- Basic topn check on aggregated data
|
||||
SELECT day, (topn(reviewers, 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
day | item | frequency
|
||||
------------+------+-----------
|
||||
06-20-2018 | 1 | 248
|
||||
06-20-2018 | 2 | 248
|
||||
06-21-2018 | 1 | 248
|
||||
06-21-2018 | 2 | 248
|
||||
06-22-2018 | 1 | 248
|
||||
06-22-2018 | 2 | 248
|
||||
06-23-2018 | 1 | 248
|
||||
06-23-2018 | 2 | 248
|
||||
06-24-2018 | 1 | 248
|
||||
06-24-2018 | 2 | 248
|
||||
(10 rows)
|
||||
|
||||
-- Union aggregated data for one week
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
item | frequency
|
||||
------+-----------
|
||||
1 | 1240
|
||||
2 | 1240
|
||||
0 | 992
|
||||
3 | 992
|
||||
4 | 992
|
||||
5 | 992
|
||||
6 | 992
|
||||
(7 rows)
|
||||
|
||||
SELECT month, (topn(agg, 5)).*
|
||||
FROM (
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1
|
||||
)a
|
||||
ORDER BY 1, 3 DESC, 2;
|
||||
month | item | frequency
|
||||
-------+------+-----------
|
||||
6 | 1 | 1054
|
||||
6 | 2 | 1054
|
||||
6 | 3 | 992
|
||||
6 | 4 | 992
|
||||
6 | 0 | 744
|
||||
7 | 1 | 93
|
||||
7 | 2 | 93
|
||||
7 | 3 | 93
|
||||
7 | 4 | 93
|
||||
7 | 8 | 62
|
||||
(10 rows)
|
||||
|
||||
-- TODO the following queries will be supported after we fix #2265
|
||||
-- They work for PG9.6 but not for PG10
|
||||
SELECT (topn(topn_union_agg(reviewers), 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
ORDER BY 2 DESC, 1;
|
||||
item | frequency
|
||||
------+-----------
|
||||
1 | 1240
|
||||
2 | 1240
|
||||
0 | 992
|
||||
3 | 992
|
||||
4 | 992
|
||||
5 | 992
|
||||
6 | 992
|
||||
(7 rows)
|
||||
|
||||
SELECT (topn(topn_add_agg(user_id::text), 10)).*
|
||||
FROM customer_reviews
|
||||
ORDER BY 2 DESC, 1;
|
||||
item | frequency
|
||||
------+-----------
|
||||
1 | 7843
|
||||
2 | 7843
|
||||
3 | 6851
|
||||
4 | 6851
|
||||
0 | 5890
|
||||
5 | 5890
|
||||
6 | 5890
|
||||
7 | 1922
|
||||
8 | 1922
|
||||
9 | 1922
|
||||
(10 rows)
|
||||
|
||||
-- The following is going to be supported after window function support
|
||||
SELECT day, (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
|
||||
FROM popular_reviewer
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
)a
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
|
||||
FROM customer_reviews
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
DROP TABLE customer_reviews;
|
||||
DROP TABLE popular_reviewer;
|
|
@ -78,3 +78,106 @@ WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
|
|||
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE daily_uniques;
|
||||
|
||||
-- Check if TopN aggregates work as expected
|
||||
-- Create TopN extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION TOPN'
|
||||
ELSE 'SELECT false AS topn_present' END
|
||||
AS create_topn FROM pg_available_extensions()
|
||||
WHERE name = 'topn'
|
||||
\gset
|
||||
|
||||
:create_topn;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
:create_topn;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
:create_topn;
|
||||
|
||||
\c - - - :master_port
|
||||
CREATE TABLE customer_reviews (day date, user_id int, review int);
|
||||
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
|
||||
|
||||
SELECT create_distributed_table('customer_reviews', 'user_id');
|
||||
SELECT create_distributed_table('popular_reviewer', 'day');
|
||||
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 7, review % 5
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
INSERT INTO customer_reviews
|
||||
SELECT day, user_id % 13, review % 3
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
|
||||
|
||||
-- Run topn on raw data
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_add_agg(user_id::text) AS agg
|
||||
FROM customer_reviews
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
|
||||
-- Aggregate the data into popular_reviewer
|
||||
INSERT INTO popular_reviewer
|
||||
SELECT day, topn_add_agg(user_id::text)
|
||||
FROM customer_reviews
|
||||
GROUP BY 1;
|
||||
|
||||
-- Basic topn check on aggregated data
|
||||
SELECT day, (topn(reviewers, 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
|
||||
-- Union aggregated data for one week
|
||||
SELECT (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
)a
|
||||
ORDER BY 2 DESC, 1;
|
||||
|
||||
SELECT month, (topn(agg, 5)).*
|
||||
FROM (
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1
|
||||
)a
|
||||
ORDER BY 1, 3 DESC, 2;
|
||||
|
||||
-- TODO the following queries will be supported after we fix #2265
|
||||
-- They work for PG9.6 but not for PG10
|
||||
SELECT (topn(topn_union_agg(reviewers), 10)).*
|
||||
FROM popular_reviewer
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
|
||||
ORDER BY 2 DESC, 1;
|
||||
|
||||
SELECT (topn(topn_add_agg(user_id::text), 10)).*
|
||||
FROM customer_reviews
|
||||
ORDER BY 2 DESC, 1;
|
||||
|
||||
-- The following is going to be supported after window function support
|
||||
SELECT day, (topn(agg, 10)).*
|
||||
FROM (
|
||||
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
|
||||
FROM popular_reviewer
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
)a
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
|
||||
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
|
||||
FROM customer_reviews
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
|
||||
ORDER BY 3 DESC, 1, 2
|
||||
LIMIT 10;
|
||||
|
||||
DROP TABLE customer_reviews;
|
||||
DROP TABLE popular_reviewer;
|
||||
|
|
Loading…
Reference in New Issue