formatting and cleanup

moonshot/custom-path
Nils Dijk 2021-01-21 11:38:48 +01:00
parent e3953b2459
commit cc84b48fd1
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
1 changed files with 101 additions and 328 deletions

View File

@ -1,6 +1,6 @@
//
// Created by Nils Dijk on 17/01/2020.
//
/* */
/* Created by Nils Dijk on 17/01/2020. */
/* */
#include "postgres.h"
#include "catalog/pg_aggregate.h"
@ -28,7 +28,7 @@
#include "utils/builtins.h"
#include "utils/syscache.h"
typedef List * (*optimizeFn)(PlannerInfo *root, Path *originalPath);
typedef List *(*optimizeFn)(PlannerInfo *root, Path *originalPath);
typedef struct DistributedUnionPath
{
@ -55,10 +55,17 @@ typedef struct GeoScanPath
RangeTblEntry *rte;
} GeoScanPath;
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct
CustomPath *best_path, List *tlist,
List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private,
RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32
colocationId,
Expr *partitionValue, Oid
sampleRelid, List *subPaths);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist,
List *clauses);
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
static List * OptimizeJoinPath(PlannerInfo *root, Path *originalPath);
static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath);
@ -66,20 +73,23 @@ static List * BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath);
static List * GeoOverlapJoin(PlannerInfo *root, Path *originalPath);
static Path * CreateReadIntermediateResultPath(const Path *originalPath);
static bool CanOptimizeJoinPath(const JoinPath *jpath);
static bool IsDistributedUnion(Path *path, bool recurseTransparent, DistributedUnionPath **out);
static bool IsDistributedUnion(Path *path, bool recurseTransparent,
DistributedUnionPath **out);
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue);
static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra);
static List * ShardIntervalListForRelationPartitionValue(Oid relationId,
Expr *partitionValue);
static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *output_rel, void *extra);
static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath);
static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath);
static GeoScanPath *makeGeoScanPath(Relation rel, RelOptInfo *parent,
PathTarget *pathtarget, double rows);
static GeoScanPath * makeGeoScanPath(Relation rel, RelOptInfo *parent,
PathTarget *pathtarget, double rows);
static bool IsGeoScanPath(CustomPath *path);
static RangeTblEntry *makeRangeTableEntryForRelation(Relation rel,
int lockmode,
Alias *alias,
bool inh,
bool inFromCl);
static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel,
int lockmode,
Alias *alias,
bool inh,
bool inFromCl);
/*
@ -93,8 +103,8 @@ bool EnableBroadcastJoin = true;
/* list of functions that will be called to optimize paths in the join hook */
static optimizeFn joinOptimizations[] = {
OptimizeJoinPath,
// BroadcastOuterJoinPath,
// BroadcastInnerJoinPath,
/* BroadcastOuterJoinPath, */
/* BroadcastInnerJoinPath, */
GeoOverlapJoin,
};
@ -110,10 +120,11 @@ const CustomPathMethods distributedUnionMethods = {
static CustomPath *
WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths)
WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId,
Expr *partitionValue, Oid sampleRelid, List *subPaths)
{
DistributedUnionPath *distUnion = (DistributedUnionPath *)
newNode(sizeof(DistributedUnionPath), T_CustomPath);
newNode(sizeof(DistributedUnionPath), T_CustomPath);
distUnion->custom_path.path.pathtype = T_CustomScan;
distUnion->custom_path.path.parent = originalPath->parent;
@ -122,8 +133,8 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Exp
/* TODO use a better cost model */
distUnion->custom_path.path.rows = originalPath->rows;
distUnion->custom_path.path.startup_cost = originalPath->startup_cost+1000;
distUnion->custom_path.path.total_cost = originalPath->total_cost+1000;
distUnion->custom_path.path.startup_cost = originalPath->startup_cost + 1000;
distUnion->custom_path.path.total_cost = originalPath->total_cost + 1000;
distUnion->custom_path.methods = &distributedUnionMethods;
@ -153,6 +164,7 @@ CreateDistributedUnionPlan(PlannerInfo *root,
ShardInterval *shardInterval = NULL;
Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses);
/*
* Assume shards are colocated, any shard should suffice for now to find the initial
* interval list
@ -226,6 +238,7 @@ CreateDistributedUnionPlan(PlannerInfo *root,
plan->custom_plans = custom_plans;
plan->scan.plan.targetlist = tlist;
/* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
clauses = extract_actual_clauses(clauses, false);
@ -243,6 +256,7 @@ ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue)
{
/* prune shard list to target */
Const *partitionValueConst = castNode(Const, partitionValue);
/* TODO assert the constant is of the correct value */
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
return list_make1(FindShardInterval(partitionValueConst->constvalue, cacheEntry));
@ -315,7 +329,8 @@ IsDistributedUnion(Path *path, bool recurseTransparent, DistributedUnionPath **o
}
CustomPath *cpath = castNode(CustomPath, path);
if (cpath->methods != &distributedUnionMethods) {
if (cpath->methods != &distributedUnionMethods)
{
return false;
}
@ -350,7 +365,8 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
partitionKey->varno = bms_next_member(relOptInfo->relids, -1);
Assert(bms_num_members(relOptInfo->relids) == 1);
partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo, partitionKey);
partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo,
partitionKey);
}
/* wrap every path with a distributed union custom path */
@ -369,7 +385,8 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
/* hardcoded hack for adding geo distributed tables as an alternative path */
Relation rel = relation_open(rte->relid, AccessShareLock);
if (UseGeoPartitioning && strcmp(RelationGetRelationName(rel), "belgium_planet_osm_roads_dist") == 0)
if (UseGeoPartitioning && strcmp(RelationGetRelationName(rel),
"belgium_planet_osm_roads_dist") == 0)
{
if (OnlyGeoPartitioning)
{
@ -386,11 +403,11 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
relOptInfo->reltarget,
relOptInfo->rows);
geoPath = (Path *)
WrapTableAccessWithDistributedUnion(geoPath,
TableColocationId(geoRelid),
NULL,
geoRelid,
NIL);
WrapTableAccessWithDistributedUnion(geoPath,
TableColocationId(geoRelid),
NULL,
geoRelid,
NIL);
if (EnableGeoPartitioningGrouping)
{
@ -537,7 +554,7 @@ makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double
if (IsA(expr, Var))
{
/* TODO assume the first attribute of a relation as its PK */
Var *var = (Var *)expr;
Var *var = (Var *) expr;
isPrimaryKey = var->varattno == 1;
}
@ -559,7 +576,8 @@ makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double
cpath->methods = &geoScanMethods;
geoPath->rte = makeRangeTableEntryForRelation(rel, AccessShareLock, NULL, false, true);
geoPath->rte = makeRangeTableEntryForRelation(rel, AccessShareLock, NULL, false,
true);
return geoPath;
}
@ -580,7 +598,7 @@ makeRangeTableEntryForRelation(Relation rel,
bool inFromCl)
{
RangeTblEntry *rte = makeNode(RangeTblEntry);
char *refname = alias ? alias->aliasname : RelationGetRelationName(rel);
char *refname = alias ? alias->aliasname : RelationGetRelationName(rel);
Assert(lockmode == AccessShareLock ||
lockmode == RowShareLock ||
@ -613,7 +631,7 @@ makeRangeTableEntryForRelation(Relation rel,
rte->inFromCl = inFromCl;
rte->requiredPerms = ACL_SELECT;
rte->checkAsUser = InvalidOid; /* not set-uid by default, either */
rte->checkAsUser = InvalidOid; /* not set-uid by default, either */
rte->selectedCols = NULL;
rte->insertedCols = NULL;
rte->updatedCols = NULL;
@ -694,15 +712,18 @@ CanOptimizeJoinPath(const JoinPath *jpath)
return true;
}
static NameData
GetFunctionNameData(Oid funcid)
{
HeapTuple proctup = NULL;
HeapTuple proctup = NULL;
Form_pg_proc procform = NULL;
proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
if (!HeapTupleIsValid(proctup))
{
elog(ERROR, "cache lookup failed for function %u", funcid);
}
procform = (Form_pg_proc) GETSTRUCT(proctup);
/* copy name by value */
@ -714,96 +735,9 @@ GetFunctionNameData(Oid funcid)
}
/*
 * MatchSTExpandExpression returns true when expr is a call to a function named
 * "st_expand" that takes exactly two arguments, the second of which is a
 * non-null float8 constant.
 *
 * If distance is not NULL, the value of that constant is stored in *distance on
 * a successful match; on a failed match *distance is left untouched. The result
 * of the match does not depend on whether distance is requested.
 */
static bool
MatchSTExpandExpression(Expr *expr, float8 *distance)
{
	if (!IsA(expr, FuncExpr))
	{
		return false;
	}

	FuncExpr *funcexpr = castNode(FuncExpr, expr);
	NameData funcNameData = GetFunctionNameData(funcexpr->funcid);
	if (strcmp(NameStr(funcNameData), "st_expand") != 0)
	{
		/* expected an expansion of the geometry */
		return false;
	}

	if (list_length(funcexpr->args) != 2)
	{
		/* expected 2 arguments */
		return false;
	}

	/*
	 * The distance argument can be any expression (a Var, a Param, ...). Test
	 * the node tag explicitly instead of relying on a checked cast and an
	 * Assert, which guard nothing in builds without assertions.
	 */
	Node *distanceArg = lsecond(funcexpr->args);
	if (!IsA(distanceArg, Const))
	{
		return false;
	}

	Const *distanceConst = (Const *) distanceArg;
	if (distanceConst->constisnull || distanceConst->consttype != FLOAT8OID)
	{
		/* a NULL or non-float8 constant carries no usable distance */
		return false;
	}

	if (distance)
	{
		*distance = DatumGetFloat8(distanceConst->constvalue);
	}

	return true;
}
/*
 * IsGeoOverlapJoin returns true when expr is an operator expression whose
 * underlying function is named "geometry_overlaps" and whose two arguments are
 * a Var and an st_expand(...) call, in either order.
 *
 * NOTE(review): the previous version returned false exactly when the
 * Var/st_expand pattern was found and true otherwise, which contradicts the
 * function name and the argument normalisation below; the condition is treated
 * as inverted here - confirm against the intended callers.
 */
static bool
IsGeoOverlapJoin(Expr *expr)
{
	/* find a geometry_overlaps expression */
	if (!IsA(expr, OpExpr))
	{
		return false;
	}

	OpExpr *opexpr = castNode(OpExpr, expr);
	if (!OidIsValid(opexpr->opfuncid))
	{
		/* without a resolved function there is no name to compare against */
		return false;
	}

	/* check the name of the operation */
	NameData opname = GetFunctionNameData(opexpr->opfuncid);
	if (strcmp(NameStr(opname), "geometry_overlaps") != 0)
	{
		return false;
	}

	/* we expect exactly 2 arguments */
	if (list_length(opexpr->args) != 2)
	{
		return false;
	}

	Expr *leftArg = linitial(opexpr->args);
	Expr *rightArg = lsecond(opexpr->args);
	if (IsA(rightArg, Var) && IsA(leftArg, FuncExpr))
	{
		/* normalise to (Var, FuncExpr) so only one ordering has to be tested */
		Expr *tmp = leftArg;
		leftArg = rightArg;
		rightArg = tmp;
	}

	/* the distance is captured by the matcher but not needed by this predicate */
	float8 distance = 0;
	if (!IsA(leftArg, Var) || !MatchSTExpandExpression(rightArg, &distance))
	{
		/* not of the form: var && st_expand(geometry, distance) */
		return false;
	}

	return true;
}
typedef struct GeoJoinPathMatch
{
RestrictInfo *stdwithRestrictInfo;
Const *stdwithinDistanceConst;
double stdwithinDistance;
AggPath *innerGrouping;
DistributedUnionPath *innerDistUnion;
@ -833,178 +767,14 @@ SkipTransparentPaths(Path *path)
}
/*
 * MatchGeoScan checks whether path, after skipping transparent paths, has the
 * shape
 *
 *   [AggPath ->] DistributedUnionPath -> GeoScanPath
 *
 * where the AggPath level is required only when EnableGeoPartitioningGrouping
 * is set.
 *
 * Each of matchedGrouping, matchedDistUnion and matchedPath may be NULL. When
 * not NULL it receives the node found at the corresponding level. Levels are
 * captured as they are matched, so on a false return the outputs of the levels
 * that did match may already have been written.
 */
static bool
MatchGeoScan(Path *path,
			 AggPath **matchedGrouping,
			 DistributedUnionPath **matchedDistUnion,
			 GeoScanPath **matchedPath)
{
	/* skip transparent paths */
	path = SkipTransparentPaths(path);

	if (EnableGeoPartitioningGrouping)
	{
		if (!IsA(path, AggPath))
		{
			return false;
		}

		AggPath *aggPath = castNode(AggPath, path);
		if (matchedGrouping)
		{
			*matchedGrouping = aggPath;
		}

		path = aggPath->subpath;
	}

	DistributedUnionPath *distUnion = NULL;
	if (!IsDistributedUnion(path, true, &distUnion))
	{
		return false;
	}

	if (matchedDistUnion)
	{
		*matchedDistUnion = distUnion;
	}

	path = distUnion->worker_path;

	/*
	 * The worker path of a distributed union can be any kind of path. Verify
	 * the node tag before treating it as a CustomPath; a blind castNode would
	 * trip an assertion (or read past a smaller struct) for ordinary scans.
	 */
	if (!IsA(path, CustomPath))
	{
		return false;
	}

	if (!IsGeoScanPath((CustomPath *) path))
	{
		return false;
	}

	GeoScanPath *geoPath = (GeoScanPath *) path;
	if (matchedPath)
	{
		/* capture the matched path */
		*matchedPath = geoPath;
	}

	return true;
}
/*
 * MatchSTDWithinRestrictInfo checks whether the clause of restrictInfo is a
 * two-argument operator expression implemented by a function named
 * "geometry_overlaps", of the form
 *
 *   var && st_expand(geometry, distance)   or
 *   st_expand(geometry, distance) && var
 *
 * On a match, *stdwithinQual (if not NULL) is set to restrictInfo and the
 * distance is handed to MatchSTExpandExpression through stdwithinDistance
 * (which may be NULL). Returns true on a match, false otherwise.
 */
static bool
MatchSTDWithinRestrictInfo(RestrictInfo *restrictInfo,
						   RestrictInfo **stdwithinQual,
						   double *stdwithinDistance)
{
	if (!IsA(restrictInfo->clause, OpExpr))
	{
		return false;
	}

	OpExpr *opexpr = castNode(OpExpr, restrictInfo->clause);

	/* match for a (geometry && geometry) expression */
	if (list_length(opexpr->args) != 2)
	{
		/* not exactly 2 arguments, no need for further checks */
		return false;
	}

	/*
	 * GetFunctionNameData raises an error when the lookup fails, so an operator
	 * without a resolved function must be rejected here to keep this a pure
	 * match/no-match test (same guard as in IsGeoOverlapJoin).
	 */
	if (!OidIsValid(opexpr->opfuncid))
	{
		return false;
	}

	NameData opexprNameData = GetFunctionNameData(opexpr->opfuncid);
	if (strcmp(NameStr(opexprNameData), "geometry_overlaps") != 0)
	{
		return false;
	}

	Expr *arg1 = linitial(opexpr->args);
	Expr *arg2 = lsecond(opexpr->args);

	/*
	 * match (var && st_expand(geometry, distance)) or
	 * (st_expand(geometry, distance) && var)
	 *
	 * The Var test is done first in both alternatives so the distance is only
	 * captured for an alternative that can actually match.
	 */
	bool matched =
		(IsA(arg1, Var) && MatchSTExpandExpression(arg2, stdwithinDistance)) ||
		(IsA(arg2, Var) && MatchSTExpandExpression(arg1, stdwithinDistance));
	if (!matched)
	{
		return false;
	}

	/* matched */
	if (stdwithinQual)
	{
		*stdwithinQual = restrictInfo;
	}

	return true;
}
/*
 * MatchSTDWithinJoin walks restrictionInfos in list order and stops at the
 * first entry accepted by MatchSTDWithinRestrictInfo, to which stdwithinQual
 * and stdwithinDistance are passed through unchanged.
 *
 * Returns true when such an entry exists, false when the list contains none.
 */
static bool
MatchSTDWithinJoin(List *restrictionInfos, RestrictInfo **stdwithinQual,
				   double *stdwithinDistance)
{
	bool found = false;
	RestrictInfo *candidate = NULL;

	foreach_ptr(candidate, restrictionInfos)
	{
		Assert(IsA(candidate, RestrictInfo));

		found = MatchSTDWithinRestrictInfo(candidate, stdwithinQual,
										   stdwithinDistance);
		if (found)
		{
			/* first hit wins, later entries are not inspected */
			break;
		}
	}

	return found;
}
/*
 * MathGeoJoinPath checks whether path is an inner join between two geo scans
 * (see MatchGeoScan) that carries an ST_DWithin style restriction (see
 * MatchSTDWithinJoin).
 *
 * match may be NULL. When it is not, the nodes and values found while matching
 * are stored in its fields. Fields are captured as the individual tests
 * succeed, so they are only meaningful as a whole when true is returned.
 */
static bool
MathGeoJoinPath(JoinPath *path, GeoJoinPathMatch *match)
{
	/*
	 * Tests are performed in fastest test to slowest test to have quick escapes when we
	 * don't match.
	 */

	/*
	 * Only inner joins qualify. Previously a non-inner join skipped the
	 * restriction test altogether and was reported as a match.
	 */
	if (path->jointype != JOIN_INNER)
	{
		return false;
	}

	if (!MatchGeoScan(path->innerjoinpath,
					  match ? &match->innerGrouping : NULL,
					  match ? &match->innerDistUnion : NULL,
					  match ? &match->innerPath : NULL))
	{
		/* innerjoinpath is not a geo scan */
		return false;
	}

	if (!MatchGeoScan(path->outerjoinpath,
					  match ? &match->outerGrouping : NULL,
					  match ? &match->outerDistUnion : NULL,
					  match ? &match->outerPath : NULL))
	{
		/* outerjoinpath is not a geo scan */
		return false;
	}

	/* verify the join is restricted on ST_DWithin */
	if (!MatchSTDWithinJoin(path->joinrestrictinfo,
							match ? &match->stdwithRestrictInfo : NULL,
							match ? &match->stdwithinDistance : NULL))
	{
		/* not a distance join */
		return false;
	}

	/* all tests matched and if `match` was not NULL the values have been captured */
	return true;
}
#include "distributed/planner/pattern_match.h"
static List *
GeoOverlapJoin(PlannerInfo *root, Path *originalPath)
{
JoinPath *joinPath = (JoinPath *) originalPath;
GeoJoinPathMatch match = { 0 };
GeoJoinPathMatch match2 = { 0 };
if (!MathGeoJoinPath(joinPath, &match))
{
/* no match */
return NIL;
}
ereport(DEBUG1, (errmsg("matched pattern for geooverlap")));
IfPathMatch(
joinPath,
originalPath,
MatchJoin(
JOIN_INNER,
MatchJoinRestrictions(
@ -1015,46 +785,41 @@ GeoOverlapJoin(PlannerInfo *root, Path *originalPath)
st_expand,
MatchVar(),
CaptureMatch(
&match2.stdwithinDistanceConst,
&match.stdwithinDistanceConst,
MatchConst(MatchConstType(FLOAT8OID))
))
)),
SkipReadthrough(
CaptureMatch(
&match2.innerGrouping,
MatchGrouping(
CaptureMatch(
&match2.innerDistUnion,
MatchDistributedUnion(
CaptureMatch(
&match2.innerPath,
MatchGeoScan
)))
))
SkipReadthrough(CaptureMatch(
&match.innerGrouping,
MatchGrouping(CaptureMatch(
&match.innerDistUnion,
MatchDistributedUnion(CaptureMatch(
&match.innerPath,
MatchGeoScan
)))
))
),
SkipReadthrough(
CaptureMatch(
&match2.outerGrouping,
MatchGrouping(
CaptureMatch(
&match2.outerDistUnion,
MatchDistributedUnion(
CaptureMatch(
&match2.innerPath,
MatchGeoScan
)))
))
SkipReadthrough(CaptureMatch(
&match.outerGrouping,
MatchGrouping(CaptureMatch(
&match.outerDistUnion,
MatchDistributedUnion(CaptureMatch(
&match.outerPath,
MatchGeoScan
)))
))
)
)
)
{
/* have a match on the geo join pattern, all fields are stored in `match` */
ereport(DEBUG1, (errmsg("my custom code %p: %f",
match2.innerGrouping,
DatumGetFloat8(match2.stdwithinDistanceConst->constvalue)
)));
match.innerGrouping,
DatumGetFloat8(match.stdwithinDistanceConst->constvalue)
)));
}
/* have a match on the geo join pattern, all fields are stored in `match` */
(void) &match;
return NIL;
}
@ -1072,7 +837,8 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
if (jpath->jointype == JOIN_INNER && CanOptimizeJoinPath(jpath))
{
/* we can only optimize the Distributed union if the colocationId's are the same, taking any would suffice */
DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath;
DistributedUnionPath *baseDistUnion =
(DistributedUnionPath *) jpath->innerjoinpath;
/*
* Shallow copy of any join node, this does not imply executing a nested
@ -1083,8 +849,10 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
*jcpath = *jpath;
jcpath->path.type = T_NestPath;
jcpath->innerjoinpath = ((DistributedUnionPath *) jpath->innerjoinpath)->worker_path;
jcpath->outerjoinpath = ((DistributedUnionPath *) jpath->outerjoinpath)->worker_path;
jcpath->innerjoinpath =
((DistributedUnionPath *) jpath->innerjoinpath)->worker_path;
jcpath->outerjoinpath =
((DistributedUnionPath *) jpath->outerjoinpath)->worker_path;
/* TODO update costs of hashjoin, very naife removal of DU cost for now */
jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */
@ -1128,7 +896,8 @@ BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath)
if (IsDistributedUnion(jpath->outerjoinpath, false, NULL))
{
/* broadcast inner join path */
DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->outerjoinpath;
DistributedUnionPath *baseDistUnion =
(DistributedUnionPath *) jpath->outerjoinpath;
/*
* Shallow copy of any join node, this does not imply executing a nested
@ -1186,7 +955,8 @@ BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath)
if (IsDistributedUnion(jpath->innerjoinpath, false, NULL))
{
/* broadcast inner join path */
DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath;
DistributedUnionPath *baseDistUnion =
(DistributedUnionPath *) jpath->innerjoinpath;
/*
* Shallow copy of any join node, this does not imply executing a nested
@ -1261,7 +1031,7 @@ PathBasedPlannerJoinHook(PlannerInfo *root,
foreach(pathCell, joinrel->pathlist)
{
Path *originalPath = lfirst(pathCell);
for (int i=0; i < sizeof(joinOptimizations)/sizeof(joinOptimizations[1]); i++)
for (int i = 0; i < sizeof(joinOptimizations) / sizeof(joinOptimizations[1]); i++)
{
List *alternativePaths = joinOptimizations[i](root, originalPath);
newPaths = list_concat(newPaths, alternativePaths);
@ -1275,6 +1045,7 @@ PathBasedPlannerJoinHook(PlannerInfo *root,
}
}
/*
* varno_mapping is an array where the index is the varno in the original query, or 0 if
* no mapping is required.
@ -1306,12 +1077,12 @@ VarNoMutator(Node *expr, Index *varno_mapping)
var->vartypmod,
var->varcollid,
var->varlevelsup
);
);
}
default:
{
return expression_tree_mutator(expr, (void*) VarNoMutator, varno_mapping);
return expression_tree_mutator(expr, (void *) VarNoMutator, varno_mapping);
}
}
}
@ -1335,6 +1106,7 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf
case T_Agg:
{
AggPath *apath = castNode(AggPath, path);
/* the subpath needs to be applied before we can apply the grouping clause */
ApplyPathToQuery(root, query, apath->subpath, info);
@ -1514,7 +1286,7 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf
if (rteIndex == 0)
{
RangeTblEntry* rte = geoPath->rte;
RangeTblEntry *rte = geoPath->rte;
query->rtable = lappend(query->rtable, rte);
rteIndex = list_length(query->rtable);
info->varno_mapping[scan_relid] = rteIndex;
@ -1536,7 +1308,7 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf
ereport(ERROR, (errmsg("unknown path type in worker query"),
errdetail("cannot turn worker path into query due to unknown "
"path type in plan. pathtype: %d", path->pathtype))
);
);
}
}
}
@ -1694,6 +1466,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
}
SortGroupClause *sgc = NULL;
/*
* TODO verify whats the purpose of the list, if we find any of the distribution
* colums somewhere in this we optimize, might be wrong