Merge pull request #2193 from citusdata/topn_agg_support

Topn aggregate support
pull/2268/head
Mehmet Furkan ŞAHİN 2018-07-10 15:36:33 +03:00 committed by GitHub
commit 1c24380877
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 753 additions and 5 deletions

View File

@ -42,13 +42,18 @@ install:
- pip3 install --user mitmproxy==3.0.4 - pip3 install --user mitmproxy==3.0.4
- pip3 install --user construct==2.9.45 - pip3 install --user construct==2.9.45
- mitmproxy --version - mitmproxy --version
# download and install HLL manually, as custom builds won't satisfy deps # download and install HLL and TopN manually, as custom builds won't satisfy deps
# only install if performing non-11 build # only install if performing non-11 build
- | - |
if [ "${PGVERSION}" != "11" ]; then if [ "${PGVERSION}" != "11" ]; then
apt-get download "postgresql-${PGVERSION}-hll=2.10.2.citus-1" apt-get download "postgresql-${PGVERSION}-hll=2.10.2.citus-1"
sudo dpkg --force-confold --force-confdef --force-all -i *hll*.deb sudo dpkg --force-confold --force-confdef --force-all -i *hll*.deb
fi fi
- |
if [ "${PGVERSION}" != "11" ]; then
apt-get download "postgresql-${PGVERSION}-topn=2.1.0"
sudo dpkg --force-confold --force-confdef --force-all -i *topn*.deb
fi
before_script: citus_indent --quiet --check before_script: citus_indent --quiet --check
script: CFLAGS=-Werror pg_travis_multi_test check script: CFLAGS=-Werror pg_travis_multi_test check
after_success: after_success:

View File

@ -1490,8 +1490,8 @@ MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerC
/* /*
* MasterAggregateExpression creates the master aggregate expression using the * MasterAggregateExpression creates the master aggregate expression using the
* original aggregate and aggregate's type information. This function handles * original aggregate and aggregate's type information. This function handles
* the average, count, array_agg, and hll aggregates separately due to differences * the average, count, array_agg, hll and topn aggregates separately due to
* in these aggregate functions' transformations. * differences in these aggregate functions' transformations.
* *
* Note that this function has implicit knowledge of the transformations applied * Note that this function has implicit knowledge of the transformations applied
* for worker nodes on the original aggregate. The function uses this implicit * for worker nodes on the original aggregate. The function uses this implicit
@ -1798,6 +1798,45 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) unionAggregate; newMasterExpression = (Expr *) unionAggregate;
} }
else if (aggregateType == AGGREGATE_TOPN_UNION_AGG ||
aggregateType == AGGREGATE_TOPN_ADD_AGG)
{
/*
* Top-N aggregates are handled in two steps. First, we compute
* topn_add_agg() or topn_union_agg() aggregates on the worker nodes.
* Then, we gather the Top-Ns on the master and take the union of all
* to get the final topn.
*/
TargetEntry *topNTargetEntry = NULL;
Aggref *unionAggregate = NULL;
/* worker aggregate and original aggregate have same return type */
Oid topnType = exprType((Node *) originalAggregate);
Oid unionFunctionId = AggregateFunctionOid(TOPN_UNION_AGGREGATE_NAME,
topnType);
int32 topnReturnTypeMod = exprTypmod((Node *) originalAggregate);
Oid topnTypeCollationId = exprCollation((Node *) originalAggregate);
/* create argument for the topn_union_agg() aggregate */
Var *topnColumn = makeVar(masterTableId, walkerContext->columnId, topnType,
topnReturnTypeMod, topnTypeCollationId, columnLevelsUp);
walkerContext->columnId++;
topNTargetEntry = makeTargetEntry((Expr *) topnColumn, argumentId, NULL, false);
/* construct the master topn_union_agg() expression */
unionAggregate = makeNode(Aggref);
unionAggregate->aggfnoid = unionFunctionId;
unionAggregate->aggtype = topnType;
unionAggregate->args = list_make1(topNTargetEntry);
unionAggregate->aggkind = AGGKIND_NORMAL;
unionAggregate->aggfilter = NULL;
unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make1_oid(topnType);
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
newMasterExpression = (Expr *) unionAggregate;
}
else else
{ {
/* /*

View File

@ -39,6 +39,10 @@
#define HLL_UNION_AGGREGATE_NAME "hll_union_agg" #define HLL_UNION_AGGREGATE_NAME "hll_union_agg"
#define HLL_CARDINALITY_FUNC_NAME "hll_cardinality" #define HLL_CARDINALITY_FUNC_NAME "hll_cardinality"
/* Definitions related to Top-N approximations */
#define TOPN_ADD_AGGREGATE_NAME "topn_add_agg"
#define TOPN_UNION_AGGREGATE_NAME "topn_union_agg"
/* /*
* AggregateType represents an aggregate function's type, where the function is * AggregateType represents an aggregate function's type, where the function is
@ -68,7 +72,9 @@ typedef enum
AGGREGATE_BOOL_OR = 14, AGGREGATE_BOOL_OR = 14,
AGGREGATE_EVERY = 15, AGGREGATE_EVERY = 15,
AGGREGATE_HLL_ADD = 16, AGGREGATE_HLL_ADD = 16,
AGGREGATE_HLL_UNION = 17 AGGREGATE_HLL_UNION = 17,
AGGREGATE_TOPN_ADD_AGG = 18,
AGGREGATE_TOPN_UNION_AGG = 19
} AggregateType; } AggregateType;
@ -114,7 +120,8 @@ static const char *const AggregateNames[] = {
"jsonb_agg", "jsonb_object_agg", "jsonb_agg", "jsonb_object_agg",
"json_agg", "json_object_agg", "json_agg", "json_object_agg",
"bit_and", "bit_or", "bool_and", "bool_or", "every", "bit_and", "bit_or", "bool_and", "bool_or", "every",
"hll_add_agg", "hll_union_agg" "hll_add_agg", "hll_union_agg",
"topn_add_agg", "topn_union_agg"
}; };

View File

@ -108,3 +108,162 @@ ERROR: could not run distributed query because the window function that is used
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
DROP TABLE raw_table; DROP TABLE raw_table;
DROP TABLE daily_uniques; DROP TABLE daily_uniques;
-- Check if TopN aggregates work as expected
-- Create TopN extension if present, print false result otherwise
SELECT CASE WHEN COUNT(*) > 0 THEN
'CREATE EXTENSION TOPN'
ELSE 'SELECT false AS topn_present' END
AS create_topn FROM pg_available_extensions()
WHERE name = 'topn'
\gset
:create_topn;
\c - - - :worker_1_port
:create_topn;
\c - - - :worker_2_port
:create_topn;
\c - - - :master_port
CREATE TABLE customer_reviews (day date, user_id int, review int);
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
SELECT create_distributed_table('customer_reviews', 'user_id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('popular_reviewer', 'day');
create_distributed_table
--------------------------
(1 row)
INSERT INTO customer_reviews
SELECT day, user_id % 7, review % 5
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
INSERT INTO customer_reviews
SELECT day, user_id % 13, review % 3
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
-- Run topn on raw data
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_add_agg(user_id::text) AS agg
FROM customer_reviews
)a
ORDER BY 2 DESC, 1;
item | frequency
------+-----------
1 | 7843
2 | 7843
3 | 6851
4 | 6851
0 | 5890
5 | 5890
6 | 5890
7 | 1922
8 | 1922
9 | 1922
(10 rows)
-- Aggregate the data into popular_reviewer
INSERT INTO popular_reviewer
SELECT day, topn_add_agg(user_id::text)
FROM customer_reviews
GROUP BY 1;
-- Basic topn check on aggregated data
SELECT day, (topn(reviewers, 10)).*
FROM popular_reviewer
WHERE day >= '2018-06-20' and day <= '2018-06-30'
ORDER BY 3 DESC, 1, 2
LIMIT 10;
day | item | frequency
------------+------+-----------
06-20-2018 | 1 | 248
06-20-2018 | 2 | 248
06-21-2018 | 1 | 248
06-21-2018 | 2 | 248
06-22-2018 | 1 | 248
06-22-2018 | 2 | 248
06-23-2018 | 1 | 248
06-23-2018 | 2 | 248
06-24-2018 | 1 | 248
06-24-2018 | 2 | 248
(10 rows)
-- Union aggregated data for one week
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
)a
ORDER BY 2 DESC, 1;
item | frequency
------+-----------
1 | 1240
2 | 1240
0 | 992
3 | 992
4 | 992
5 | 992
6 | 992
(7 rows)
SELECT month, (topn(agg, 5)).*
FROM (
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
GROUP BY 1
ORDER BY 1
)a
ORDER BY 1, 3 DESC, 2;
month | item | frequency
-------+------+-----------
6 | 1 | 1054
6 | 2 | 1054
6 | 3 | 992
6 | 4 | 992
6 | 0 | 744
7 | 1 | 93
7 | 2 | 93
7 | 3 | 93
7 | 4 | 93
7 | 8 | 62
(10 rows)
-- TODO the following queries will be supported after we fix #2265
-- They work for PG9.6 but not for PG10
SELECT (topn(topn_union_agg(reviewers), 10)).*
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
ORDER BY 2 DESC, 1;
ERROR: set-valued function called in context that cannot accept a set
LINE 1: SELECT (topn(topn_union_agg(reviewers), 10)).*
^
SELECT (topn(topn_add_agg(user_id::text), 10)).*
FROM customer_reviews
ORDER BY 2 DESC, 1;
ERROR: set-valued function called in context that cannot accept a set
LINE 1: SELECT (topn(topn_add_agg(user_id::text), 10)).*
^
-- The following is going to be supported after window function support
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
FROM popular_reviewer
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
)a
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
FROM customer_reviews
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
DROP TABLE customer_reviews;
DROP TABLE popular_reviewer;

View File

@ -110,3 +110,150 @@ LINE 2: FROM daily_uniques
DROP TABLE raw_table; DROP TABLE raw_table;
DROP TABLE daily_uniques; DROP TABLE daily_uniques;
ERROR: table "daily_uniques" does not exist ERROR: table "daily_uniques" does not exist
-- Check if TopN aggregates work as expected
-- Create TopN extension if present, print false result otherwise
SELECT CASE WHEN COUNT(*) > 0 THEN
'CREATE EXTENSION TOPN'
ELSE 'SELECT false AS topn_present' END
AS create_topn FROM pg_available_extensions()
WHERE name = 'topn'
\gset
:create_topn;
topn_present
--------------
f
(1 row)
\c - - - :worker_1_port
:create_topn;
topn_present
--------------
f
(1 row)
\c - - - :worker_2_port
:create_topn;
topn_present
--------------
f
(1 row)
\c - - - :master_port
CREATE TABLE customer_reviews (day date, user_id int, review int);
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
SELECT create_distributed_table('customer_reviews', 'user_id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('popular_reviewer', 'day');
create_distributed_table
--------------------------
(1 row)
INSERT INTO customer_reviews
SELECT day, user_id % 7, review % 5
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
INSERT INTO customer_reviews
SELECT day, user_id % 13, review % 3
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
-- Run topn on raw data
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_add_agg(user_id::text) AS agg
FROM customer_reviews
)a
ORDER BY 2 DESC, 1;
ERROR: function topn_add_agg(text) does not exist
LINE 3: SELECT topn_add_agg(user_id::text) AS agg
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- Aggregate the data into popular_reviewer
INSERT INTO popular_reviewer
SELECT day, topn_add_agg(user_id::text)
FROM customer_reviews
GROUP BY 1;
ERROR: function topn_add_agg(text) does not exist
LINE 2: SELECT day, topn_add_agg(user_id::text)
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- Basic topn check on aggregated data
SELECT day, (topn(reviewers, 10)).*
FROM popular_reviewer
WHERE day >= '2018-06-20' and day <= '2018-06-30'
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: function topn(jsonb, integer) does not exist
LINE 1: SELECT day, (topn(reviewers, 10)).*
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- Union aggregated data for one week
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
)a
ORDER BY 2 DESC, 1;
ERROR: function topn_union_agg(jsonb) does not exist
LINE 3: SELECT topn_union_agg(reviewers) AS agg
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT month, (topn(agg, 5)).*
FROM (
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
GROUP BY 1
ORDER BY 1
)a
ORDER BY 1, 3 DESC, 2;
ERROR: function topn_union_agg(jsonb) does not exist
LINE 3: SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(rev...
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- TODO the following queries will be supported after we fix #2265
-- They work for PG9.6 but not for PG10
SELECT (topn(topn_union_agg(reviewers), 10)).*
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
ORDER BY 2 DESC, 1;
ERROR: function topn_union_agg(jsonb) does not exist
LINE 1: SELECT (topn(topn_union_agg(reviewers), 10)).*
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT (topn(topn_add_agg(user_id::text), 10)).*
FROM customer_reviews
ORDER BY 2 DESC, 1;
ERROR: function topn_add_agg(text) does not exist
LINE 1: SELECT (topn(topn_add_agg(user_id::text), 10)).*
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- The following is going to be supported after window function support
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
FROM popular_reviewer
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
)a
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: function topn_union_agg(jsonb) does not exist
LINE 3: SELECT day, topn_union_agg(reviewers) OVER seven_days AS ag...
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
FROM customer_reviews
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: function topn_add_agg(text) does not exist
LINE 1: SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_day...
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
DROP TABLE customer_reviews;
DROP TABLE popular_reviewer;

View File

@ -0,0 +1,288 @@
--
-- CUSTOM_AGGREGATE_SUPPORT
--
-- Create HLL extension if present, print false result otherwise
SELECT CASE WHEN COUNT(*) > 0 THEN
'CREATE EXTENSION HLL'
ELSE 'SELECT false AS hll_present' END
AS create_cmd FROM pg_available_extensions()
WHERE name = 'hll'
\gset
:create_cmd;
ERROR: extension "hll" already exists
\c - - - :worker_1_port
:create_cmd;
ERROR: extension "hll" already exists
\c - - - :worker_2_port
:create_cmd;
ERROR: extension "hll" already exists
\c - - - :master_port
SET citus.shard_count TO 4;
CREATE TABLE raw_table (day date, user_id int);
CREATE TABLE daily_uniques(day date, unique_users hll);
SELECT create_distributed_table('raw_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('daily_uniques', 'day');
create_distributed_table
--------------------------
(1 row)
INSERT INTO raw_table
SELECT day, user_id % 19
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
generate_series(1,100) as g(user_id);
INSERT INTO raw_table
SELECT day, user_id % 13
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
generate_series(1,100) as g(user_id);
-- Run hll on raw data
SELECT hll_cardinality(hll_union_agg(agg))
FROM (
SELECT hll_add_agg(hll_hash_integer(user_id)) AS agg
FROM raw_table)a;
hll_cardinality
-----------------
19
(1 row)
-- Aggregate the data into daily_uniques
INSERT INTO daily_uniques
SELECT day, hll_add_agg(hll_hash_integer(user_id))
FROM raw_table
GROUP BY 1;
-- Basic hll_cardinality check on aggregated data
SELECT day, hll_cardinality(unique_users)
FROM daily_uniques
WHERE day >= '2018-06-20' and day <= '2018-06-30'
ORDER BY 2 DESC,1
LIMIT 10;
day | hll_cardinality
------------+-----------------
06-20-2018 | 19
06-21-2018 | 19
06-22-2018 | 19
06-23-2018 | 19
06-24-2018 | 19
06-25-2018 | 13
06-26-2018 | 13
06-27-2018 | 13
06-28-2018 | 13
06-29-2018 | 13
(10 rows)
-- Union aggregated data for one week
SELECT hll_cardinality(hll_union_agg(unique_users))
FROM daily_uniques
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date;
hll_cardinality
-----------------
19
(1 row)
SELECT EXTRACT(MONTH FROM day) AS month, hll_cardinality(hll_union_agg(unique_users))
FROM daily_uniques
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
GROUP BY 1
ORDER BY 1;
month | hll_cardinality
-------+-----------------
6 | 19
7 | 13
(2 rows)
-- These are going to be supported after window function support
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
FROM daily_uniques
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
FROM daily_uniques
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
DROP TABLE raw_table;
DROP TABLE daily_uniques;
-- Check if TopN aggregates work as expected
-- Create TopN extension if present, print false result otherwise
SELECT CASE WHEN COUNT(*) > 0 THEN
'CREATE EXTENSION TOPN'
ELSE 'SELECT false AS topn_present' END
AS create_topn FROM pg_available_extensions()
WHERE name = 'topn'
\gset
:create_topn;
\c - - - :worker_1_port
:create_topn;
\c - - - :worker_2_port
:create_topn;
\c - - - :master_port
CREATE TABLE customer_reviews (day date, user_id int, review int);
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
SELECT create_distributed_table('customer_reviews', 'user_id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('popular_reviewer', 'day');
create_distributed_table
--------------------------
(1 row)
INSERT INTO customer_reviews
SELECT day, user_id % 7, review % 5
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
INSERT INTO customer_reviews
SELECT day, user_id % 13, review % 3
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
-- Run topn on raw data
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_add_agg(user_id::text) AS agg
FROM customer_reviews
)a
ORDER BY 2 DESC, 1;
item | frequency
------+-----------
1 | 7843
2 | 7843
3 | 6851
4 | 6851
0 | 5890
5 | 5890
6 | 5890
7 | 1922
8 | 1922
9 | 1922
(10 rows)
-- Aggregate the data into popular_reviewer
INSERT INTO popular_reviewer
SELECT day, topn_add_agg(user_id::text)
FROM customer_reviews
GROUP BY 1;
-- Basic topn check on aggregated data
SELECT day, (topn(reviewers, 10)).*
FROM popular_reviewer
WHERE day >= '2018-06-20' and day <= '2018-06-30'
ORDER BY 3 DESC, 1, 2
LIMIT 10;
day | item | frequency
------------+------+-----------
06-20-2018 | 1 | 248
06-20-2018 | 2 | 248
06-21-2018 | 1 | 248
06-21-2018 | 2 | 248
06-22-2018 | 1 | 248
06-22-2018 | 2 | 248
06-23-2018 | 1 | 248
06-23-2018 | 2 | 248
06-24-2018 | 1 | 248
06-24-2018 | 2 | 248
(10 rows)
-- Union aggregated data for one week
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
)a
ORDER BY 2 DESC, 1;
item | frequency
------+-----------
1 | 1240
2 | 1240
0 | 992
3 | 992
4 | 992
5 | 992
6 | 992
(7 rows)
SELECT month, (topn(agg, 5)).*
FROM (
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
GROUP BY 1
ORDER BY 1
)a
ORDER BY 1, 3 DESC, 2;
month | item | frequency
-------+------+-----------
6 | 1 | 1054
6 | 2 | 1054
6 | 3 | 992
6 | 4 | 992
6 | 0 | 744
7 | 1 | 93
7 | 2 | 93
7 | 3 | 93
7 | 4 | 93
7 | 8 | 62
(10 rows)
-- TODO the following queries will be supported after we fix #2265
-- They work for PG9.6 but not for PG10
SELECT (topn(topn_union_agg(reviewers), 10)).*
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
ORDER BY 2 DESC, 1;
item | frequency
------+-----------
1 | 1240
2 | 1240
0 | 992
3 | 992
4 | 992
5 | 992
6 | 992
(7 rows)
SELECT (topn(topn_add_agg(user_id::text), 10)).*
FROM customer_reviews
ORDER BY 2 DESC, 1;
item | frequency
------+-----------
1 | 7843
2 | 7843
3 | 6851
4 | 6851
0 | 5890
5 | 5890
6 | 5890
7 | 1922
8 | 1922
9 | 1922
(10 rows)
-- The following is going to be supported after window function support
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
FROM popular_reviewer
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
)a
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
FROM customer_reviews
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
DROP TABLE customer_reviews;
DROP TABLE popular_reviewer;

View File

@ -78,3 +78,106 @@ WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
DROP TABLE raw_table; DROP TABLE raw_table;
DROP TABLE daily_uniques; DROP TABLE daily_uniques;
-- Check if TopN aggregates work as expected
-- Create TopN extension if present, print false result otherwise
SELECT CASE WHEN COUNT(*) > 0 THEN
'CREATE EXTENSION TOPN'
ELSE 'SELECT false AS topn_present' END
AS create_topn FROM pg_available_extensions()
WHERE name = 'topn'
\gset
:create_topn;
\c - - - :worker_1_port
:create_topn;
\c - - - :worker_2_port
:create_topn;
\c - - - :master_port
CREATE TABLE customer_reviews (day date, user_id int, review int);
CREATE TABLE popular_reviewer(day date, reviewers jsonb);
SELECT create_distributed_table('customer_reviews', 'user_id');
SELECT create_distributed_table('popular_reviewer', 'day');
INSERT INTO customer_reviews
SELECT day, user_id % 7, review % 5
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
INSERT INTO customer_reviews
SELECT day, user_id % 13, review % 3
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review);
-- Run topn on raw data
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_add_agg(user_id::text) AS agg
FROM customer_reviews
)a
ORDER BY 2 DESC, 1;
-- Aggregate the data into popular_reviewer
INSERT INTO popular_reviewer
SELECT day, topn_add_agg(user_id::text)
FROM customer_reviews
GROUP BY 1;
-- Basic topn check on aggregated data
SELECT day, (topn(reviewers, 10)).*
FROM popular_reviewer
WHERE day >= '2018-06-20' and day <= '2018-06-30'
ORDER BY 3 DESC, 1, 2
LIMIT 10;
-- Union aggregated data for one week
SELECT (topn(agg, 10)).*
FROM (
SELECT topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
)a
ORDER BY 2 DESC, 1;
SELECT month, (topn(agg, 5)).*
FROM (
SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg
FROM popular_reviewer
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
GROUP BY 1
ORDER BY 1
)a
ORDER BY 1, 3 DESC, 2;
-- TODO the following queries will be supported after we fix #2265
-- They work for PG9.6 but not for PG10
SELECT (topn(topn_union_agg(reviewers), 10)).*
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
ORDER BY 2 DESC, 1;
SELECT (topn(topn_add_agg(user_id::text), 10)).*
FROM customer_reviews
ORDER BY 2 DESC, 1;
-- The following is going to be supported after window function support
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
FROM popular_reviewer
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
)a
ORDER BY 3 DESC, 1, 2
LIMIT 10;
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
FROM customer_reviews
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 3 DESC, 1, 2
LIMIT 10;
DROP TABLE customer_reviews;
DROP TABLE popular_reviewer;