diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index b88bb26ce..8716a809a 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -103,8 +103,7 @@ static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue); static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); -static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); -static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath); +static List * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); static GeoScanPath * makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double rows); static bool IsGeoScanPath(CustomPath *path); @@ -113,7 +112,7 @@ static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel, Alias *alias, bool inh, bool inFromCl); - +static bool VarInList(List *varList, Var *var); /* * TODO some optimizations are useless if others are already provided. This might cause @@ -1114,6 +1113,46 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath) } +static bool +DistritbutionAttributesInEquvalenceMember(List *attrs, EquivalenceMember *em) +{ + if (!IsA(em->em_expr, Var)) + { + return false; + } + return VarInList(attrs, castNode(Var, em->em_expr)); +} + + +static bool +CanRepartition(DistributedUnionPath *source, DistributedUnionPath *target, + JoinPath *joinPath) +{ + RestrictInfo *rinfo = NULL; + foreach_ptr(rinfo,joinPath->joinrestrictinfo) + { + if (DistritbutionAttributesInEquvalenceMember(target->distributionAttrs, rinfo->left_em)) + { + /* its on the left, todo figure out if the source relation is on the right */ + return true; + } + else if (DistritbutionAttributesInEquvalenceMember(target->distributionAttrs, rinfo->right_em)) + { + /* its on the right, todo figure out if the source relation is on the left */ + return true; + } + else + { + /* this rinfo is not with an equality filter on the distribution attributes of the target relation */ + continue; + } + } + + /* no equivalence checks found on the target relation */ + return false; +} + + static List * OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) { @@ -1197,85 +1236,109 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) * +-------------------+ * | inner.worker_path | * +-------------------+ - * - * And with the inner and outer paths swapped + */ + List *newPaths = NIL; + + if (CanRepartition(innerDU, outerDU, joinPath)) + { + /* create new Join node */ + JoinPath *newJoinPath = makeNode(NestPath); + *newJoinPath = *joinPath; + newJoinPath->path.type = T_NestPath; /* reset type after copied join data */ + + /* populate outer path*/ + newJoinPath->outerjoinpath = outerDU->worker_path; + + /* TODO understand how to describe on which attribute the Repartition needs to happen */ + newJoinPath->innerjoinpath = CreateRepartitionNode(outerDU->colocationId, + innerDU->worker_path); + + /* TODO find a good way to calculate join costs based on its inner/outer paths */ + /* subtract the double collect cost */ + CitusOperationCosts overhead = EmptyCitusOperationCosts; + AddCollectCostOverhead(&overhead, outerDU->worker_path); + AddCollectCostOverhead(&overhead, innerDU->worker_path); + newJoinPath->path.startup_cost -= overhead.startup_cost; + newJoinPath->path.total_cost -= overhead.total_cost; + + /* add the costs for the repartition */ + overhead = EmptyCitusOperationCosts; + AddRepartitionCostOverhead(&overhead, innerDU->worker_path); + newJoinPath->path.startup_cost += overhead.startup_cost; + newJoinPath->path.total_cost += overhead.total_cost; + + /* create Collect on top of new join, base Collect on matched outer Collect */ + const DistributedUnionPath *baseDistUnion = outerDU; + Path *newPath = WrapTableAccessWithDistributedUnion( + (Path *) newJoinPath, + baseDistUnion->colocationId, + baseDistUnion->distributionAttrs, + baseDistUnion->partitionValue, + baseDistUnion->sampleRelid, + baseDistUnion->custom_path.custom_paths); + + newPaths = lappend(newPaths, newPath); + } + + + /* now with the inner and outer swapped + * +------------------------------------+ + * | Collect | + * | - ColocationID: outer.ColocationID | + * +------------------------------------+ + * | + * +---------+ + * | Join | + * +---------+ + * / \ + * +------------------------------------+ +-------------------+ + * | Repartition | | inner.worker_path | + * | - ColocationID: inner.colocationID | +-------------------+ + * +------------------------------------+ + * | + * +-------------------+ + * | outer.worker_path | + * +-------------------+ */ - /* create new Join node */ - JoinPath *newJoinPath = makeNode(NestPath); - *newJoinPath = *joinPath; - newJoinPath->path.type = T_NestPath; /* reset type after copied join data */ + if (CanRepartition(outerDU, innerDU, joinPath)) + { + /* create new Join node */ + JoinPath *newJoinPath = makeNode(NestPath); + *newJoinPath = *joinPath; + newJoinPath->path.type = T_NestPath; /* reset type after copied join data */ - /* populate outer path*/ - newJoinPath->outerjoinpath = outerDU->worker_path; + /* TODO understand how to describe on which attribute the Repartition needs to happen */ + newJoinPath->outerjoinpath = CreateRepartitionNode(innerDU->colocationId, + outerDU->worker_path); - /* TODO understand how to describe on which attribute the Repartition needs to happen */ - newJoinPath->innerjoinpath = CreateRepartitionNode(outerDU->colocationId, - innerDU->worker_path); + newJoinPath->innerjoinpath = innerDU->worker_path; - /* TODO find a good way to calculate join costs based on its inner/outer paths */ - /* subtract the double collect cost */ - CitusOperationCosts overhead = EmptyCitusOperationCosts; - AddCollectCostOverhead(&overhead, outerDU->worker_path); - AddCollectCostOverhead(&overhead, innerDU->worker_path); - newJoinPath->path.startup_cost -= overhead.startup_cost; - newJoinPath->path.total_cost -= overhead.total_cost; + /* TODO find a good way to calculate join costs based on its inner/outer paths */ + /* subtract the double collect cost */ + CitusOperationCosts overhead = EmptyCitusOperationCosts; + AddCollectCostOverhead(&overhead, outerDU->worker_path); + AddCollectCostOverhead(&overhead, innerDU->worker_path); + newJoinPath->path.startup_cost -= overhead.startup_cost; + newJoinPath->path.total_cost -= overhead.total_cost; - /* add the costs for the repartition */ - overhead = EmptyCitusOperationCosts; - AddRepartitionCostOverhead(&overhead, innerDU->worker_path); - newJoinPath->path.startup_cost += overhead.startup_cost; - newJoinPath->path.total_cost += overhead.total_cost; + /* add the costs for the repartition */ + overhead = EmptyCitusOperationCosts; + AddRepartitionCostOverhead(&overhead, outerDU->worker_path); + newJoinPath->path.startup_cost += overhead.startup_cost; + newJoinPath->path.total_cost += overhead.total_cost; - /* create Collect on top of new join, base Collect on matched outer Collect */ - DistributedUnionPath *baseDistUnion = outerDU; - Path *newPath = WrapTableAccessWithDistributedUnion( - (Path *) newJoinPath, - baseDistUnion->colocationId, - baseDistUnion->distributionAttrs, - baseDistUnion->partitionValue, - baseDistUnion->sampleRelid, - baseDistUnion->custom_path.custom_paths); - - List *newPaths = NIL; - newPaths = lappend(newPaths, newPath); - - /* now with the inner and outer swapped */ - /* create new Join node */ - newJoinPath = makeNode(NestPath); - *newJoinPath = *joinPath; - newJoinPath->path.type = T_NestPath; /* reset type after copied join data */ - - /* TODO understand how to describe on which attribute the Repartition needs to happen */ - newJoinPath->outerjoinpath = CreateRepartitionNode(innerDU->colocationId, - outerDU->worker_path); - - newJoinPath->innerjoinpath = innerDU->worker_path; - - /* TODO find a good way to calculate join costs based on its inner/outer paths */ - /* subtract the double collect cost */ - overhead = EmptyCitusOperationCosts; - AddCollectCostOverhead(&overhead, outerDU->worker_path); - AddCollectCostOverhead(&overhead, innerDU->worker_path); - newJoinPath->path.startup_cost -= overhead.startup_cost; - newJoinPath->path.total_cost -= overhead.total_cost; - - /* add the costs for the repartition */ - overhead = EmptyCitusOperationCosts; - AddRepartitionCostOverhead(&overhead, outerDU->worker_path); - newJoinPath->path.startup_cost += overhead.startup_cost; - newJoinPath->path.total_cost += overhead.total_cost; - - /* create Collect on top of new join, base Collect on matched outer Collect */ - baseDistUnion = innerDU; - newPath = WrapTableAccessWithDistributedUnion( - (Path *) newJoinPath, - baseDistUnion->colocationId, - baseDistUnion->distributionAttrs, - baseDistUnion->partitionValue, - baseDistUnion->sampleRelid, - baseDistUnion->custom_path.custom_paths); - newPaths = lappend(newPaths, newPath); + /* create Collect on top of new join, base Collect on matched outer Collect */ + const DistributedUnionPath *baseDistUnion = innerDU; + Path *newPath = WrapTableAccessWithDistributedUnion( + (Path *) newJoinPath, + baseDistUnion->colocationId, + baseDistUnion->distributionAttrs, + baseDistUnion->partitionValue, + baseDistUnion->sampleRelid, + baseDistUnion->custom_path.custom_paths); + newPaths = lappend(newPaths, newPath); + } return newPaths; } @@ -1851,59 +1914,27 @@ PathBasedPlannerGroupAgg(PlannerInfo *root, * that is partitioned by the grouping key. If that is the case we can pull the * distributed union above the aggregate which causes it to optimize the plan. * - * TODO we just replace the plans for now, but during development we have encountered * a plan that would be better if the grouping would not be pushed down. When the * grouping is solely on a primary key the number of rows will stay the same, while * the width will increase due to any aggregates that could be performed on the data. * This plan has lower network traffic if the grouping would not be pushed down. - * Instead of replacing it would benefit the planner to add a new path according to + * Instead of replacing it would benefit the planner to add a new newPath according to * the potential optimization of pushing down. If * would be * taken into account in the cost of the plan this would cause magic to happen which * we currently could not support. */ - ListCell *pathCell = NULL; - foreach(pathCell, output_rel->pathlist) + List *newPaths = NIL; + Path *originalPath = NULL; + foreach_ptr(originalPath, output_rel->pathlist) { - Path *originalPath = lfirst(pathCell); - Path *optimizedGroupAdd = OptimizeGroupAgg(root, originalPath); - SetListCellPtr(pathCell, optimizedGroupAdd); + newPaths = list_concat(newPaths, OptimizeGroupAgg(root, originalPath)); } -} - -static Path * -OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) -{ - switch (originalPath->pathtype) + Path *newPath = NULL; + foreach_ptr(newPath, newPaths) { - case T_Agg: - { - AggPath *apath = castNode(AggPath, originalPath); - if (CanOptimizeAggPath(root, apath)) - { - DistributedUnionPath *distUnion = (DistributedUnionPath *) apath->subpath; - apath->subpath = distUnion->worker_path; - - /* TODO better cost model, for now substract the DU costs */ - apath->path.startup_cost -= 1000; - apath->path.total_cost -= 1000; - - return WrapTableAccessWithDistributedUnion( - (Path *) apath, - distUnion->colocationId, - distUnion->distributionAttrs, - distUnion->partitionValue, - distUnion->sampleRelid, - distUnion->custom_path.custom_paths); - } - } - - default: - { - /* no optimisations to be performed*/ - return originalPath; - } + add_path(output_rel, newPath); } } @@ -1924,32 +1955,17 @@ VarInList(List *varList, Var *var) static bool -CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) +GroupClauseContainsDistributionAttribute(PlannerInfo *root, AggPath *aggPath, + List *distributionAttributes) { - if (apath->groupClause == NULL) - { - return false; - } - - if (!IsDistributedUnion(apath->subpath, false, NULL)) - { - /* - * we only can optimize if the path below is a distributed union that we can pull - * up, if the path below is not a distributed union we cannot optimize - */ - return false; - } - DistributedUnionPath *distUnion = (DistributedUnionPath *) apath->subpath; - - SortGroupClause *sgc = NULL; - /* * TODO verify whats the purpose of the list, if we find any of the distribution * columns somewhere in this we optimize, might be wrong */ - foreach_ptr(sgc, apath->groupClause) + SortGroupClause *sgc = NULL; + foreach_ptr(sgc, aggPath->groupClause) { - PathTarget *target = apath->path.pathtarget; + PathTarget *target = aggPath->path.pathtarget; Expr *targetExpr = NULL; Index i = 0; foreach_ptr(targetExpr, target->exprs) @@ -1968,7 +1984,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) } Var *targetVar = castNode(Var, targetExpr); - if (VarInList(distUnion->distributionAttrs, targetVar)) + if (VarInList(distributionAttributes, targetVar)) { return true; } @@ -1977,3 +1993,87 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) return false; } + + +static List * +OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) +{ + AggPath *aggPath = NULL; + DistributedUnionPath *collect = NULL; + + /* + * Match path: + * +-----------+ + * | Aggergate | + * +-----------+ + * | + * +---------+ + * | Collect | + * +---------+ + * + * Where the Aggregate is grouped on any of the distribution attributes of the collect + * node. + */ + IfPathMatch( + originalPath, + MatchAgg( + &aggPath, + SkipReadThrough( + NoCapture, + MatchDistributedUnion( + &collect, + MatchAny)))) + { + if (!GroupClauseContainsDistributionAttribute(root, aggPath, + collect->distributionAttrs)) + { + return NIL; + } + + /* + * Create new path + * +---------+ + * | Collect | + * +---------+ + * | + * +-----------+ + * | Aggregate | + * +-----------+ + * + * Since the aggregate matched on the distribution attribute it is guaranteed that + * all members of a grouping are in a single ShardGroup under the collect, hence + * we can simply push the Aggregate to the workers. + */ + + AggPath *newPath = makeNode(AggPath); + *newPath = *aggPath; + + /* make sure the newPath has the original worker_path hanging under it */ + newPath->subpath = collect->worker_path; + + /* + * Subtract the overhead of the original collect node from the generated agg. This + * approximates the cost of the aggregate to be run on the workers. + */ + CitusOperationCosts costOverhead = { 0 }; + AddCollectCostOverhead(&costOverhead, collect->worker_path); + newPath->path.startup_cost -= costOverhead.startup_cost; + newPath->path.total_cost -= costOverhead.total_cost; + + /* + * TODO should we devide the actual cost by some factor as to assume aggregates + * are cheaper to push down? + */ + + return list_make1( + WrapTableAccessWithDistributedUnion((Path *) newPath, + collect->colocationId, + collect->distributionAttrs, + collect->partitionValue, + collect->sampleRelid, + collect->custom_path.custom_paths)); + } + + return NIL; +} + diff --git a/src/include/distributed/planner/pattern_match.h b/src/include/distributed/planner/pattern_match.h index f610a8437..f857b939d 100644 --- a/src/include/distributed/planner/pattern_match.h +++ b/src/include/distributed/planner/pattern_match.h @@ -94,6 +94,39 @@ DoCapture(capture, Path *, pathToMatch); \ } +#define MatchAgg(capture, matcher) \ +{ \ + bool m = false; \ + switch (pathToMatch->type) \ + { \ + case T_AggPath: \ + { \ + m = true; \ + break; \ + } \ +\ + default: \ + { \ + m = false; \ + break; \ + } \ + } \ +\ + if (!m) \ + { \ + MatchFailed; \ + } \ +\ + PushStack(pathToMatch); \ +\ + pathToMatch = castNode(AggPath, pathToMatch)->subpath; \ + matcher; \ +\ + PopStack(pathToMatch); \ +\ + DoCapture(capture, AggPath *, pathToMatch); \ +} + #define MatchJoin(capture, joinType, conditionMatcher, innerMatcher, outerMatcher) \ { \ { \