From 3e88dde672fff985b2bf7126d833d9f7455231b7 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 30 Nov 2021 18:42:01 +0100 Subject: [PATCH] partial group by optimization --- .../distributed/planner/path_based_planner.c | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index ead2d2375..f30a31139 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -105,6 +105,7 @@ static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); /* group agg optimizations */ static List * PushDownAggPath(PlannerInfo *root, Path *originalPath); +static List * PartialPushDownAggPath(PlannerInfo *root, Path *originalPath); static List * RepartitionAggPath(PlannerInfo *root, Path *originalPath); /* geo experimantal*/ @@ -147,6 +148,7 @@ static optimizeFn joinOptimizations[] = { static optimizeFn groupOptimizations[] = { PushDownAggPath, + PartialPushDownAggPath, RepartitionAggPath, }; @@ -2187,6 +2189,103 @@ PushDownAggPath(PlannerInfo *root, Path *originalPath) } +static List * +PartialPushDownAggPath(PlannerInfo *root, Path *originalPath) +{ + AggPath *aggPath = NULL; + DistributedUnionPath *collect = NULL; + + /* + * Match path: + * +-----------+ + * | Aggergate | + * +-----------+ + * | + * +---------+ + * | Collect | + * +---------+ + * + * Where the grouping key is _not_ on the distribution attribute, but we could + * potentially still push down significant work. + */ + IfPathMatch( + originalPath, + MatchAgg( + &aggPath, + SkipReadThrough( + NoCapture, + MatchDistributedUnion( + &collect, + MatchAny)))) + { + if (GroupClauseContainsDistributionAttribute(root, aggPath, + collect->distributionAttrs)) + { + return NIL; + } + + /* TODO, check rewrite rules for partial aggs + implement */ + + /* + * Create new path + * +-----------+ + * | Aggregate | + * +-----------+ + * | + * +---------+ + * | Collect | + * +---------+ + * | + * +-----------+ + * | Aggregate | + * +-----------+ + */ + + const double shards = 4; /* TODO read from colocation information */ + + AggPath *newPath = makeNode(AggPath); + *newPath = *aggPath; + newPath->path.rows *= shards; /* we assume we need to read every aggregate entry from every shard */ + + /* make sure the newPath has the original worker_path hanging under it */ + newPath->subpath = collect->worker_path; + + /* + * Subtract the overhead of the original collect node from the generated agg. This + * approximates the cost of the aggregate to be run on the workers. + */ + CitusOperationCosts costOverhead = { 0 }; + AddCollectCostOverhead(&costOverhead, collect->worker_path); + newPath->path.startup_cost -= costOverhead.startup_cost; + newPath->path.total_cost -= costOverhead.total_cost; + + Path *newCollectPath = + WrapTableAccessWithDistributedUnion((Path *) newPath, + collect->colocationId, + collect->distributionAttrs, + collect->partitionValue, + collect->sampleRelid, + collect->custom_path.custom_paths); + + AggPath *newTopAggPath = makeNode(AggPath); + *newTopAggPath = *aggPath; + newTopAggPath->subpath = newCollectPath; + + Cost aggOverheadStartup = aggPath->path.total_cost - collect->custom_path.path.startup_cost; + Cost aggOverheadTotal = aggPath->path.total_cost - collect->custom_path.path.total_cost; + + double rowFraction = ((Path *) newPath)->rows / ((Path *) collect)->rows; + + newTopAggPath->path.startup_cost = newCollectPath->total_cost + aggOverheadStartup * rowFraction; + newTopAggPath->path.total_cost = newCollectPath->total_cost + aggOverheadTotal * rowFraction; + + return list_make1(newTopAggPath); + } + + return NIL; +} + + static List * RepartitionAggPath(PlannerInfo *root, Path *originalPath) {