diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index 8716a809a..40d51dbbe 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -103,7 +103,11 @@ static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue); static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); -static List * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); +/* group agg optimizations */ +static List * PushDownAggPath(PlannerInfo *root, Path *originalPath); +static List * RepartitionAggPath(PlannerInfo *root, Path *originalPath); + +/* geo experimantal*/ static GeoScanPath * makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double rows); static bool IsGeoScanPath(CustomPath *path); @@ -112,6 +116,7 @@ static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel, Alias *alias, bool inh, bool inFromCl); + static bool VarInList(List *varList, Var *var); /* @@ -168,13 +173,20 @@ AddCollectCostOverhead(CitusOperationCosts *overhead, Path *originalPath) const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0); const double shards = 4; /* TODO get these from planner colocation information */ + /* + * Collects return their first tuple after: + * 1. the original path has been completely executed + * 2. all tuples have been transferred over the network + * 3. (potentially add costs for local tuple store) + */ - - overhead->startup_cost += CollectStartupCost; - overhead->total_cost += - CollectStartupCost + Cost startupToTotal = originalPath->total_cost - originalPath->startup_cost; + Cost networkTransferCost = CollectStartupCost + (CollectPerRowCost * originalPath->rows) + (CollectPerMBCost * originalPath->rows * mbPerRow); + + overhead->startup_cost += startupToTotal + networkTransferCost; + overhead->total_cost += networkTransferCost; } @@ -184,11 +196,21 @@ AddRepartitionCostOverhead(CitusOperationCosts *overhead, Path *originalPath) const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0); const double shards = 4; /* TODO get these from planner colocation information */ - /* TODO scaling crosss transfer simply by number of shards is probably not correct */ - overhead->startup_cost += RepartitionStartupCost; - overhead->total_cost += RepartitionStartupCost + /* + * Repartitions return their first tuple after: + * 1. the original path has been completely executed + * 2. all tuples have been transferred over the network (bisection) + * 3. (potentially add costs for local tuple store) + */ + + Cost startupToTotal = originalPath->total_cost - originalPath->startup_cost; + Cost networkTransferCost = RepartitionStartupCost + (RepartitionPerRowCost * originalPath->rows / shards) + (RepartitionPerMBCost * originalPath->rows / shards * mbPerRow); + + /* TODO scaling crosss transfer simply by number of shards is probably not correct */ + overhead->startup_cost += startupToTotal + networkTransferCost; + overhead->total_cost += networkTransferCost; }; @@ -1152,6 +1174,37 @@ CanRepartition(DistributedUnionPath *source, DistributedUnionPath *target, return false; } +static List * +MergeEquivalentExpressions(List *attrs, List *restrictInfos) +{ + RestrictInfo *rinfo = NULL; + List *rattrs = NIL; + foreach_ptr(rinfo, restrictInfos) + { + if (DistritbutionAttributesInEquvalenceMember(attrs, rinfo->left_em)) + { + if (IsA(rinfo->right_em->em_expr, Var)) + { + rattrs = lappend(rattrs, castNode(Var, rinfo->right_em->em_expr)); + } + } + else if (DistritbutionAttributesInEquvalenceMember(attrs, rinfo->right_em)) + { + if (IsA(rinfo->left_em, Var)) + { + rattrs = lappend(rattrs, castNode(Var, rinfo->left_em->em_expr)); + } + } + else + { + /* this rinfo is not with an equality filter on the distribution attributes of the target relation */ + continue; + } + } + + return list_concat(rattrs, attrs); +} + static List * OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) @@ -1269,10 +1322,15 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) /* create Collect on top of new join, base Collect on matched outer Collect */ const DistributedUnionPath *baseDistUnion = outerDU; + + /* Add dist attrs based on equivalent members of the join restrict info */ + List *distAttrs = MergeEquivalentExpressions(baseDistUnion->distributionAttrs, + joinPath->joinrestrictinfo); + Path *newPath = WrapTableAccessWithDistributedUnion( (Path *) newJoinPath, baseDistUnion->colocationId, - baseDistUnion->distributionAttrs, + distAttrs, baseDistUnion->partitionValue, baseDistUnion->sampleRelid, baseDistUnion->custom_path.custom_paths); @@ -1330,10 +1388,15 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) /* create Collect on top of new join, base Collect on matched outer Collect */ const DistributedUnionPath *baseDistUnion = innerDU; + + /* Add dist attrs based on equivalent members of the join restrict info */ + List *distAttrs = MergeEquivalentExpressions(baseDistUnion->distributionAttrs, + joinPath->joinrestrictinfo); + Path *newPath = WrapTableAccessWithDistributedUnion( (Path *) newJoinPath, baseDistUnion->colocationId, - baseDistUnion->distributionAttrs, + distAttrs, baseDistUnion->partitionValue, baseDistUnion->sampleRelid, baseDistUnion->custom_path.custom_paths); @@ -1928,7 +1991,8 @@ PathBasedPlannerGroupAgg(PlannerInfo *root, Path *originalPath = NULL; foreach_ptr(originalPath, output_rel->pathlist) { - newPaths = list_concat(newPaths, OptimizeGroupAgg(root, originalPath)); + newPaths = list_concat(newPaths, PushDownAggPath(root, originalPath)); + newPaths = list_concat(newPaths, RepartitionAggPath(root, originalPath)); } Path *newPath = NULL; @@ -1995,8 +2059,44 @@ GroupClauseContainsDistributionAttribute(PlannerInfo *root, AggPath *aggPath, } +static Var * +FirstGroupClauseVarExpr(PlannerInfo *root, AggPath *aggPath) +{ + /* + * TODO verify whats the purpose of the list, if we find any of the distribution + * columns somewhere in this we optimize, might be wrong + */ + SortGroupClause *sgc = NULL; + foreach_ptr(sgc, aggPath->groupClause) + { + PathTarget *target = aggPath->path.pathtarget; + Expr *targetExpr = NULL; + Index i = 0; + foreach_ptr(targetExpr, target->exprs) + { + Index targetSortGroupRef = target->sortgrouprefs[i]; + i++; + + if (targetSortGroupRef != sgc->tleSortGroupRef) + { + continue; + } + + if (!IsA(targetExpr, Var)) + { + continue; + } + + Var *targetVar = castNode(Var, targetExpr); + return targetVar; + } + } + + return NULL; +} + static List * -OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) +PushDownAggPath(PlannerInfo *root, Path *originalPath) { AggPath *aggPath = NULL; DistributedUnionPath *collect = NULL; @@ -2077,3 +2177,97 @@ OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) return NIL; } + +static List * +RepartitionAggPath(PlannerInfo *root, Path *originalPath) +{ + AggPath *aggPath = NULL; + DistributedUnionPath *collect = NULL; + + /* + * Match path: + * +-----------+ + * | Aggergate | + * +-----------+ + * | + * +---------+ + * | Collect | + * +---------+ + * + * Where the Aggregate is NOT grouped on any of the distribution attributes of the + * collect node. + */ + IfPathMatch( + originalPath, + MatchAgg( + &aggPath, + SkipReadThrough( + NoCapture, + MatchDistributedUnion( + &collect, + MatchAny)))) + { + if (GroupClauseContainsDistributionAttribute(root, aggPath, + collect->distributionAttrs)) + { + return NIL; + } + + Var *targetDistAttr = FirstGroupClauseVarExpr(root, aggPath); + if (targetDistAttr == NULL) + { + return false; + } + + /* + * Create new path + * +---------+ + * | Collect | + * +---------+ + * | + * +-----------+ + * | Aggregate | + * +-----------+ + * | + * +-------------+ + * | Repartition | + * +-------------+ + */ + + AggPath *newPath = makeNode(AggPath); + *newPath = *aggPath; + + /* repartition to a virtual colocation group */ + newPath->subpath = CreateRepartitionNode(-1, collect->worker_path); + + /* + * Subtract the overhead of the original collect node from the generated agg. This + * approximates the cost of the aggregate to be run on the workers. + */ + CitusOperationCosts costOverhead = { 0 }; + AddCollectCostOverhead(&costOverhead, collect->worker_path); + newPath->path.startup_cost -= costOverhead.startup_cost; + newPath->path.total_cost -= costOverhead.total_cost; + + CitusOperationCosts repartitionOverhead = { 0 }; + AddRepartitionCostOverhead(&repartitionOverhead, collect->worker_path); + newPath->path.startup_cost += repartitionOverhead.startup_cost; + newPath->path.total_cost += repartitionOverhead.total_cost; + + /* + * TODO should we devide the actual cost by some factor as to assume aggregates + * are cheaper to push down? + */ + + return list_make1( + WrapTableAccessWithDistributedUnion((Path *) newPath, + -1, + list_make1(targetDistAttr), + NULL, + -1, + collect->custom_path.custom_paths)); + } + + return NIL; +} +