mirror of https://github.com/citusdata/citus.git
refactor agg pushdown and prevent illegal repartition paths to be generated
parent
a55331d934
commit
1eb418de89
|
@ -103,8 +103,7 @@ static List * ShardIntervalListForRelationPartitionValue(Oid relationId,
|
|||
Expr *partitionValue);
|
||||
static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel,
|
||||
RelOptInfo *output_rel, void *extra);
|
||||
static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath);
|
||||
static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath);
|
||||
static List * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath);
|
||||
static GeoScanPath * makeGeoScanPath(Relation rel, RelOptInfo *parent,
|
||||
PathTarget *pathtarget, double rows);
|
||||
static bool IsGeoScanPath(CustomPath *path);
|
||||
|
@ -113,7 +112,7 @@ static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel,
|
|||
Alias *alias,
|
||||
bool inh,
|
||||
bool inFromCl);
|
||||
|
||||
static bool VarInList(List *varList, Var *var);
|
||||
|
||||
/*
|
||||
* TODO some optimizations are useless if others are already provided. This might cause
|
||||
|
@ -1114,6 +1113,46 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
|
|||
}
|
||||
|
||||
|
||||
static bool
|
||||
DistritbutionAttributesInEquvalenceMember(List *attrs, EquivalenceMember *em)
|
||||
{
|
||||
if (!IsA(em->em_expr, Var))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return VarInList(attrs, castNode(Var, em->em_expr));
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
CanRepartition(DistributedUnionPath *source, DistributedUnionPath *target,
|
||||
JoinPath *joinPath)
|
||||
{
|
||||
RestrictInfo *rinfo = NULL;
|
||||
foreach_ptr(rinfo,joinPath->joinrestrictinfo)
|
||||
{
|
||||
if (DistritbutionAttributesInEquvalenceMember(target->distributionAttrs, rinfo->left_em))
|
||||
{
|
||||
/* its on the left, todo figure out if the source relation is on the right */
|
||||
return true;
|
||||
}
|
||||
else if (DistritbutionAttributesInEquvalenceMember(target->distributionAttrs, rinfo->right_em))
|
||||
{
|
||||
/* its on the right, todo figure out if the source relation is on the left */
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* this rinfo is not with an equality filter on the distribution attributes of the target relation */
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/* no equivalence checks found on the target relation */
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
static List *
|
||||
OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
|
||||
{
|
||||
|
@ -1197,10 +1236,11 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
|
|||
* +-------------------+
|
||||
* | inner.worker_path |
|
||||
* +-------------------+
|
||||
*
|
||||
* And with the inner and outer paths swapped
|
||||
*/
|
||||
List *newPaths = NIL;
|
||||
|
||||
if (CanRepartition(innerDU, outerDU, joinPath))
|
||||
{
|
||||
/* create new Join node */
|
||||
JoinPath *newJoinPath = makeNode(NestPath);
|
||||
*newJoinPath = *joinPath;
|
||||
|
@ -1228,7 +1268,7 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
|
|||
newJoinPath->path.total_cost += overhead.total_cost;
|
||||
|
||||
/* create Collect on top of new join, base Collect on matched outer Collect */
|
||||
DistributedUnionPath *baseDistUnion = outerDU;
|
||||
const DistributedUnionPath *baseDistUnion = outerDU;
|
||||
Path *newPath = WrapTableAccessWithDistributedUnion(
|
||||
(Path *) newJoinPath,
|
||||
baseDistUnion->colocationId,
|
||||
|
@ -1237,12 +1277,34 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
|
|||
baseDistUnion->sampleRelid,
|
||||
baseDistUnion->custom_path.custom_paths);
|
||||
|
||||
List *newPaths = NIL;
|
||||
newPaths = lappend(newPaths, newPath);
|
||||
}
|
||||
|
||||
/* now with the inner and outer swapped */
|
||||
|
||||
/* now with the inner and outer swapped
|
||||
* +------------------------------------+
|
||||
* | Collect |
|
||||
* | - ColocationID: outer.ColocationID |
|
||||
* +------------------------------------+
|
||||
* |
|
||||
* +---------+
|
||||
* | Join |
|
||||
* +---------+
|
||||
* / \
|
||||
* +------------------------------------+ +-------------------+
|
||||
* | Repartition | | inner.worker_path |
|
||||
* | - ColocationID: inner.colocationID | +-------------------+
|
||||
* +------------------------------------+
|
||||
* |
|
||||
* +-------------------+
|
||||
* | outer.worker_path |
|
||||
* +-------------------+
|
||||
*/
|
||||
|
||||
if (CanRepartition(outerDU, innerDU, joinPath))
|
||||
{
|
||||
/* create new Join node */
|
||||
newJoinPath = makeNode(NestPath);
|
||||
JoinPath *newJoinPath = makeNode(NestPath);
|
||||
*newJoinPath = *joinPath;
|
||||
newJoinPath->path.type = T_NestPath; /* reset type after copied join data */
|
||||
|
||||
|
@ -1254,7 +1316,7 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
|
|||
|
||||
/* TODO find a good way to calculate join costs based on its inner/outer paths */
|
||||
/* subtract the double collect cost */
|
||||
overhead = EmptyCitusOperationCosts;
|
||||
CitusOperationCosts overhead = EmptyCitusOperationCosts;
|
||||
AddCollectCostOverhead(&overhead, outerDU->worker_path);
|
||||
AddCollectCostOverhead(&overhead, innerDU->worker_path);
|
||||
newJoinPath->path.startup_cost -= overhead.startup_cost;
|
||||
|
@ -1267,8 +1329,8 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
|
|||
newJoinPath->path.total_cost += overhead.total_cost;
|
||||
|
||||
/* create Collect on top of new join, base Collect on matched outer Collect */
|
||||
baseDistUnion = innerDU;
|
||||
newPath = WrapTableAccessWithDistributedUnion(
|
||||
const DistributedUnionPath *baseDistUnion = innerDU;
|
||||
Path *newPath = WrapTableAccessWithDistributedUnion(
|
||||
(Path *) newJoinPath,
|
||||
baseDistUnion->colocationId,
|
||||
baseDistUnion->distributionAttrs,
|
||||
|
@ -1276,6 +1338,7 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
|
|||
baseDistUnion->sampleRelid,
|
||||
baseDistUnion->custom_path.custom_paths);
|
||||
newPaths = lappend(newPaths, newPath);
|
||||
}
|
||||
|
||||
return newPaths;
|
||||
}
|
||||
|
@ -1851,59 +1914,27 @@ PathBasedPlannerGroupAgg(PlannerInfo *root,
|
|||
* that is partitioned by the grouping key. If that is the case we can pull the
|
||||
* distributed union above the aggregate which causes it to optimize the plan.
|
||||
*
|
||||
* TODO we just replace the plans for now, but during development we have encountered
|
||||
* a plan that would be better if the grouping would not be pushed down. When the
|
||||
* grouping is solely on a primary key the number of rows will stay the same, while
|
||||
* the width will increase due to any aggregates that could be performed on the data.
|
||||
* This plan has lower network traffic if the grouping would not be pushed down.
|
||||
* Instead of replacing it would benefit the planner to add a new path according to
|
||||
* Instead of replacing it would benefit the planner to add a new newPath according to
|
||||
* the potential optimization of pushing down. If <no. rows> * <row width> would be
|
||||
* taken into account in the cost of the plan this would cause magic to happen which
|
||||
* we currently could not support.
|
||||
*/
|
||||
|
||||
ListCell *pathCell = NULL;
|
||||
foreach(pathCell, output_rel->pathlist)
|
||||
List *newPaths = NIL;
|
||||
Path *originalPath = NULL;
|
||||
foreach_ptr(originalPath, output_rel->pathlist)
|
||||
{
|
||||
Path *originalPath = lfirst(pathCell);
|
||||
Path *optimizedGroupAdd = OptimizeGroupAgg(root, originalPath);
|
||||
SetListCellPtr(pathCell, optimizedGroupAdd);
|
||||
}
|
||||
newPaths = list_concat(newPaths, OptimizeGroupAgg(root, originalPath));
|
||||
}
|
||||
|
||||
|
||||
static Path *
|
||||
OptimizeGroupAgg(PlannerInfo *root, Path *originalPath)
|
||||
Path *newPath = NULL;
|
||||
foreach_ptr(newPath, newPaths)
|
||||
{
|
||||
switch (originalPath->pathtype)
|
||||
{
|
||||
case T_Agg:
|
||||
{
|
||||
AggPath *apath = castNode(AggPath, originalPath);
|
||||
if (CanOptimizeAggPath(root, apath))
|
||||
{
|
||||
DistributedUnionPath *distUnion = (DistributedUnionPath *) apath->subpath;
|
||||
apath->subpath = distUnion->worker_path;
|
||||
|
||||
/* TODO better cost model, for now substract the DU costs */
|
||||
apath->path.startup_cost -= 1000;
|
||||
apath->path.total_cost -= 1000;
|
||||
|
||||
return WrapTableAccessWithDistributedUnion(
|
||||
(Path *) apath,
|
||||
distUnion->colocationId,
|
||||
distUnion->distributionAttrs,
|
||||
distUnion->partitionValue,
|
||||
distUnion->sampleRelid,
|
||||
distUnion->custom_path.custom_paths);
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
/* no optimisations to be performed*/
|
||||
return originalPath;
|
||||
}
|
||||
add_path(output_rel, newPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1924,32 +1955,17 @@ VarInList(List *varList, Var *var)
|
|||
|
||||
|
||||
static bool
|
||||
CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
|
||||
GroupClauseContainsDistributionAttribute(PlannerInfo *root, AggPath *aggPath,
|
||||
List *distributionAttributes)
|
||||
{
|
||||
if (apath->groupClause == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!IsDistributedUnion(apath->subpath, false, NULL))
|
||||
{
|
||||
/*
|
||||
* we only can optimize if the path below is a distributed union that we can pull
|
||||
* up, if the path below is not a distributed union we cannot optimize
|
||||
*/
|
||||
return false;
|
||||
}
|
||||
DistributedUnionPath *distUnion = (DistributedUnionPath *) apath->subpath;
|
||||
|
||||
SortGroupClause *sgc = NULL;
|
||||
|
||||
/*
|
||||
* TODO verify whats the purpose of the list, if we find any of the distribution
|
||||
* columns somewhere in this we optimize, might be wrong
|
||||
*/
|
||||
foreach_ptr(sgc, apath->groupClause)
|
||||
SortGroupClause *sgc = NULL;
|
||||
foreach_ptr(sgc, aggPath->groupClause)
|
||||
{
|
||||
PathTarget *target = apath->path.pathtarget;
|
||||
PathTarget *target = aggPath->path.pathtarget;
|
||||
Expr *targetExpr = NULL;
|
||||
Index i = 0;
|
||||
foreach_ptr(targetExpr, target->exprs)
|
||||
|
@ -1968,7 +1984,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
|
|||
}
|
||||
|
||||
Var *targetVar = castNode(Var, targetExpr);
|
||||
if (VarInList(distUnion->distributionAttrs, targetVar))
|
||||
if (VarInList(distributionAttributes, targetVar))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -1977,3 +1993,87 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
|
|||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
static List *
|
||||
OptimizeGroupAgg(PlannerInfo *root, Path *originalPath)
|
||||
{
|
||||
AggPath *aggPath = NULL;
|
||||
DistributedUnionPath *collect = NULL;
|
||||
|
||||
/*
|
||||
* Match path:
|
||||
* +-----------+
|
||||
* | Aggergate |
|
||||
* +-----------+
|
||||
* |
|
||||
* +---------+
|
||||
* | Collect |
|
||||
* +---------+
|
||||
*
|
||||
* Where the Aggregate is grouped on any of the distribution attributes of the collect
|
||||
* node.
|
||||
*/
|
||||
IfPathMatch(
|
||||
originalPath,
|
||||
MatchAgg(
|
||||
&aggPath,
|
||||
SkipReadThrough(
|
||||
NoCapture,
|
||||
MatchDistributedUnion(
|
||||
&collect,
|
||||
MatchAny))))
|
||||
{
|
||||
if (!GroupClauseContainsDistributionAttribute(root, aggPath,
|
||||
collect->distributionAttrs))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create new path
|
||||
* +---------+
|
||||
* | Collect |
|
||||
* +---------+
|
||||
* |
|
||||
* +-----------+
|
||||
* | Aggregate |
|
||||
* +-----------+
|
||||
*
|
||||
* Since the aggregate matched on the distribution attribute it is guaranteed that
|
||||
* all members of a grouping are in a single ShardGroup under the collect, hence
|
||||
* we can simply push the Aggregate to the workers.
|
||||
*/
|
||||
|
||||
AggPath *newPath = makeNode(AggPath);
|
||||
*newPath = *aggPath;
|
||||
|
||||
/* make sure the newPath has the original worker_path hanging under it */
|
||||
newPath->subpath = collect->worker_path;
|
||||
|
||||
/*
|
||||
* Subtract the overhead of the original collect node from the generated agg. This
|
||||
* approximates the cost of the aggregate to be run on the workers.
|
||||
*/
|
||||
CitusOperationCosts costOverhead = { 0 };
|
||||
AddCollectCostOverhead(&costOverhead, collect->worker_path);
|
||||
newPath->path.startup_cost -= costOverhead.startup_cost;
|
||||
newPath->path.total_cost -= costOverhead.total_cost;
|
||||
|
||||
/*
|
||||
* TODO should we devide the actual cost by some factor as to assume aggregates
|
||||
* are cheaper to push down?
|
||||
*/
|
||||
|
||||
return list_make1(
|
||||
WrapTableAccessWithDistributedUnion((Path *) newPath,
|
||||
collect->colocationId,
|
||||
collect->distributionAttrs,
|
||||
collect->partitionValue,
|
||||
collect->sampleRelid,
|
||||
collect->custom_path.custom_paths));
|
||||
}
|
||||
|
||||
return NIL;
|
||||
}
|
||||
|
||||
|
|
|
@ -94,6 +94,39 @@
|
|||
DoCapture(capture, Path *, pathToMatch); \
|
||||
}
|
||||
|
||||
#define MatchAgg(capture, matcher) \
|
||||
{ \
|
||||
bool m = false; \
|
||||
switch (pathToMatch->type) \
|
||||
{ \
|
||||
case T_AggPath: \
|
||||
{ \
|
||||
m = true; \
|
||||
break; \
|
||||
} \
|
||||
\
|
||||
default: \
|
||||
{ \
|
||||
m = false; \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
\
|
||||
if (!m) \
|
||||
{ \
|
||||
MatchFailed; \
|
||||
} \
|
||||
\
|
||||
PushStack(pathToMatch); \
|
||||
\
|
||||
pathToMatch = castNode(AggPath, pathToMatch)->subpath; \
|
||||
matcher; \
|
||||
\
|
||||
PopStack(pathToMatch); \
|
||||
\
|
||||
DoCapture(capture, AggPath *, pathToMatch); \
|
||||
}
|
||||
|
||||
#define MatchJoin(capture, joinType, conditionMatcher, innerMatcher, outerMatcher) \
|
||||
{ \
|
||||
{ \
|
||||
|
|
Loading…
Reference in New Issue