mirror of https://github.com/citusdata/citus.git
commit
19e92ee369
|
@ -1489,7 +1489,7 @@ MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerC
|
|||
/*
|
||||
* MasterAggregateExpression creates the master aggregate expression using the
|
||||
* original aggregate and aggregate's type information. This function handles
|
||||
* the average, count, and array_agg aggregates separately due to differences
|
||||
* the average, count, array_agg, and hll aggregates separately due to differences
|
||||
* in these aggregate functions' transformations.
|
||||
*
|
||||
* Note that this function has implicit knowledge of the transformations applied
|
||||
|
@ -1763,6 +1763,40 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
|||
|
||||
newMasterExpression = (Expr *) newMasterAggregate;
|
||||
}
|
||||
else if (aggregateType == AGGREGATE_HLL_ADD ||
|
||||
aggregateType == AGGREGATE_HLL_UNION)
|
||||
{
|
||||
/*
|
||||
* For hll_add_agg and hll_union_agg, the workers run the original aggregate
|
||||
* and the master merges their hll-typed partial results by applying the hll
|
||||
* union aggregate to the worker column.
|
||||
*/
|
||||
TargetEntry *hllTargetEntry = NULL;
|
||||
Aggref *unionAggregate = NULL;
|
||||
|
||||
Oid hllType = exprType((Node *) originalAggregate);
|
||||
Oid unionFunctionId = AggregateFunctionOid(HLL_UNION_AGGREGATE_NAME, hllType);
|
||||
int32 hllReturnTypeMod = exprTypmod((Node *) originalAggregate);
|
||||
Oid hllTypeCollationId = exprCollation((Node *) originalAggregate);
|
||||
|
||||
Var *hllColumn = makeVar(masterTableId, walkerContext->columnId, hllType,
|
||||
hllReturnTypeMod, hllTypeCollationId, columnLevelsUp);
|
||||
walkerContext->columnId++;
|
||||
|
||||
hllTargetEntry = makeTargetEntry((Expr *) hllColumn, argumentId, NULL, false);
|
||||
|
||||
unionAggregate = makeNode(Aggref);
|
||||
unionAggregate->aggfnoid = unionFunctionId;
|
||||
unionAggregate->aggtype = hllType;
|
||||
unionAggregate->args = list_make1(hllTargetEntry);
|
||||
unionAggregate->aggkind = AGGKIND_NORMAL;
|
||||
unionAggregate->aggfilter = NULL;
|
||||
unionAggregate->aggtranstype = InvalidOid;
|
||||
unionAggregate->aggargtypes = list_make1_oid(hllType);
|
||||
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||
|
||||
newMasterExpression = (Expr *) unionAggregate;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
|
|
|
@ -66,7 +66,9 @@ typedef enum
|
|||
AGGREGATE_BIT_OR = 12,
|
||||
AGGREGATE_BOOL_AND = 13,
|
||||
AGGREGATE_BOOL_OR = 14,
|
||||
AGGREGATE_EVERY = 15
|
||||
AGGREGATE_EVERY = 15,
|
||||
AGGREGATE_HLL_ADD = 16,
|
||||
AGGREGATE_HLL_UNION = 17
|
||||
} AggregateType;
|
||||
|
||||
|
||||
|
@ -111,7 +113,8 @@ static const char *const AggregateNames[] = {
|
|||
"sum", "count", "array_agg",
|
||||
"jsonb_agg", "jsonb_object_agg",
|
||||
"json_agg", "json_object_agg",
|
||||
"bit_and", "bit_or", "bool_and", "bool_or", "every"
|
||||
"bit_and", "bit_or", "bool_and", "bool_or", "every",
|
||||
"hll_add_agg", "hll_union_agg"
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
--
|
||||
-- CUSTOM_AGGREGATE_SUPPORT
|
||||
--
|
||||
-- Create HLL extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION HLL'
|
||||
ELSE 'SELECT false AS hll_present' END
|
||||
AS create_cmd FROM pg_available_extensions()
|
||||
WHERE name = 'hll'
|
||||
\gset
|
||||
:create_cmd;
|
||||
ERROR: extension "hll" already exists
|
||||
\c - - - :worker_1_port
|
||||
:create_cmd;
|
||||
ERROR: extension "hll" already exists
|
||||
\c - - - :worker_2_port
|
||||
:create_cmd;
|
||||
ERROR: extension "hll" already exists
|
||||
\c - - - :master_port
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE raw_table (day date, user_id int);
|
||||
CREATE TABLE daily_uniques(day date, unique_users hll);
|
||||
SELECT create_distributed_table('raw_table', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('daily_uniques', 'day');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 19
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 13
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
-- Run hll on raw data
|
||||
SELECT hll_cardinality(hll_union_agg(agg))
|
||||
FROM (
|
||||
SELECT hll_add_agg(hll_hash_integer(user_id)) AS agg
|
||||
FROM raw_table)a;
|
||||
hll_cardinality
|
||||
-----------------
|
||||
19
|
||||
(1 row)
|
||||
|
||||
-- Aggregate the data into daily_uniques
|
||||
INSERT INTO daily_uniques
|
||||
SELECT day, hll_add_agg(hll_hash_integer(user_id))
|
||||
FROM raw_table
|
||||
GROUP BY 1;
|
||||
-- Basic hll_cardinality check on aggregated data
|
||||
SELECT day, hll_cardinality(unique_users)
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 2 DESC,1
|
||||
LIMIT 10;
|
||||
day | hll_cardinality
|
||||
------------+-----------------
|
||||
06-20-2018 | 19
|
||||
06-21-2018 | 19
|
||||
06-22-2018 | 19
|
||||
06-23-2018 | 19
|
||||
06-24-2018 | 19
|
||||
06-25-2018 | 13
|
||||
06-26-2018 | 13
|
||||
06-27-2018 | 13
|
||||
06-28-2018 | 13
|
||||
06-29-2018 | 13
|
||||
(10 rows)
|
||||
|
||||
-- Union aggregated data for one week
|
||||
SELECT hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date;
|
||||
hll_cardinality
|
||||
-----------------
|
||||
19
|
||||
(1 row)
|
||||
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1;
|
||||
month | hll_cardinality
|
||||
-------+-----------------
|
||||
6 | 19
|
||||
7 | 13
|
||||
(2 rows)
|
||||
|
||||
-- These are going to be supported after window function support
|
||||
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
|
||||
FROM daily_uniques
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
|
||||
FROM daily_uniques
|
||||
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
|
||||
ERROR: could not run distributed query because the window function that is used cannot be pushed down
|
||||
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE daily_uniques;
|
|
@ -0,0 +1,112 @@
|
|||
--
|
||||
-- CUSTOM_AGGREGATE_SUPPORT
|
||||
--
|
||||
-- Create HLL extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION HLL'
|
||||
ELSE 'SELECT false AS hll_present' END
|
||||
AS create_cmd FROM pg_available_extensions()
|
||||
WHERE name = 'hll'
|
||||
\gset
|
||||
:create_cmd;
|
||||
hll_present
|
||||
-------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
:create_cmd;
|
||||
hll_present
|
||||
-------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
:create_cmd;
|
||||
hll_present
|
||||
-------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE raw_table (day date, user_id int);
|
||||
CREATE TABLE daily_uniques(day date, unique_users hll);
|
||||
ERROR: type "hll" does not exist
|
||||
LINE 1: CREATE TABLE daily_uniques(day date, unique_users hll);
|
||||
^
|
||||
SELECT create_distributed_table('raw_table', 'user_id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('daily_uniques', 'day');
|
||||
ERROR: relation "daily_uniques" does not exist
|
||||
LINE 1: SELECT create_distributed_table('daily_uniques', 'day');
|
||||
^
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 19
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 13
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
-- Run hll on raw data
|
||||
SELECT hll_cardinality(hll_union_agg(agg))
|
||||
FROM (
|
||||
SELECT hll_add_agg(hll_hash_integer(user_id)) AS agg
|
||||
FROM raw_table)a;
|
||||
ERROR: function hll_hash_integer(integer) does not exist
|
||||
LINE 3: SELECT hll_add_agg(hll_hash_integer(user_id)) AS agg
|
||||
^
|
||||
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||
-- Aggregate the data into daily_uniques
|
||||
INSERT INTO daily_uniques
|
||||
SELECT day, hll_add_agg(hll_hash_integer(user_id))
|
||||
FROM raw_table
|
||||
GROUP BY 1;
|
||||
ERROR: relation "daily_uniques" does not exist
|
||||
LINE 1: INSERT INTO daily_uniques
|
||||
^
|
||||
-- Basic hll_cardinality check on aggregated data
|
||||
SELECT day, hll_cardinality(unique_users)
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 2 DESC,1
|
||||
LIMIT 10;
|
||||
ERROR: relation "daily_uniques" does not exist
|
||||
LINE 2: FROM daily_uniques
|
||||
^
|
||||
-- Union aggregated data for one week
|
||||
SELECT hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date;
|
||||
ERROR: relation "daily_uniques" does not exist
|
||||
LINE 2: FROM daily_uniques
|
||||
^
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1;
|
||||
ERROR: relation "daily_uniques" does not exist
|
||||
LINE 2: FROM daily_uniques
|
||||
^
|
||||
-- These are going to be supported after window function support
|
||||
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
|
||||
FROM daily_uniques
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
|
||||
ERROR: relation "daily_uniques" does not exist
|
||||
LINE 2: FROM daily_uniques
|
||||
^
|
||||
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
|
||||
FROM daily_uniques
|
||||
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
|
||||
ERROR: relation "daily_uniques" does not exist
|
||||
LINE 2: FROM daily_uniques
|
||||
^
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE daily_uniques;
|
||||
ERROR: table "daily_uniques" does not exist
|
|
@ -58,7 +58,7 @@ test: multi_subquery_complex_reference_clause multi_subquery_window_functions mu
|
|||
test: multi_subquery_in_where_reference_clause
|
||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
|
||||
test: multi_reference_table multi_select_for_update relation_access_tracking
|
||||
test: multi_reference_table multi_select_for_update relation_access_tracking custom_aggregate_support
|
||||
test: multi_average_expression multi_working_columns multi_having_pushdown
|
||||
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
|
||||
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
--
|
||||
-- CUSTOM_AGGREGATE_SUPPORT
|
||||
--
|
||||
-- Create HLL extension if present, print false result otherwise
|
||||
SELECT CASE WHEN COUNT(*) > 0 THEN
|
||||
'CREATE EXTENSION HLL'
|
||||
ELSE 'SELECT false AS hll_present' END
|
||||
AS create_cmd FROM pg_available_extensions()
|
||||
WHERE name = 'hll'
|
||||
\gset
|
||||
|
||||
:create_cmd;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
:create_cmd;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
:create_cmd;
|
||||
|
||||
\c - - - :master_port
|
||||
|
||||
SET citus.shard_count TO 4;
|
||||
|
||||
CREATE TABLE raw_table (day date, user_id int);
|
||||
CREATE TABLE daily_uniques(day date, unique_users hll);
|
||||
|
||||
SELECT create_distributed_table('raw_table', 'user_id');
|
||||
SELECT create_distributed_table('daily_uniques', 'day');
|
||||
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 19
|
||||
FROM generate_series('2018-05-24'::timestamp, '2018-06-24'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
INSERT INTO raw_table
|
||||
SELECT day, user_id % 13
|
||||
FROM generate_series('2018-06-10'::timestamp, '2018-07-10'::timestamp, '1 day'::interval) as f(day),
|
||||
generate_series(1,100) as g(user_id);
|
||||
|
||||
-- Run hll on raw data
|
||||
SELECT hll_cardinality(hll_union_agg(agg))
|
||||
FROM (
|
||||
SELECT hll_add_agg(hll_hash_integer(user_id)) AS agg
|
||||
FROM raw_table)a;
|
||||
|
||||
-- Aggregate the data into daily_uniques
|
||||
INSERT INTO daily_uniques
|
||||
SELECT day, hll_add_agg(hll_hash_integer(user_id))
|
||||
FROM raw_table
|
||||
GROUP BY 1;
|
||||
|
||||
-- Basic hll_cardinality check on aggregated data
|
||||
SELECT day, hll_cardinality(unique_users)
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-20' and day <= '2018-06-30'
|
||||
ORDER BY 2 DESC,1
|
||||
LIMIT 10;
|
||||
|
||||
-- Union aggregated data for one week
|
||||
SELECT hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date;
|
||||
|
||||
|
||||
SELECT EXTRACT(MONTH FROM day) AS month, hll_cardinality(hll_union_agg(unique_users))
|
||||
FROM daily_uniques
|
||||
WHERE day >= '2018-06-23' AND day <= '2018-07-01'
|
||||
GROUP BY 1
|
||||
ORDER BY 1;
|
||||
|
||||
-- These are going to be supported after window function support
|
||||
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
|
||||
FROM daily_uniques
|
||||
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
|
||||
|
||||
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
|
||||
FROM daily_uniques
|
||||
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
|
||||
|
||||
DROP TABLE raw_table;
|
||||
DROP TABLE daily_uniques;
|
Loading…
Reference in New Issue