From 3afa7f425d5507a08ca2c7aecf725c63086abd35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?mehmet=20furkan=20=C5=9Fahin?= Date: Tue, 15 May 2018 17:40:52 +0300 Subject: [PATCH 1/2] Topn aggregates are supported --- .../planner/multi_logical_optimizer.c | 43 ++- .../distributed/multi_logical_optimizer.h | 11 +- .../expected/custom_aggregate_support.out | 159 ++++++++++ .../expected/custom_aggregate_support_0.out | 147 +++++++++ .../expected/custom_aggregate_support_1.out | 288 ++++++++++++++++++ .../regress/sql/custom_aggregate_support.sql | 103 +++++++ 6 files changed, 747 insertions(+), 4 deletions(-) create mode 100644 src/test/regress/expected/custom_aggregate_support_1.out diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index d7c73827d..d48cb0196 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -1490,8 +1490,8 @@ MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerC /* * MasterAggregateExpression creates the master aggregate expression using the * original aggregate and aggregate's type information. This function handles - * the average, count, array_agg, and hll aggregates separately due to differences - * in these aggregate functions' transformations. + * the average, count, array_agg, hll and topn aggregates separately due to + * differences in these aggregate functions' transformations. * * Note that this function has implicit knowledge of the transformations applied * for worker nodes on the original aggregate. The function uses this implicit @@ -1798,6 +1798,45 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterExpression = (Expr *) unionAggregate; } + else if (aggregateType == AGGREGATE_TOPN_UNION_AGG || + aggregateType == AGGREGATE_TOPN_ADD_AGG) + { + /* + * Top-N aggregates are handled in two steps. First, we compute + * topn_add_agg() or topn_union_agg() aggregates on the worker nodes. + * Then, we gather the Top-Ns on the master and take the union of all + * to get the final topn. + */ + TargetEntry *topNTargetEntry = NULL; + Aggref *unionAggregate = NULL; + + /* worker aggregate and original aggregate have same return type */ + Oid topnType = exprType((Node *) originalAggregate); + Oid unionFunctionId = AggregateFunctionOid(TOPN_UNION_AGGREGATE_NAME, + topnType); + int32 topnReturnTypeMod = exprTypmod((Node *) originalAggregate); + Oid topnTypeCollationId = exprCollation((Node *) originalAggregate); + + /* create argument for the topn_union_agg() aggregate */ + Var *topnColumn = makeVar(masterTableId, walkerContext->columnId, topnType, + topnReturnTypeMod, topnTypeCollationId, columnLevelsUp); + walkerContext->columnId++; + + topNTargetEntry = makeTargetEntry((Expr *) topnColumn, argumentId, NULL, false); + + /* construct the master topn_union_agg() expression */ + unionAggregate = makeNode(Aggref); + unionAggregate->aggfnoid = unionFunctionId; + unionAggregate->aggtype = topnType; + unionAggregate->args = list_make1(topNTargetEntry); + unionAggregate->aggkind = AGGKIND_NORMAL; + unionAggregate->aggfilter = NULL; + unionAggregate->aggtranstype = InvalidOid; + unionAggregate->aggargtypes = list_make1_oid(topnType); + unionAggregate->aggsplit = AGGSPLIT_SIMPLE; + + newMasterExpression = (Expr *) unionAggregate; + } else { /* diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 483c0cc60..bf8884595 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -39,6 +39,10 @@ #define HLL_UNION_AGGREGATE_NAME "hll_union_agg" #define HLL_CARDINALITY_FUNC_NAME "hll_cardinality" +/* Definitions related to Top-N approximations */ +#define TOPN_ADD_AGGREGATE_NAME "topn_add_agg" +#define TOPN_UNION_AGGREGATE_NAME "topn_union_agg" + /* * AggregateType represents an aggregate function's type, where the function is @@ -68,7 +72,9 @@ typedef enum AGGREGATE_BOOL_OR = 14, AGGREGATE_EVERY = 15, AGGREGATE_HLL_ADD = 16, - AGGREGATE_HLL_UNION = 17 + AGGREGATE_HLL_UNION = 17, + AGGREGATE_TOPN_ADD_AGG = 18, + AGGREGATE_TOPN_UNION_AGG = 19 } AggregateType; @@ -114,7 +120,8 @@ static const char *const AggregateNames[] = { "jsonb_agg", "jsonb_object_agg", "json_agg", "json_object_agg", "bit_and", "bit_or", "bool_and", "bool_or", "every", - "hll_add_agg", "hll_union_agg" + "hll_add_agg", "hll_union_agg", + "topn_add_agg", "topn_union_agg" }; diff --git a/src/test/regress/expected/custom_aggregate_support.out b/src/test/regress/expected/custom_aggregate_support.out index 2b85f94be..9ffa8d9a4 100644 --- a/src/test/regress/expected/custom_aggregate_support.out +++ b/src/test/regress/expected/custom_aggregate_support.out @@ -108,3 +108,162 @@ ERROR: could not run distributed query because the window function that is used HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column DROP TABLE raw_table; DROP TABLE daily_uniques; +-- Check if TopN aggregates work as expected +-- Create TopN extension if present, print false result otherwise +SELECT CASE WHEN COUNT(*) > 0 THEN + 'CREATE EXTENSION TOPN' +ELSE 'SELECT false AS topn_present' END +AS create_topn FROM pg_available_extensions() +WHERE name = 'topn' +\gset +:create_topn; +\c - - - :worker_1_port +:create_topn; +\c - - - :worker_2_port +:create_topn; +\c - - - :master_port +CREATE TABLE customer_reviews (day date, user_id int, review int); +CREATE TABLE popular_reviewer(day date, reviewers jsonb); +SELECT create_distributed_table('customer_reviews', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('popular_reviewer', 'day'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO customer_reviews + SELECT day, user_id % 7, review % 5 + FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); +INSERT INTO customer_reviews + SELECT day, user_id % 13, review % 3 + FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); +-- Run topn on raw data +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_add_agg(user_id::text) AS agg + FROM customer_reviews + )a +ORDER BY 2 DESC, 1; + item | frequency +------+----------- + 1 | 7843 + 2 | 7843 + 3 | 6851 + 4 | 6851 + 0 | 5890 + 5 | 5890 + 6 | 5890 + 7 | 1922 + 8 | 1922 + 9 | 1922 +(10 rows) + +-- Aggregate the data into popular_reviewer +INSERT INTO popular_reviewer + SELECT day, topn_add_agg(user_id::text) + FROM customer_reviews + GROUP BY 1; +-- Basic topn check on aggregated data +SELECT day, (topn(reviewers, 10)).* +FROM popular_reviewer +WHERE day >= '2018-06-20' and day <= '2018-06-30' +ORDER BY 3 DESC, 1, 2 +LIMIT 10; + day | item | frequency +------------+------+----------- + 06-20-2018 | 1 | 248 + 06-20-2018 | 2 | 248 + 06-21-2018 | 1 | 248 + 06-21-2018 | 2 | 248 + 06-22-2018 | 1 | 248 + 06-22-2018 | 2 | 248 + 06-23-2018 | 1 | 248 + 06-23-2018 | 2 | 248 + 06-24-2018 | 1 | 248 + 06-24-2018 | 2 | 248 +(10 rows) + +-- Union aggregated data for one week +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date + )a +ORDER BY 2 DESC, 1; + item | frequency +------+----------- + 1 | 1240 + 2 | 1240 + 0 | 992 + 3 | 992 + 4 | 992 + 5 | 992 + 6 | 992 +(7 rows) + +SELECT month, (topn(agg, 5)).* +FROM ( + SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-06-23' AND day <= '2018-07-01' + GROUP BY 1 + ORDER BY 1 + )a +ORDER BY 1, 3 DESC, 2; + month | item | frequency +-------+------+----------- + 6 | 1 | 1054 + 6 | 2 | 1054 + 6 | 3 | 992 + 6 | 4 | 992 + 6 | 0 | 744 + 7 | 1 | 93 + 7 | 2 | 93 + 7 | 3 | 93 + 7 | 4 | 93 + 7 | 8 | 62 +(10 rows) + +-- TODO the following queries will be supported after we fix #2265 +-- They work for PG9.6 but not for PG10 +SELECT (topn(topn_union_agg(reviewers), 10)).* +FROM popular_reviewer +WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date +ORDER BY 2 DESC, 1; +ERROR: set-valued function called in context that cannot accept a set +LINE 1: SELECT (topn(topn_union_agg(reviewers), 10)).* + ^ +SELECT (topn(topn_add_agg(user_id::text), 10)).* +FROM customer_reviews +ORDER BY 2 DESC, 1; +ERROR: set-valued function called in context that cannot accept a set +LINE 1: SELECT (topn(topn_add_agg(user_id::text), 10)).* + ^ +-- The following is going to be supported after window function support +SELECT day, (topn(agg, 10)).* +FROM ( + SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg + FROM popular_reviewer + WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) + )a +ORDER BY 3 DESC, 1, 2 +LIMIT 10; +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column +SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).* +FROM customer_reviews +WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) +ORDER BY 3 DESC, 1, 2 +LIMIT 10; +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column +DROP TABLE customer_reviews; +DROP TABLE popular_reviewer; diff --git a/src/test/regress/expected/custom_aggregate_support_0.out b/src/test/regress/expected/custom_aggregate_support_0.out index 323087988..aafd3bd16 100644 --- a/src/test/regress/expected/custom_aggregate_support_0.out +++ b/src/test/regress/expected/custom_aggregate_support_0.out @@ -110,3 +110,150 @@ LINE 2: FROM daily_uniques DROP TABLE raw_table; DROP TABLE daily_uniques; ERROR: table "daily_uniques" does not exist +-- Check if TopN aggregates work as expected +-- Create TopN extension if present, print false result otherwise +SELECT CASE WHEN COUNT(*) > 0 THEN + 'CREATE EXTENSION TOPN' +ELSE 'SELECT false AS topn_present' END +AS create_topn FROM pg_available_extensions() +WHERE name = 'topn' +\gset +:create_topn; + topn_present +-------------- + f +(1 row) + +\c - - - :worker_1_port +:create_topn; + topn_present +-------------- + f +(1 row) + +\c - - - :worker_2_port +:create_topn; + topn_present +-------------- + f +(1 row) + +\c - - - :master_port +CREATE TABLE customer_reviews (day date, user_id int, review int); +CREATE TABLE popular_reviewer(day date, reviewers jsonb); +SELECT create_distributed_table('customer_reviews', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('popular_reviewer', 'day'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO customer_reviews + SELECT day, user_id % 7, review % 5 + FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); +INSERT INTO customer_reviews + SELECT day, user_id % 13, review % 3 + FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); +-- Run topn on raw data +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_add_agg(user_id::text) AS agg + FROM customer_reviews + )a +ORDER BY 2 DESC, 1; +ERROR: function topn_add_agg(text) does not exist +LINE 3: SELECT topn_add_agg(user_id::text) AS agg + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +-- Aggregate the data into popular_reviewer +INSERT INTO popular_reviewer + SELECT day, topn_add_agg(user_id::text) + FROM customer_reviews + GROUP BY 1; +ERROR: function topn_add_agg(text) does not exist +LINE 2: SELECT day, topn_add_agg(user_id::text) + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +-- Basic topn check on aggregated data +SELECT day, (topn(reviewers, 10)).* +FROM popular_reviewer +WHERE day >= '2018-06-20' and day <= '2018-06-30' +ORDER BY 3 DESC, 1, 2 +LIMIT 10; +ERROR: function topn(jsonb, integer) does not exist +LINE 1: SELECT day, (topn(reviewers, 10)).* + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +-- Union aggregated data for one week +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date + )a +ORDER BY 2 DESC, 1; +ERROR: function topn_union_agg(jsonb) does not exist +LINE 3: SELECT topn_union_agg(reviewers) AS agg + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT month, (topn(agg, 5)).* +FROM ( + SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-06-23' AND day <= '2018-07-01' + GROUP BY 1 + ORDER BY 1 + )a +ORDER BY 1, 3 DESC, 2; +ERROR: function topn_union_agg(jsonb) does not exist +LINE 3: SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(rev... + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +-- TODO the following queries will be supported after we fix #2265 +-- They work for PG9.6 but not for PG10 +SELECT (topn(topn_union_agg(reviewers), 10)).* +FROM popular_reviewer +WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date +ORDER BY 2 DESC, 1; +ERROR: function topn_union_agg(jsonb) does not exist +LINE 1: SELECT (topn(topn_union_agg(reviewers), 10)).* + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT (topn(topn_add_agg(user_id::text), 10)).* +FROM customer_reviews +ORDER BY 2 DESC, 1; +ERROR: function topn_add_agg(text) does not exist +LINE 1: SELECT (topn(topn_add_agg(user_id::text), 10)).* + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +-- The following is going to be supported after window function support +SELECT day, (topn(agg, 10)).* +FROM ( + SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg + FROM popular_reviewer + WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) + )a +ORDER BY 3 DESC, 1, 2 +LIMIT 10; +ERROR: function topn_union_agg(jsonb) does not exist +LINE 3: SELECT day, topn_union_agg(reviewers) OVER seven_days AS ag... + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).* +FROM customer_reviews +WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) +ORDER BY 3 DESC, 1, 2 +LIMIT 10; +ERROR: function topn_add_agg(text) does not exist +LINE 1: SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_day... + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +DROP TABLE customer_reviews; +DROP TABLE popular_reviewer; diff --git a/src/test/regress/expected/custom_aggregate_support_1.out b/src/test/regress/expected/custom_aggregate_support_1.out new file mode 100644 index 000000000..b2af1d477 --- /dev/null +++ b/src/test/regress/expected/custom_aggregate_support_1.out @@ -0,0 +1,288 @@ +-- +-- CUSTOM_AGGREGATE_SUPPORT +-- +-- Create HLL extension if present, print false result otherwise +SELECT CASE WHEN COUNT(*) > 0 THEN + 'CREATE EXTENSION HLL' +ELSE 'SELECT false AS hll_present' END +AS create_cmd FROM pg_available_extensions() +WHERE name = 'hll' +\gset +:create_cmd; +ERROR: extension "hll" already exists +\c - - - :worker_1_port +:create_cmd; +ERROR: extension "hll" already exists +\c - - - :worker_2_port +:create_cmd; +ERROR: extension "hll" already exists +\c - - - :master_port +SET citus.shard_count TO 4; +CREATE TABLE raw_table (day date, user_id int); +CREATE TABLE daily_uniques(day date, unique_users hll); +SELECT create_distributed_table('raw_table', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('daily_uniques', 'day'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO raw_table + SELECT day, user_id % 19 + FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day), + generate_series(1,100) as g(user_id); +INSERT INTO raw_table + SELECT day, user_id % 13 + FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day), + generate_series(1,100) as g(user_id); +-- Run hll on raw data +SELECT hll_cardinality(hll_union_agg(agg)) +FROM ( + SELECT hll_add_agg(hll_hash_integer(user_id)) AS agg + FROM raw_table)a; + hll_cardinality +----------------- + 19 +(1 row) + +-- Aggregate the data into daily_uniques +INSERT INTO daily_uniques + SELECT day, hll_add_agg(hll_hash_integer(user_id)) + FROM raw_table + GROUP BY 1; +-- Basic hll_cardinality check on aggregated data +SELECT day, hll_cardinality(unique_users) +FROM daily_uniques +WHERE day >= '2018-06-20' and day <= '2018-06-30' +ORDER BY 2 DESC,1 +LIMIT 10; + day | hll_cardinality +------------+----------------- + 06-20-2018 | 19 + 06-21-2018 | 19 + 06-22-2018 | 19 + 06-23-2018 | 19 + 06-24-2018 | 19 + 06-25-2018 | 13 + 06-26-2018 | 13 + 06-27-2018 | 13 + 06-28-2018 | 13 + 06-29-2018 | 13 +(10 rows) + +-- Union aggregated data for one week +SELECT hll_cardinality(hll_union_agg(unique_users)) +FROM daily_uniques +WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date; + hll_cardinality +----------------- + 19 +(1 row) + +SELECT EXTRACT(MONTH FROM day) AS month, hll_cardinality(hll_union_agg(unique_users)) +FROM daily_uniques +WHERE day >= '2018-06-23' AND day <= '2018-07-01' +GROUP BY 1 +ORDER BY 1; + month | hll_cardinality +-------+----------------- + 6 | 19 + 7 | 13 +(2 rows) + +-- These are going to be supported after window function support +SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days) +FROM daily_uniques +WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING); +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column +SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques +FROM daily_uniques +WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING); +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column +DROP TABLE raw_table; +DROP TABLE daily_uniques; +-- Check if TopN aggregates work as expected +-- Create TopN extension if present, print false result otherwise +SELECT CASE WHEN COUNT(*) > 0 THEN + 'CREATE EXTENSION TOPN' +ELSE 'SELECT false AS topn_present' END +AS create_topn FROM pg_available_extensions() +WHERE name = 'topn' +\gset +:create_topn; +\c - - - :worker_1_port +:create_topn; +\c - - - :worker_2_port +:create_topn; +\c - - - :master_port +CREATE TABLE customer_reviews (day date, user_id int, review int); +CREATE TABLE popular_reviewer(day date, reviewers jsonb); +SELECT create_distributed_table('customer_reviews', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('popular_reviewer', 'day'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO customer_reviews + SELECT day, user_id % 7, review % 5 + FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); +INSERT INTO customer_reviews + SELECT day, user_id % 13, review % 3 + FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); +-- Run topn on raw data +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_add_agg(user_id::text) AS agg + FROM customer_reviews + )a +ORDER BY 2 DESC, 1; + item | frequency +------+----------- + 1 | 7843 + 2 | 7843 + 3 | 6851 + 4 | 6851 + 0 | 5890 + 5 | 5890 + 6 | 5890 + 7 | 1922 + 8 | 1922 + 9 | 1922 +(10 rows) + +-- Aggregate the data into popular_reviewer +INSERT INTO popular_reviewer + SELECT day, topn_add_agg(user_id::text) + FROM customer_reviews + GROUP BY 1; +-- Basic topn check on aggregated data +SELECT day, (topn(reviewers, 10)).* +FROM popular_reviewer +WHERE day >= '2018-06-20' and day <= '2018-06-30' +ORDER BY 3 DESC, 1, 2 +LIMIT 10; + day | item | frequency +------------+------+----------- + 06-20-2018 | 1 | 248 + 06-20-2018 | 2 | 248 + 06-21-2018 | 1 | 248 + 06-21-2018 | 2 | 248 + 06-22-2018 | 1 | 248 + 06-22-2018 | 2 | 248 + 06-23-2018 | 1 | 248 + 06-23-2018 | 2 | 248 + 06-24-2018 | 1 | 248 + 06-24-2018 | 2 | 248 +(10 rows) + +-- Union aggregated data for one week +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date + )a +ORDER BY 2 DESC, 1; + item | frequency +------+----------- + 1 | 1240 + 2 | 1240 + 0 | 992 + 3 | 992 + 4 | 992 + 5 | 992 + 6 | 992 +(7 rows) + +SELECT month, (topn(agg, 5)).* +FROM ( + SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-06-23' AND day <= '2018-07-01' + GROUP BY 1 + ORDER BY 1 + )a +ORDER BY 1, 3 DESC, 2; + month | item | frequency +-------+------+----------- + 6 | 1 | 1054 + 6 | 2 | 1054 + 6 | 3 | 992 + 6 | 4 | 992 + 6 | 0 | 744 + 7 | 1 | 93 + 7 | 2 | 93 + 7 | 3 | 93 + 7 | 4 | 93 + 7 | 8 | 62 +(10 rows) + +-- TODO the following queries will be supported after we fix #2265 +-- They work for PG9.6 but not for PG10 +SELECT (topn(topn_union_agg(reviewers), 10)).* +FROM popular_reviewer +WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date +ORDER BY 2 DESC, 1; + item | frequency +------+----------- + 1 | 1240 + 2 | 1240 + 0 | 992 + 3 | 992 + 4 | 992 + 5 | 992 + 6 | 992 +(7 rows) + +SELECT (topn(topn_add_agg(user_id::text), 10)).* +FROM customer_reviews +ORDER BY 2 DESC, 1; + item | frequency +------+----------- + 1 | 7843 + 2 | 7843 + 3 | 6851 + 4 | 6851 + 0 | 5890 + 5 | 5890 + 6 | 5890 + 7 | 1922 + 8 | 1922 + 9 | 1922 +(10 rows) + +-- The following is going to be supported after window function support +SELECT day, (topn(agg, 10)).* +FROM ( + SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg + FROM popular_reviewer + WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) + )a +ORDER BY 3 DESC, 1, 2 +LIMIT 10; +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column +SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).* +FROM customer_reviews +WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) +ORDER BY 3 DESC, 1, 2 +LIMIT 10; +ERROR: could not run distributed query because the window function that is used cannot be pushed down +HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column +DROP TABLE customer_reviews; +DROP TABLE popular_reviewer; diff --git a/src/test/regress/sql/custom_aggregate_support.sql b/src/test/regress/sql/custom_aggregate_support.sql index 3eaf0bfc0..1fc4ada4e 100644 --- a/src/test/regress/sql/custom_aggregate_support.sql +++ b/src/test/regress/sql/custom_aggregate_support.sql @@ -78,3 +78,106 @@ WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING); DROP TABLE raw_table; DROP TABLE daily_uniques; + +-- Check if TopN aggregates work as expected +-- Create TopN extension if present, print false result otherwise +SELECT CASE WHEN COUNT(*) > 0 THEN + 'CREATE EXTENSION TOPN' +ELSE 'SELECT false AS topn_present' END +AS create_topn FROM pg_available_extensions() +WHERE name = 'topn' +\gset + +:create_topn; + +\c - - - :worker_1_port +:create_topn; + +\c - - - :worker_2_port +:create_topn; + +\c - - - :master_port +CREATE TABLE customer_reviews (day date, user_id int, review int); +CREATE TABLE popular_reviewer(day date, reviewers jsonb); + +SELECT create_distributed_table('customer_reviews', 'user_id'); +SELECT create_distributed_table('popular_reviewer', 'day'); + +INSERT INTO customer_reviews + SELECT day, user_id % 7, review % 5 + FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); +INSERT INTO customer_reviews + SELECT day, user_id % 13, review % 3 + FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day), + generate_series(1,30) as g(user_id), generate_series(0,30) AS r(review); + +-- Run topn on raw data +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_add_agg(user_id::text) AS agg + FROM customer_reviews + )a +ORDER BY 2 DESC, 1; + +-- Aggregate the data into popular_reviewer +INSERT INTO popular_reviewer + SELECT day, topn_add_agg(user_id::text) + FROM customer_reviews + GROUP BY 1; + +-- Basic topn check on aggregated data +SELECT day, (topn(reviewers, 10)).* +FROM popular_reviewer +WHERE day >= '2018-06-20' and day <= '2018-06-30' +ORDER BY 3 DESC, 1, 2 +LIMIT 10; + +-- Union aggregated data for one week +SELECT (topn(agg, 10)).* +FROM ( + SELECT topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date + )a +ORDER BY 2 DESC, 1; + +SELECT month, (topn(agg, 5)).* +FROM ( + SELECT EXTRACT(MONTH FROM day) AS month, topn_union_agg(reviewers) AS agg + FROM popular_reviewer + WHERE day >= '2018-06-23' AND day <= '2018-07-01' + GROUP BY 1 + ORDER BY 1 + )a +ORDER BY 1, 3 DESC, 2; + +-- TODO the following queries will be supported after we fix #2265 +-- They work for PG9.6 but not for PG10 +SELECT (topn(topn_union_agg(reviewers), 10)).* +FROM popular_reviewer +WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date +ORDER BY 2 DESC, 1; + +SELECT (topn(topn_add_agg(user_id::text), 10)).* +FROM customer_reviews +ORDER BY 2 DESC, 1; + +-- The following is going to be supported after window function support +SELECT day, (topn(agg, 10)).* +FROM ( + SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg + FROM popular_reviewer + WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) + )a +ORDER BY 3 DESC, 1, 2 +LIMIT 10; + +SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).* +FROM customer_reviews +WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING) +ORDER BY 3 DESC, 1, 2 +LIMIT 10; + +DROP TABLE customer_reviews; +DROP TABLE popular_reviewer; From 93e2d26226293234484d5183caf5605eca7d2cc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?mehmet=20furkan=20=C5=9Fahin?= Date: Mon, 9 Jul 2018 21:40:20 +0300 Subject: [PATCH 2/2] .travis.yml change to install TopN on travis --- .travis.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 7c8ced9d8..2f240a20d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -42,13 +42,18 @@ install: - pip3 install --user mitmproxy==3.0.4 - pip3 install --user construct==2.9.45 - mitmproxy --version - # download and install HLL manually, as custom builds won't satisfy deps + # download and install HLL and TopN manually, as custom builds won't satisfy deps # only install if performing non-11 build - | if [ "${PGVERSION}" != "11" ]; then apt-get download "postgresql-${PGVERSION}-hll=2.10.2.citus-1" sudo dpkg --force-confold --force-confdef --force-all -i *hll*.deb fi + - | + if [ "${PGVERSION}" != "11" ]; then + apt-get download "postgresql-${PGVERSION}-topn=2.1.0" + sudo dpkg --force-confold --force-confdef --force-all -i *topn*.deb + fi before_script: citus_indent --quiet --check script: CFLAGS=-Werror pg_travis_multi_test check after_success: