From 710e1631af35aa94c7d3839655e9e3dbf9b45ae2 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Tue, 26 Sep 2023 13:32:32 +0200
Subject: [PATCH] Expand aggregate functions section

---
 src/backend/distributed/README.md | 54 +++++++++++++++++++++++++------
 1 file changed, 44 insertions(+), 10 deletions(-)

diff --git a/src/backend/distributed/README.md b/src/backend/distributed/README.md
index e1fa07fb1..d46416cbf 100644
--- a/src/backend/distributed/README.md
+++ b/src/backend/distributed/README.md
@@ -842,18 +842,52 @@ The most interesting part of the optimizer is usually in the final stage, when h
 [Aggregate functions](https://www.postgresql.org/docs/current/sql-createaggregate.html) can appear in the SELECT (target list) or HAVING clause of a query, often in the context of a `GROUP BY`. The aggregate primarily specify a state function (`sfunc`), which is called for every row in the group, and an `stype` which defines the data format in which intermediate state is held as a type, which maybe be `internal`. Many aggregates also have a `finalfunc`, which converts the last `stype` value to the final result of the aggregate function.
-Citus support distributing aggregate functions in several ways:
-- **Aggregate functions in queries that group by distribution column can be fully pushed down, since no cross-shard aggregation is needed**. This is mostly handled by the rules in `CanPushDownExpression`.
-- **Built-in, or well-known aggregate functions (based on their name) are distributed using custom rules**. An almost-complete list of aggregates that are handled in this way can be found in the `AggregateNames` variable. Here the optimizer implements rules such as injecting a `sum` and `count` aggregate in the worker target list, and doing a `sum(sum)/sum(count)` on the master target list.
-- **Aggregates that specify a `combinefunc` and have an non-internal `stype` are distributed using Citus-provided `worker_partial_agg` and `coord_combine_agg` aggregate functions**.
-- **Other aggregates will be fully above the `MultiCollect` node, meaning the source data is pulled to the coordinator.**
+Citus supports distributing aggregate functions in several ways described below, each with an example.
-TODO
+**Aggregate functions in queries that group by the distribution column can be fully pushed down, since no cross-shard aggregation is needed**. This is mostly handled by the rules in `CanPushDownExpression`.
+
+Example:
+
+```sql
+select x, avg(y) from test group by x;
+DEBUG: combine query: SELECT x, avg FROM pg_catalog.citus_extradata_container(10, NULL::cstring(0), NULL::cstring(0), '(i 1)'::cstring(0)) remote_scan(x integer, avg numeric)
+NOTICE: issuing SELECT x, avg(y) AS avg FROM public.test_102041 test WHERE true GROUP BY x
+NOTICE: issuing SELECT x, avg(y) AS avg FROM public.test_102042 test WHERE true GROUP BY x
+...
+```
+
+**Built-in, or well-known aggregate functions (based on their name) are distributed using custom rules**. An almost-complete list of aggregates that are handled in this way can be found in the `AggregateNames` variable. Examples are `avg`, `sum`, `count`, `min`, `max`. To distribute an aggregate function like `avg`, the optimizer implements rules such as injecting a `sum` and `count` aggregate in the worker target list, and doing a `sum(sum)/sum(count)` on the master target list. The logic is agnostic to types, so it will work for any custom type that implements aggregate functions with the same name.
+
+Example:
+
+```sql
+select y, avg(x) from test group by y;
+DEBUG: combine query: SELECT y, (pg_catalog.sum(avg) OPERATOR(pg_catalog./) pg_catalog.sum(avg_1)) AS avg FROM pg_catalog.citus_extradata_container(10, NULL::cstring(0), NULL::cs
+tring(0), '(i 1)'::cstring(0)) remote_scan(y integer, avg bigint, avg_1 bigint) GROUP BY y
+NOTICE: issuing SELECT y, sum(x) AS avg, count(x) AS avg FROM public.test_102041 test WHERE true GROUP BY y
+NOTICE: issuing SELECT y, sum(x) AS avg, count(x) AS avg FROM public.test_102042 test WHERE true GROUP BY y
+```
+
+**Aggregates that specify a `combinefunc` and have a non-internal `stype` are distributed using generic aggregate functions**. The `worker_partial_agg` aggregate function is pushed down to the worker and runs the `sfunc` of the custom aggregate across the tuples of a shard without running the `finalfunc` (which should come after `combinefunc`). The `coord_combine_agg` aggregate function runs the `combinefunc` across the `stype` values returned by `worker_partial_agg` and runs the `finalfunc` to obtain the final result of the aggregate function. This approach currently does not support aggregates whose `stype` is `internal`. One reason for not handling `internal` is that it is not clear that such values can always be safely transferred to a different server, though that may be overly pedantic.
+
+Example:
+```sql
+select st_memunion(geo) from test;
+DEBUG: combine query: SELECT coord_combine_agg('351463'::oid, st_memunion, NULL::postgis_public.geometry) AS st_memunion FROM pg_catalog.citus_extradata_container(10, NULL::cstring(0), NULL::cstring(0), '(i 1)'::cstring(0)) remote_scan(st_memunion cstring)
+NOTICE: issuing SELECT worker_partial_agg('postgis_public.st_memunion(postgis_public.geometry)'::regprocedure, geo) AS st_memunion FROM public.test_102041 test WHERE true
+NOTICE: issuing SELECT worker_partial_agg('postgis_public.st_memunion(postgis_public.geometry)'::regprocedure, geo) AS st_memunion FROM public.test_102042 test WHERE true
+```
+
+**Other aggregates are evaluated entirely above the `MultiCollect` node, meaning the source data is pulled to the coordinator.** If this is undesirable due to the performance/load risk, it can be disabled using `citus.coordinator_aggregation_strategy = 'disabled'`, in which case the aggregate function calls would result in an error.
+
+Example:
+```sql
+select st_union(geo) from test;
+DEBUG: combine query: SELECT postgis_public.st_union(st_union) AS st_union FROM pg_catalog.citus_extradata_container(10, NULL::cstring(0), NULL::cstring(0), '(i 1)'::cstring(0)) remote_scan(st_union postgis_public.geometry)
+NOTICE: issuing SELECT geo AS st_union FROM public.test_102041 test WHERE true
+NOTICE: issuing SELECT geo AS st_union FROM public.test_102042 test WHERE true
+```
+
-##### Final Notes
-
-Overall, the algorithm aims to move as much computation as possible closer to the data. This code path has not been updated for a while, so readers should debug the code themselves.
-
 #### Multi Join Order