try repartition grouping

moonshot/custom-path
Nils Dijk 2021-11-30 17:50:50 +01:00
parent 1eb418de89
commit 4542265dbf
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
1 changed files with 206 additions and 12 deletions

View File

@ -103,7 +103,11 @@ static List * ShardIntervalListForRelationPartitionValue(Oid relationId,
Expr *partitionValue); Expr *partitionValue);
static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *output_rel, void *extra); RelOptInfo *output_rel, void *extra);
static List * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); /* group agg optimizations */
static List * PushDownAggPath(PlannerInfo *root, Path *originalPath);
static List * RepartitionAggPath(PlannerInfo *root, Path *originalPath);
/* geo experimantal*/
static GeoScanPath * makeGeoScanPath(Relation rel, RelOptInfo *parent, static GeoScanPath * makeGeoScanPath(Relation rel, RelOptInfo *parent,
PathTarget *pathtarget, double rows); PathTarget *pathtarget, double rows);
static bool IsGeoScanPath(CustomPath *path); static bool IsGeoScanPath(CustomPath *path);
@ -112,6 +116,7 @@ static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel,
Alias *alias, Alias *alias,
bool inh, bool inh,
bool inFromCl); bool inFromCl);
static bool VarInList(List *varList, Var *var); static bool VarInList(List *varList, Var *var);
/* /*
@ -168,13 +173,20 @@ AddCollectCostOverhead(CitusOperationCosts *overhead, Path *originalPath)
const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0); const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0);
const double shards = 4; /* TODO get these from planner colocation information */ const double shards = 4; /* TODO get these from planner colocation information */
/*
* Collects return their first tuple after:
* 1. the original path has been completely executed
* 2. all tuples have been transferred over the network
* 3. (potentially add costs for local tuple store)
*/
Cost startupToTotal = originalPath->total_cost - originalPath->startup_cost;
overhead->startup_cost += CollectStartupCost; Cost networkTransferCost = CollectStartupCost
overhead->total_cost +=
CollectStartupCost
+ (CollectPerRowCost * originalPath->rows) + (CollectPerRowCost * originalPath->rows)
+ (CollectPerMBCost * originalPath->rows * mbPerRow); + (CollectPerMBCost * originalPath->rows * mbPerRow);
overhead->startup_cost += startupToTotal + networkTransferCost;
overhead->total_cost += networkTransferCost;
} }
@ -184,11 +196,21 @@ AddRepartitionCostOverhead(CitusOperationCosts *overhead, Path *originalPath)
const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0); const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0);
const double shards = 4; /* TODO get these from planner colocation information */ const double shards = 4; /* TODO get these from planner colocation information */
/* TODO scaling crosss transfer simply by number of shards is probably not correct */ /*
overhead->startup_cost += RepartitionStartupCost; * Repartitions return their first tuple after:
overhead->total_cost += RepartitionStartupCost * 1. the original path has been completely executed
* 2. all tuples have been transferred over the network (bisection)
* 3. (potentially add costs for local tuple store)
*/
Cost startupToTotal = originalPath->total_cost - originalPath->startup_cost;
Cost networkTransferCost = RepartitionStartupCost
+ (RepartitionPerRowCost * originalPath->rows / shards) + (RepartitionPerRowCost * originalPath->rows / shards)
+ (RepartitionPerMBCost * originalPath->rows / shards * mbPerRow); + (RepartitionPerMBCost * originalPath->rows / shards * mbPerRow);
/* TODO scaling crosss transfer simply by number of shards is probably not correct */
overhead->startup_cost += startupToTotal + networkTransferCost;
overhead->total_cost += networkTransferCost;
}; };
@ -1152,6 +1174,37 @@ CanRepartition(DistributedUnionPath *source, DistributedUnionPath *target,
return false; return false;
} }
static List *
MergeEquivalentExpressions(List *attrs, List *restrictInfos)
{
RestrictInfo *rinfo = NULL;
List *rattrs = NIL;
foreach_ptr(rinfo, restrictInfos)
{
if (DistritbutionAttributesInEquvalenceMember(attrs, rinfo->left_em))
{
if (IsA(rinfo->right_em->em_expr, Var))
{
rattrs = lappend(rattrs, castNode(Var, rinfo->right_em->em_expr));
}
}
else if (DistritbutionAttributesInEquvalenceMember(attrs, rinfo->right_em))
{
if (IsA(rinfo->left_em, Var))
{
rattrs = lappend(rattrs, castNode(Var, rinfo->left_em->em_expr));
}
}
else
{
/* this rinfo is not with an equality filter on the distribution attributes of the target relation */
continue;
}
}
return list_concat(rattrs, attrs);
}
static List * static List *
OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
@ -1269,10 +1322,15 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
/* create Collect on top of new join, base Collect on matched outer Collect */ /* create Collect on top of new join, base Collect on matched outer Collect */
const DistributedUnionPath *baseDistUnion = outerDU; const DistributedUnionPath *baseDistUnion = outerDU;
/* Add dist attrs based on equivalent members of the join restrict info */
List *distAttrs = MergeEquivalentExpressions(baseDistUnion->distributionAttrs,
joinPath->joinrestrictinfo);
Path *newPath = WrapTableAccessWithDistributedUnion( Path *newPath = WrapTableAccessWithDistributedUnion(
(Path *) newJoinPath, (Path *) newJoinPath,
baseDistUnion->colocationId, baseDistUnion->colocationId,
baseDistUnion->distributionAttrs, distAttrs,
baseDistUnion->partitionValue, baseDistUnion->partitionValue,
baseDistUnion->sampleRelid, baseDistUnion->sampleRelid,
baseDistUnion->custom_path.custom_paths); baseDistUnion->custom_path.custom_paths);
@ -1330,10 +1388,15 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
/* create Collect on top of new join, base Collect on matched outer Collect */ /* create Collect on top of new join, base Collect on matched outer Collect */
const DistributedUnionPath *baseDistUnion = innerDU; const DistributedUnionPath *baseDistUnion = innerDU;
/* Add dist attrs based on equivalent members of the join restrict info */
List *distAttrs = MergeEquivalentExpressions(baseDistUnion->distributionAttrs,
joinPath->joinrestrictinfo);
Path *newPath = WrapTableAccessWithDistributedUnion( Path *newPath = WrapTableAccessWithDistributedUnion(
(Path *) newJoinPath, (Path *) newJoinPath,
baseDistUnion->colocationId, baseDistUnion->colocationId,
baseDistUnion->distributionAttrs, distAttrs,
baseDistUnion->partitionValue, baseDistUnion->partitionValue,
baseDistUnion->sampleRelid, baseDistUnion->sampleRelid,
baseDistUnion->custom_path.custom_paths); baseDistUnion->custom_path.custom_paths);
@ -1928,7 +1991,8 @@ PathBasedPlannerGroupAgg(PlannerInfo *root,
Path *originalPath = NULL; Path *originalPath = NULL;
foreach_ptr(originalPath, output_rel->pathlist) foreach_ptr(originalPath, output_rel->pathlist)
{ {
newPaths = list_concat(newPaths, OptimizeGroupAgg(root, originalPath)); newPaths = list_concat(newPaths, PushDownAggPath(root, originalPath));
newPaths = list_concat(newPaths, RepartitionAggPath(root, originalPath));
} }
Path *newPath = NULL; Path *newPath = NULL;
@ -1995,8 +2059,44 @@ GroupClauseContainsDistributionAttribute(PlannerInfo *root, AggPath *aggPath,
} }
static Var *
FirstGroupClauseVarExpr(PlannerInfo *root, AggPath *aggPath)
{
/*
* TODO verify whats the purpose of the list, if we find any of the distribution
* columns somewhere in this we optimize, might be wrong
*/
SortGroupClause *sgc = NULL;
foreach_ptr(sgc, aggPath->groupClause)
{
PathTarget *target = aggPath->path.pathtarget;
Expr *targetExpr = NULL;
Index i = 0;
foreach_ptr(targetExpr, target->exprs)
{
Index targetSortGroupRef = target->sortgrouprefs[i];
i++;
if (targetSortGroupRef != sgc->tleSortGroupRef)
{
continue;
}
if (!IsA(targetExpr, Var))
{
continue;
}
Var *targetVar = castNode(Var, targetExpr);
return targetVar;
}
}
return NULL;
}
static List * static List *
OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) PushDownAggPath(PlannerInfo *root, Path *originalPath)
{ {
AggPath *aggPath = NULL; AggPath *aggPath = NULL;
DistributedUnionPath *collect = NULL; DistributedUnionPath *collect = NULL;
@ -2077,3 +2177,97 @@ OptimizeGroupAgg(PlannerInfo *root, Path *originalPath)
return NIL; return NIL;
} }
static List *
RepartitionAggPath(PlannerInfo *root, Path *originalPath)
{
AggPath *aggPath = NULL;
DistributedUnionPath *collect = NULL;
/*
* Match path:
* +-----------+
* | Aggergate |
* +-----------+
* |
* +---------+
* | Collect |
* +---------+
*
* Where the Aggregate is NOT grouped on any of the distribution attributes of the
* collect node.
*/
IfPathMatch(
originalPath,
MatchAgg(
&aggPath,
SkipReadThrough(
NoCapture,
MatchDistributedUnion(
&collect,
MatchAny))))
{
if (GroupClauseContainsDistributionAttribute(root, aggPath,
collect->distributionAttrs))
{
return NIL;
}
Var *targetDistAttr = FirstGroupClauseVarExpr(root, aggPath);
if (targetDistAttr == NULL)
{
return false;
}
/*
* Create new path
* +---------+
* | Collect |
* +---------+
* |
* +-----------+
* | Aggregate |
* +-----------+
* |
* +-------------+
* | Repartition |
* +-------------+
*/
AggPath *newPath = makeNode(AggPath);
*newPath = *aggPath;
/* repartition to a virtual colocation group */
newPath->subpath = CreateRepartitionNode(-1, collect->worker_path);
/*
* Subtract the overhead of the original collect node from the generated agg. This
* approximates the cost of the aggregate to be run on the workers.
*/
CitusOperationCosts costOverhead = { 0 };
AddCollectCostOverhead(&costOverhead, collect->worker_path);
newPath->path.startup_cost -= costOverhead.startup_cost;
newPath->path.total_cost -= costOverhead.total_cost;
CitusOperationCosts repartitionOverhead = { 0 };
AddRepartitionCostOverhead(&repartitionOverhead, collect->worker_path);
newPath->path.startup_cost += repartitionOverhead.startup_cost;
newPath->path.total_cost += repartitionOverhead.total_cost;
/*
* TODO should we devide the actual cost by some factor as to assume aggregates
* are cheaper to push down?
*/
return list_make1(
WrapTableAccessWithDistributedUnion((Path *) newPath,
-1,
list_make1(targetDistAttr),
NULL,
-1,
collect->custom_path.custom_paths));
}
return NIL;
}