partial group by optimization

moonshot/custom-path
Nils Dijk 2021-11-30 18:42:01 +01:00
parent ffc73634a8
commit 3e88dde672
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
1 changed files with 99 additions and 0 deletions

View File

@ -105,6 +105,7 @@ static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *output_rel, void *extra); RelOptInfo *output_rel, void *extra);
/* group agg optimizations */ /* group agg optimizations */
static List * PushDownAggPath(PlannerInfo *root, Path *originalPath); static List * PushDownAggPath(PlannerInfo *root, Path *originalPath);
static List * PartialPushDownAggPath(PlannerInfo *root, Path *originalPath);
static List * RepartitionAggPath(PlannerInfo *root, Path *originalPath); static List * RepartitionAggPath(PlannerInfo *root, Path *originalPath);
/* geo experimental*/ /* geo experimental*/
@ -147,6 +148,7 @@ static optimizeFn joinOptimizations[] = {
static optimizeFn groupOptimizations[] = { static optimizeFn groupOptimizations[] = {
PushDownAggPath, PushDownAggPath,
PartialPushDownAggPath,
RepartitionAggPath, RepartitionAggPath,
}; };
@ -2187,6 +2189,103 @@ PushDownAggPath(PlannerInfo *root, Path *originalPath)
} }
/*
 * PartialPushDownAggPath is a group aggregate optimization that turns an
 * aggregate on top of a collect into an aggregate on top of a collect on top
 * of a second aggregate, for the case where the grouping key does not contain
 * the distribution attribute.
 *
 * root is the planner state handed to the optimization, originalPath is the
 * path that is matched against the Aggregate -> Collect pattern.
 *
 * Returns a one element list with the new top level aggregate path, or NIL
 * when the path does not match, when the group clause contains the
 * distribution attribute, or when the collect has no positive row estimate to
 * scale the costs with.
 */
static List *
PartialPushDownAggPath(PlannerInfo *root, Path *originalPath)
{
	AggPath *aggPath = NULL;
	DistributedUnionPath *collect = NULL;

	/*
	 * Match path:
	 * +-----------+
	 * | Aggregate |
	 * +-----------+
	 *       |
	 *  +---------+
	 *  | Collect |
	 *  +---------+
	 *
	 * Where the grouping key is _not_ on the distribution attribute, but we could
	 * potentially still push down significant work.
	 */
	IfPathMatch(
		originalPath,
		MatchAgg(
			&aggPath,
			SkipReadThrough(
				NoCapture,
				MatchDistributedUnion(
					&collect,
					MatchAny))))
	{
		if (GroupClauseContainsDistributionAttribute(root, aggPath,
													 collect->distributionAttrs))
		{
			return NIL;
		}

		/*
		 * The row estimate of the collect is the divisor of the row fraction
		 * computed below; without a positive estimate the costs can not be
		 * scaled, so no alternative path is offered.
		 */
		const double collectRows = ((Path *) collect)->rows;
		if (collectRows <= 0)
		{
			return NIL;
		}

		/* TODO, check rewrite rules for partial aggs + implement */

		/*
		 * Create new path
		 * +-----------+
		 * | Aggregate |
		 * +-----------+
		 *       |
		 *  +---------+
		 *  | Collect |
		 *  +---------+
		 *       |
		 * +-----------+
		 * | Aggregate |
		 * +-----------+
		 */
		const double shards = 4; /* TODO read from colocation information */

		/* the lower aggregate starts out as a plain copy of the matched aggregate */
		AggPath *newPath = makeNode(AggPath);
		*newPath = *aggPath;

		/* we assume we need to read every aggregate entry from every shard */
		newPath->path.rows *= shards;

		/* make sure the newPath has the original worker_path hanging under it */
		newPath->subpath = collect->worker_path;

		/*
		 * Subtract the overhead of the original collect node from the generated agg. This
		 * approximates the cost of the aggregate to be run on the workers.
		 */
		CitusOperationCosts costOverhead = { 0 };
		AddCollectCostOverhead(&costOverhead, collect->worker_path);
		newPath->path.startup_cost -= costOverhead.startup_cost;
		newPath->path.total_cost -= costOverhead.total_cost;

		/* wrap the lower aggregate in a collect that reuses the original's properties */
		Path *newCollectPath =
			WrapTableAccessWithDistributedUnion((Path *) newPath,
												collect->colocationId,
												collect->distributionAttrs,
												collect->partitionValue,
												collect->sampleRelid,
												collect->custom_path.custom_paths);

		/* the upper aggregate is again a copy of the matched aggregate */
		AggPath *newTopAggPath = makeNode(AggPath);
		*newTopAggPath = *aggPath;
		newTopAggPath->subpath = newCollectPath;

		/*
		 * Both overheads are measured against the total cost of the original
		 * collect, because both are added on top of the total cost of the new
		 * collect below. Taking the startup overhead from the aggregate's
		 * startup cost keeps startup_cost <= total_cost for the new path as
		 * long as that holds for the matched aggregate; deriving it from the
		 * aggregate's total cost minus the collect's startup cost made the
		 * startup cost of the new path at least as high as its total cost.
		 *
		 * NOTE(review): when the matched aggregate has a startup cost below
		 * the total cost of its collect, the startup overhead is negative;
		 * confirm this is acceptable for the cost model.
		 */
		Cost aggOverheadStartup = aggPath->path.startup_cost -
								  collect->custom_path.path.total_cost;
		Cost aggOverheadTotal = aggPath->path.total_cost -
								collect->custom_path.path.total_cost;

		/* scale the overhead by the rows the upper aggregate reads compared to before */
		double rowFraction = ((Path *) newPath)->rows / collectRows;

		newTopAggPath->path.startup_cost = newCollectPath->total_cost +
										   aggOverheadStartup * rowFraction;
		newTopAggPath->path.total_cost = newCollectPath->total_cost +
										 aggOverheadTotal * rowFraction;

		return list_make1(newTopAggPath);
	}

	return NIL;
}
static List * static List *
RepartitionAggPath(PlannerInfo *root, Path *originalPath) RepartitionAggPath(PlannerInfo *root, Path *originalPath)
{ {