prototype of pattern matcher

moonshot/custom-path
Nils Dijk 2021-01-20 21:16:49 +01:00
parent bac16a3785
commit e3953b2459
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
2 changed files with 688 additions and 57 deletions

View File

@ -406,59 +406,87 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
/* creating the target list */ /* creating the target list */
PathTarget *groupPathTarget = create_empty_pathtarget(); PathTarget *groupPathTarget = create_empty_pathtarget();
add_column_to_pathtarget(groupPathTarget, (Expr *) makeVar(1,1,23,-1, 0,0), 0); int numAggs = 0;
Expr *expr = NULL;
/* any_value on osm_id */ foreach_ptr(expr, relOptInfo->reltarget->exprs)
Aggref *aggref = makeNode(Aggref);
aggref->aggfnoid = 18333; /* any_value */
aggref->aggtype = 20;
aggref->aggtranstype = 20;
aggref->aggfilter = NULL;
aggref->aggstar = false;
aggref->aggvariadic = false;
aggref->aggkind = AGGKIND_NORMAL;
aggref->aggsplit = AGGSPLIT_SIMPLE;
aggref->location = 0;
aggref->args = list_make1(
makeTargetEntry(
(Expr *) makeVar(1, 2, 20, -1, 0, 0),
1, NULL, false)); /* osm_id */
struct TargetEntry *argTLE = NULL;
foreach_ptr(argTLE, aggref->args)
{ {
aggref->aggargtypes = lappend_oid(aggref->aggargtypes, if (!IsA(expr, Var))
exprType((Node *) argTLE->expr)); {
} continue;
add_column_to_pathtarget(groupPathTarget, (Expr *) aggref, 0); }
Var *var = castNode(Var, expr);
/* ST_Union(way) */ switch (var->varattno)
Aggref *aggref2 = makeNode(Aggref); {
aggref2->aggfnoid = 16861; /* ST_Union */ case 1: /* k */
aggref2->aggtype = 16390; {
aggref2->aggtranstype = 2281; /* transparently add grouping keys */
aggref2->aggfilter = NULL; add_column_to_pathtarget(groupPathTarget, expr, 0);
aggref2->aggstar = false;
aggref2->aggvariadic = false;
aggref2->aggkind = AGGKIND_NORMAL;
aggref2->aggsplit = AGGSPLIT_SIMPLE;
aggref2->location = 0;
aggref2->args = list_make1( break;
makeTargetEntry( }
(Expr *) makeVar(1, 3, 16390, -1, 0, 0),
1, NULL, false)); /* way */ case 2: /* osm_id */
argTLE = NULL; {
foreach_ptr(argTLE, aggref2->args) /* wrapping non partitioned columns and non-primary keys in any_value */
{ Aggref *aggref = makeNode(Aggref);
aggref2->aggargtypes = lappend_oid(aggref2->aggargtypes, aggref->aggfnoid = 18333; /* any_value */
exprType((Node *) argTLE->expr)); aggref->aggtype = var->vartype;
aggref->aggtranstype = var->vartype;
aggref->aggfilter = NULL;
aggref->aggstar = false;
aggref->aggvariadic = false;
aggref->aggkind = AGGKIND_NORMAL;
aggref->aggsplit = AGGSPLIT_SIMPLE;
aggref->location = 0;
aggref->args = list_make1(
makeTargetEntry((Expr *) var, 1, NULL, false));
TargetEntry *argTLE = NULL;
foreach_ptr(argTLE, aggref->args)
{
aggref->aggargtypes =
lappend_oid(aggref->aggargtypes,
exprType((Node *) argTLE->expr));
}
add_column_to_pathtarget(groupPathTarget, (Expr *) aggref, 0);
numAggs++;
break;
}
case 3: /* way */
{
/* reconstruct partitioned values via ST_Union() */
Aggref *aggref = makeNode(Aggref);
aggref->aggfnoid = 16861; /* ST_Union */
aggref->aggtype = 16390;
aggref->aggtranstype = 2281;
aggref->aggfilter = NULL;
aggref->aggstar = false;
aggref->aggvariadic = false;
aggref->aggkind = AGGKIND_NORMAL;
aggref->aggsplit = AGGSPLIT_SIMPLE;
aggref->location = 0;
aggref->args = list_make1(makeTargetEntry(expr, 1, NULL, false));
TargetEntry *argTLE = NULL;
foreach_ptr(argTLE, aggref->args)
{
aggref->aggargtypes =
lappend_oid(aggref->aggargtypes,
exprType((Node *) argTLE->expr));
}
add_column_to_pathtarget(groupPathTarget, (Expr *) aggref, 0);
numAggs++;
}
}
} }
add_column_to_pathtarget(groupPathTarget, (Expr *) aggref2, 0);
/* TODO figure out costing for our grouping */ /* TODO figure out costing for our grouping */
AggClauseCosts costs = { AggClauseCosts costs = {
.numAggs = 2, .numAggs = numAggs,
.numOrderedAggs = 0, .numOrderedAggs = 0,
.hasNonPartial = false, .hasNonPartial = false,
.hasNonSerial = false, .hasNonSerial = false,
@ -502,9 +530,25 @@ makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double
path->parent = parent; path->parent = parent;
PathTarget *targetCopy = create_empty_pathtarget(); PathTarget *targetCopy = create_empty_pathtarget();
add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 0), 1); Expr *expr = NULL;
add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 1), 0); foreach_ptr(expr, pathtarget->exprs)
add_column_to_pathtarget(targetCopy, list_nth(pathtarget->exprs, 2), 0); {
bool isPrimaryKey = false;
if (IsA(expr, Var))
{
/* TODO assume the first attribute of a relation as its PK */
Var *var = (Var *)expr;
isPrimaryKey = var->varattno == 1;
}
/*
* Geo partitioning cuts the geometry of the distribution column into pieces, they
* need to be reconstructed by grouping on the primary key. Add the primary keys
* to a grouping set with reference 1
*/
add_column_to_pathtarget(targetCopy, expr,
isPrimaryKey ? 1 : 0);
}
path->pathtarget = targetCopy; path->pathtarget = targetCopy;
path->param_info = NULL; path->param_info = NULL;
@ -671,7 +715,7 @@ GetFunctionNameData(Oid funcid)
static bool static bool
IsSTExpandExpression(Expr *expr, float8 *distance) MatchSTExpandExpression(Expr *expr, float8 *distance)
{ {
if (!IsA(expr, FuncExpr)) if (!IsA(expr, FuncExpr))
{ {
@ -746,7 +790,7 @@ IsGeoOverlapJoin(Expr *expr)
} }
float8 distance = 0; float8 distance = 0;
if (IsA(leftArg, Var) && IsSTExpandExpression(rightArg, &distance)) if (IsA(leftArg, Var) && MatchSTExpandExpression(rightArg, &distance))
{ {
return false; return false;
} }
@ -754,20 +798,264 @@ IsGeoOverlapJoin(Expr *expr)
return true; return true;
} }
/*
 * GeoJoinPathMatch holds the nodes captured while matching a join between two geo
 * scans. Fields are only written by the matcher that recognised the corresponding
 * part of the path tree; a field that was not captured keeps whatever value the
 * caller initialised it with.
 */
typedef struct GeoJoinPathMatch
{
/* join restriction that matched the geometry_overlaps / st_expand pattern */
RestrictInfo *stdwithRestrictInfo;
/* constant distance argument captured from the st_expand call by the pattern DSL */
Const *stdwithinDistanceConst;
/* distance as written by MatchSTExpandExpression in the hand written matcher */
double stdwithinDistance;
/* inner side of the join: grouping node, distributed union below it, geo scan */
AggPath *innerGrouping;
DistributedUnionPath *innerDistUnion;
GeoScanPath *innerPath;
/* outer side of the join: grouping node, distributed union below it, geo scan */
AggPath *outerGrouping;
DistributedUnionPath *outerDistUnion;
GeoScanPath *outerPath;
} GeoJoinPathMatch;
/*
 * SkipTransparentPaths descends from the given path through every node that the
 * matcher treats as transparent and returns the first path that is not such a node.
 * MaterialPath is the only node type that is skipped.
 */
static Path *
SkipTransparentPaths(Path *path)
{
	Path *current = path;

	while (current->type == T_MaterialPath)
	{
		current = castNode(MaterialPath, current)->subpath;
	}

	return current;
}
/*
 * MatchGeoScan tests whether path has the shape
 *
 *   [MaterialPath ...] -> [AggPath] -> DistributedUnion -> GeoScanPath
 *
 * The AggPath level is only required (and only accepted) when
 * EnableGeoPartitioningGrouping is set.
 *
 * Every out parameter is optional; a non-NULL pointer receives the node matched at
 * that level. Out parameters of levels matched before a failing level may already
 * have been written when false is returned.
 *
 * Returns true when the complete shape matched.
 */
static bool
MatchGeoScan(Path *path,
			 AggPath **matchedGrouping,
			 DistributedUnionPath **matchedDistUnion,
			 GeoScanPath **matchedPath)
{
	/* skip transparent paths */
	path = SkipTransparentPaths(path);

	if (EnableGeoPartitioningGrouping)
	{
		if (!IsA(path, AggPath))
		{
			return false;
		}

		AggPath *aggPath = castNode(AggPath, path);
		if (matchedGrouping)
		{
			*matchedGrouping = aggPath;
		}

		path = aggPath->subpath;
	}

	DistributedUnionPath *distUnion = NULL;
	if (!IsDistributedUnion(path, true, &distUnion))
	{
		return false;
	}

	if (matchedDistUnion)
	{
		*matchedDistUnion = distUnion;
	}

	path = distUnion->worker_path;

	/*
	 * castNode asserts on the node tag, so verify we actually have a CustomPath
	 * before casting; any other worker path is simply "no match".
	 */
	if (!IsA(path, CustomPath))
	{
		return false;
	}

	if (!IsGeoScanPath(castNode(CustomPath, path)))
	{
		return false;
	}

	GeoScanPath *geoPath = (GeoScanPath *) path;
	if (matchedPath)
	{
		/* capture the matched path */
		*matchedPath = geoPath;
	}

	return true;
}
/*
 * MatchSTDWithinRestrictInfo tests whether a single restriction has one of the shapes
 *
 *   var OP st_expand-expression
 *   st_expand-expression OP var
 *
 * where OP is a two argument operator implemented by the function named
 * geometry_overlaps. The st_expand side is recognised by MatchSTExpandExpression,
 * which also receives stdwithinDistance unchanged.
 *
 * When the restriction matches and stdwithinQual is not NULL the restriction is
 * stored in *stdwithinQual.
 *
 * Returns true on a match, false otherwise.
 */
static bool
MatchSTDWithinRestrictInfo(RestrictInfo *restrictInfo,
						   RestrictInfo **stdwithinQual,
						   double *stdwithinDistance)
{
	if (!IsA(restrictInfo->clause, OpExpr))
	{
		return false;
	}

	OpExpr *overlapOp = castNode(OpExpr, restrictInfo->clause);

	/* the overlap operator is binary, any other arity cannot be a match */
	if (list_length(overlapOp->args) != 2)
	{
		return false;
	}

	NameData operatorFunction = GetFunctionNameData(overlapOp->opfuncid);
	if (strcmp(NameStr(operatorFunction), "geometry_overlaps") != 0)
	{
		return false;
	}

	Expr *leftArg = linitial(overlapOp->args);
	Expr *rightArg = lsecond(overlapOp->args);

	/* first try (var && st_expand(...)), then the mirrored (st_expand(...) && var) */
	bool shapeMatched = IsA(leftArg, Var) &&
						MatchSTExpandExpression(rightArg, stdwithinDistance);
	if (!shapeMatched)
	{
		shapeMatched = MatchSTExpandExpression(leftArg, stdwithinDistance) &&
					   IsA(rightArg, Var);
	}

	if (!shapeMatched)
	{
		return false;
	}

	/* matched, hand the restriction back to the caller when requested */
	if (stdwithinQual)
	{
		*stdwithinQual = restrictInfo;
	}

	return true;
}
/*
 * MatchSTDWithinJoin scans a list of RestrictInfo nodes and stops at the first one
 * accepted by MatchSTDWithinRestrictInfo; the two out parameters are passed through
 * to that function untouched.
 *
 * Returns true when one of the restrictions matched, false when none did (which
 * includes an empty list).
 */
static bool
MatchSTDWithinJoin(List *restrictionInfos, RestrictInfo **stdwithinQual,
				   double *stdwithinDistance)
{
	bool found = false;

	RestrictInfo *candidate = NULL;
	foreach_ptr(candidate, restrictionInfos)
	{
		Assert(IsA(candidate, RestrictInfo));

		found = MatchSTDWithinRestrictInfo(candidate, stdwithinQual,
										   stdwithinDistance);
		if (found)
		{
			break;
		}
	}

	return found;
}
/*
 * MathGeoJoinPath tests whether the join path is an inner join between two geo scans
 * (see MatchGeoScan) whose join restrictions contain an ST_DWithin style overlap
 * condition (see MatchSTDWithinJoin).
 *
 * match is optional. When it is not NULL the matched nodes are captured into it;
 * fields belonging to checks that ran before a failing check may already have been
 * written when false is returned.
 *
 * Returns true only when all checks pass.
 */
static bool
MathGeoJoinPath(JoinPath *path, GeoJoinPathMatch *match)
{
	/*
	 * Tests are performed in fastest test to slowest test to have quick escapes when we
	 * don't match.
	 */

	/* only inner joins qualify; every other join type is rejected outright */
	if (path->jointype != JOIN_INNER)
	{
		return false;
	}

	if (!MatchGeoScan(path->innerjoinpath,
					  match ? &match->innerGrouping : NULL,
					  match ? &match->innerDistUnion : NULL,
					  match ? &match->innerPath : NULL))
	{
		/* innerjoinpath is not a geo scan */
		return false;
	}

	if (!MatchGeoScan(path->outerjoinpath,
					  match ? &match->outerGrouping : NULL,
					  match ? &match->outerDistUnion : NULL,
					  match ? &match->outerPath : NULL))
	{
		/* outerjoinpath is not a geo scan */
		return false;
	}

	/* verify the join is restricted on ST_DWithin */
	if (!MatchSTDWithinJoin(path->joinrestrictinfo,
							match ? &match->stdwithRestrictInfo : NULL,
							match ? &match->stdwithinDistance : NULL))
	{
		/* not a distance join */
		return false;
	}

	/* all tests matched and if `match` was not NULL the values have been captured */
	return true;
}
#include "distributed/planner/pattern_match.h"
static List * static List *
GeoOverlapJoin(PlannerInfo *root, Path *originalPath) GeoOverlapJoin(PlannerInfo *root, Path *originalPath)
{ {
JoinPath *joinPath = (JoinPath *) originalPath; JoinPath *joinPath = (JoinPath *) originalPath;
GeoJoinPathMatch match = { 0 };
GeoJoinPathMatch match2 = { 0 };
RestrictInfo *rinfo = NULL; if (!MathGeoJoinPath(joinPath, &match))
foreach_ptr(rinfo, joinPath->joinrestrictinfo)
{ {
if (IsGeoOverlapJoin(rinfo->clause)) /* no match */
{ return NIL;
}
} }
ereport(DEBUG1, (errmsg("matched pattern for geooverlap")));
/*
 * Match the join once more with the pattern DSL and capture into match2:
 *  - an inner join,
 *  - restricted by geometry_overlaps(var, st_expand(var, <float8 constant>)),
 *  - whose inner and outer sides are each a grouping on top of a distributed union
 *    on top of a geo scan, optionally hidden behind read-through nodes.
 * The outer side captures into the outer* fields so it does not overwrite the
 * nodes captured for the inner side.
 */
IfPathMatch(
	joinPath,
	MatchJoin(
		JOIN_INNER,
		MatchJoinRestrictions(
			MatchExprNamedOperation(
				geometry_overlaps,
				MatchVar(),
				MatchExprNamedFunction(
					st_expand,
					MatchVar(),
					CaptureMatch(
						&match2.stdwithinDistanceConst,
						MatchConst(MatchConstType(FLOAT8OID))
						))
				)),
		SkipReadthrough(
			CaptureMatch(
				&match2.innerGrouping,
				MatchGrouping(
					CaptureMatch(
						&match2.innerDistUnion,
						MatchDistributedUnion(
							CaptureMatch(
								&match2.innerPath,
								MatchGeoScan
								)))
					))
			),
		SkipReadthrough(
			CaptureMatch(
				&match2.outerGrouping,
				MatchGrouping(
					CaptureMatch(
						&match2.outerDistUnion,
						MatchDistributedUnion(
							CaptureMatch(
								&match2.outerPath,
								MatchGeoScan
								)))
					))
			)
		)
	)
{
	ereport(DEBUG1, (errmsg("my custom code %p: %f",
							match2.innerGrouping,
							DatumGetFloat8(match2.stdwithinDistanceConst->constvalue)
							)));
}
/* have a match on the geo join pattern, all fields are stored in `match` */
return NIL; return NIL;
} }

View File

@ -0,0 +1,343 @@
//
// Created by Nils Dijk on 20/01/2021.
//
#ifndef CITUS_PATTERN_MATCH_H
#define CITUS_PATTERN_MATCH_H
#include "nodes/plannodes.h"
/*
 * GET_ARG_COUNT(...) expands to the number of arguments it was invoked with, as an
 * integer literal.
 *
 * The arguments are placed in front of a descending list 70..0 and the helper below
 * picks the element that ends up in slot 72, which is exactly the argument count.
 * Up to 70 arguments are supported. The `, ## __VA_ARGS__` form is a GNU extension;
 * NOTE(review): the result for an empty argument list depends on how the compiler
 * treats that extension - confirm before relying on a count of 0.
 */
#define GET_ARG_COUNT(...) \
	INTERNAL_GET_ARG_COUNT_PRIVATE(0, ## __VA_ARGS__, \
								   70, 69, 68, 67, 66, 65, 64, 63, 62, 61, \
								   60, 59, 58, 57, 56, 55, 54, 53, 52, 51, \
								   50, 49, 48, 47, 46, 45, 44, 43, 42, 41, \
								   40, 39, 38, 37, 36, 35, 34, 33, 32, 31, \
								   30, 29, 28, 27, 26, 25, 24, 23, 22, 21, \
								   20, 19, 18, 17, 16, 15, 14, 13, 12, 11, \
								   10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0)

/* selects its 72nd argument; the 71 leading parameters only exist to be skipped */
#define INTERNAL_GET_ARG_COUNT_PRIVATE( \
		a0, \
		a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, \
		a11, a12, a13, a14, a15, a16, a17, a18, a19, a20, \
		a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, \
		a31, a32, a33, a34, a35, a36, a37, a38, a39, a40, \
		a41, a42, a43, a44, a45, a46, a47, a48, a49, a50, \
		a51, a52, a53, a54, a55, a56, a57, a58, a59, a60, \
		a61, a62, a63, a64, a65, a66, a67, a68, a69, a70, \
		count, ...) count
/*
 * CONCATENATE(a, b) pastes two tokens together after both have been macro expanded;
 * the extra levels of indirection force that expansion before ## is applied.
 */
#define CONCATENATE(arg1, arg2) CONCATENATE1(arg1, arg2)
#define CONCATENATE1(arg1, arg2) CONCATENATE2(arg1, arg2)
#define CONCATENATE2(arg1, arg2) arg1##arg2

/*
 * FOR_EACH_<n>(what, index, x...) invokes what((index), x) for each of its n
 * arguments, incrementing index by one per argument. FOR_EACH_0 is the empty case
 * and expands to a no-op statement.
 */
#define FOR_EACH_0(what, index) (void) 0;
#define FOR_EACH_1(what, index, x) what((index), x);
#define FOR_EACH_2(what, index, x, ...) what((index), x); FOR_EACH_1(what, index+1, __VA_ARGS__);
#define FOR_EACH_3(what, index, x, ...) what((index), x); FOR_EACH_2(what, index+1, __VA_ARGS__);
#define FOR_EACH_4(what, index, x, ...) what((index), x); FOR_EACH_3(what, index+1, __VA_ARGS__);
#define FOR_EACH_5(what, index, x, ...) what((index), x); FOR_EACH_4(what, index+1, __VA_ARGS__);
#define FOR_EACH_6(what, index, x, ...) what((index), x); FOR_EACH_5(what, index+1, __VA_ARGS__);
#define FOR_EACH_7(what, index, x, ...) what((index), x); FOR_EACH_6(what, index+1, __VA_ARGS__);
#define FOR_EACH_8(what, index, x, ...) what((index), x); FOR_EACH_7(what, index+1, __VA_ARGS__);

/* FOR_EACH_NARG(...) counts its arguments (at most 8) by selecting from 8..0 */
#define FOR_EACH_NARG(...) FOR_EACH_NARG_(__VA_ARGS__, FOR_EACH_RSEQ_N())
#define FOR_EACH_NARG_(...) FOR_EACH_ARG_N(__VA_ARGS__)
#define FOR_EACH_ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, N, ...) N
#define FOR_EACH_RSEQ_N() 8, 7, 6, 5, 4, 3, 2, 1, 0

/*
 * FOR_EACH(what, ...) expands to what((0), first); what((0+1), second); ... for up
 * to 8 arguments, i.e. `what` receives the zero based position and the argument.
 */
#define FOR_EACH_(N, what, ...) CONCATENATE(FOR_EACH_, N)(what, 0, __VA_ARGS__)
#define FOR_EACH(what, ...) FOR_EACH_(FOR_EACH_NARG(__VA_ARGS__), what, __VA_ARGS__)
/* MatchAny is a matcher that performs no test at all, so it never fails. */
#define MatchAny { }
/*
 * MatchFailed abandons the match in progress by executing `break`. It therefore
 * leaves the innermost enclosing loop or switch, which the matcher macros arrange
 * to be the do/while(false) wrapper of IfPathMatch or MatchJoinRestrictions. It
 * must not be used inside another loop or switch within a matcher, as the break
 * would then only leave that inner construct.
 */
#define MatchFailed { break; }
/*
 * CaptureMatch(capture, matcher) runs matcher and, when it did not fail, stores the
 * node it left in lastMatch into *capture (cast to the pointed-to type via typeof).
 * Nothing is stored when capture is a NULL pointer.
 *
 * The expansion is a plain sequence of statements, not a single block: it is meant
 * to be passed as the matcher argument of another matcher macro, which places it
 * inside its own braces. The matcher is deliberately not wrapped in a loop so that
 * a MatchFailed inside it still breaks out of the surrounding match.
 */
#define CaptureMatch(capture, matcher) \
matcher; \
if (capture) \
{ \
*(capture) = (typeof(*(capture))) lastMatch; \
}
/*
 * Stack helpers used by the matcher macros to remember the node being matched while
 * descending into its children.
 *
 * MakeStack(type, stackName, value) declares two variables in the current scope:
 * `type *stackName`, the current node, initialised to value, and the List
 * `stackName##Stack`, initially empty. Both are marked as used to silence unused
 * variable warnings. Because it declares variables it cannot be wrapped in a block.
 */
#define MakeStack(type, stackName, value) \
	type *stackName = (type *) (value); \
	List *stackName##Stack = NIL; \
	(void) stackName; \
	(void) stackName##Stack;

/* PushStack appends the current node to the end of the stack */
#define PushStack(stackName) \
	stackName##Stack = lappend(stackName##Stack, stackName)

/* PeekStack yields the most recently pushed node without removing it */
#define PeekStack(stackName) \
	((typeof(stackName)) llast(stackName##Stack))

/*
 * PopStack restores the current node from the most recently pushed entry and removes
 * that entry. Wrapped in do/while(0) so the two statements always act as one; it
 * contains no MatchFailed, so the wrapper cannot swallow a failing match.
 */
#define PopStack(stackName) \
	do { \
		stackName = (typeof(stackName)) llast(stackName##Stack); \
		stackName##Stack = list_delete_last(stackName##Stack); \
	} while (0)

/* VerifyStack asserts that every push has been balanced by a pop */
#define VerifyStack(stackName) \
	Assert(list_length(stackName##Stack) == 0)
/*
 * SkipReadthrough(matcher) steps over any chain of MaterialPath nodes starting at
 * the current path, logs how many were skipped, and runs matcher against the first
 * node below them. Afterwards the current path is restored to the node it started
 * from, and that node is what lastMatch is set to.
 */
#define SkipReadthrough(matcher) \
	{ \
		PushStack(pathToMatch); \
		\
		{ \
			int skipCount = 0; \
			while (IsA(pathToMatch, MaterialPath)) \
			{ \
				pathToMatch = castNode(MaterialPath, pathToMatch)->subpath; \
				skipCount++; \
			} \
			ereport(DEBUG1, (errmsg("skipped %d read through nodes", skipCount))); \
		} \
		\
		matcher; \
		PopStack(pathToMatch); \
		lastMatch = pathToMatch; \
	}
/*
 * MatchJoin(joinType, conditionMatcher, innerMatcher, outerMatcher) matches a
 * NestPath, MergePath or HashPath whose jointype equals joinType.
 *
 * innerMatcher is run against the join's innerjoinpath, then outerMatcher against
 * its outerjoinpath; finally, with the join itself as the current path again,
 * conditionMatcher is run. lastMatch is left pointing at the join path.
 */
#define MatchJoin(joinType, conditionMatcher, innerMatcher, outerMatcher) \
	{ \
		ereport(DEBUG1, (errmsg("initiate join matcher"))); \
		\
		if (!IsA(pathToMatch, NestPath) && \
			!IsA(pathToMatch, MergePath) && \
			!IsA(pathToMatch, HashPath)) \
		{ \
			MatchFailed; \
		} \
		\
		if (((JoinPath *) pathToMatch)->jointype != joinType) \
		{ \
			MatchFailed; \
		} \
		\
		PushStack(pathToMatch); \
		\
		pathToMatch = ((JoinPath *) PeekStack(pathToMatch))->innerjoinpath; \
		innerMatcher; \
		\
		pathToMatch = ((JoinPath *) PeekStack(pathToMatch))->outerjoinpath; \
		outerMatcher; \
		\
		PopStack(pathToMatch); \
		\
		conditionMatcher; \
		lastMatch = pathToMatch; \
	}
/*
 * MatchGrouping(matcher) matches an AggPath and runs matcher against its subpath.
 * The AggPath is the current path again afterwards and is what lastMatch is set to.
 */
#define MatchGrouping(matcher) \
	{ \
		if (!IsA(pathToMatch, AggPath)) \
		{ \
			MatchFailed; \
		} \
		\
		PushStack(pathToMatch); \
		pathToMatch = ((AggPath *) PeekStack(pathToMatch))->subpath; \
		\
		matcher; \
		\
		PopStack(pathToMatch); \
		lastMatch = pathToMatch; \
	}
/*
 * MatchDistributedUnion(matcher) matches a path accepted by
 * IsDistributedUnion(path, false, NULL) and runs matcher against its worker_path.
 * The distributed union is the current path again afterwards and is what lastMatch
 * is set to.
 */
#define MatchDistributedUnion(matcher) \
	{ \
		if (!IsDistributedUnion(pathToMatch, false, NULL)) \
		{ \
			MatchFailed; \
		} \
		\
		PushStack(pathToMatch); \
		pathToMatch = ((DistributedUnionPath *) pathToMatch)->worker_path; \
		\
		/* descend: without this the nested matcher and its captures never run */ \
		matcher; \
		\
		PopStack(pathToMatch); \
		lastMatch = pathToMatch; \
	}
/*
 * MatchGeoScan matches a CustomPath that IsGeoScanPath accepts and leaves it in
 * lastMatch. It is a leaf matcher and takes no nested matcher.
 *
 * This is an object-like macro, so every later occurrence of the identifier
 * MatchGeoScan in a translation unit that includes this header is replaced by the
 * block below.
 */
#define MatchGeoScan \
	{ \
		if (!IsA(pathToMatch, CustomPath) || \
			!IsGeoScanPath(castNode(CustomPath, pathToMatch))) \
		{ \
			MatchFailed; \
		} \
		\
		lastMatch = pathToMatch; \
	}
/*
 * IfPathMatch(path, matcher) is the entry point of the matcher DSL. It is used like
 * an if statement: the statement or block following the macro invocation is executed
 * only when matcher accepted path.
 *
 * The expansion declares `bool matched` in the enclosing scope, so it can be used at
 * most once per scope. The matcher runs inside a do/while(false) with `pathToMatch`
 * (the node currently being matched, see MakeStack) and `lastMatch` (the node the
 * most recent matcher accepted) in scope. A MatchFailed anywhere in matcher breaks
 * out of that loop before `matched` is set; when matcher runs to completion the
 * stack is verified to be empty and `matched` becomes true.
 */
#define IfPathMatch(path, matcher) \
bool matched = false; \
do \
{ \
MakeStack(Path, pathToMatch, path); \
void *lastMatch = NULL; \
(void) lastMatch; \
\
ereport(DEBUG1, (errmsg("initiate matcher DSL"))); \
matcher; \
\
VerifyStack(pathToMatch); \
ereport(DEBUG1, (errmsg("pattern matched"))); \
matched = true; \
break; \
} \
while (false); \
if (matched)
/*
 * MatchJoinRestrictions(matcher) requires the current path to be a join path
 * (asserted) and tries matcher against the clause of each entry of its
 * joinrestrictinfo list, stopping at the first entry that is accepted.
 *
 * For every entry a fresh `clause` variable and stack are created (see MakeStack)
 * inside a do/while(false), so a MatchFailed inside matcher only abandons that one
 * entry and the loop moves on to the next. When an entry is accepted lastMatch is
 * set to its RestrictInfo. When no entry is accepted the surrounding match fails.
 */
#define MatchJoinRestrictions(matcher) \
{ \
Assert(IsA(pathToMatch, NestPath) \
|| IsA(pathToMatch, MergePath) \
|| IsA(pathToMatch, HashPath)); \
\
bool restrictionMatched = false; \
RestrictInfo *restrictInfo = NULL; \
foreach_ptr(restrictInfo, ((JoinPath *) pathToMatch)->joinrestrictinfo) \
{ \
do { \
MakeStack(Expr, clause, restrictInfo->clause);\
\
matcher; \
\
restrictionMatched = true; \
VerifyStack(clause); \
lastMatch = restrictInfo; \
} while(false); \
if (restrictionMatched) \
{ \
break; \
} \
} \
\
if (!restrictionMatched) \
{ \
MatchFailed; \
} \
}
/*
 * InternalFunctionDispatch(index, matcher) is the per-argument step of
 * MatchExprNamedFunction: it makes argument number index of the FuncExpr on top of
 * the clause stack the current clause and runs matcher against it.
 */
#define InternalFunctionDispatch(index, matcher) \
	{ \
		{ \
			FuncExpr *parentCall = (FuncExpr *) PeekStack(clause); \
			clause = (Expr *) list_nth(parentCall->args, index); \
		} \
		matcher; \
		lastMatch = clause; \
	}

/*
 * MatchExprNamedFunction(name, matchers...) matches a FuncExpr that has exactly as
 * many arguments as matchers were given and whose function name, as reported by
 * GetFunctionNameData, equals name (written without quotes). Each matcher is then
 * run against the argument at the same position. The FuncExpr is the current clause
 * again afterwards and is what lastMatch is set to.
 */
#define MatchExprNamedFunction(name, ...) \
	{ \
		if (!IsA(clause, FuncExpr)) \
		{ \
			MatchFailed; \
		} \
		\
		{ \
			FuncExpr *call = castNode(FuncExpr, clause); \
			if (list_length(call->args) != GET_ARG_COUNT(__VA_ARGS__)) \
			{ \
				MatchFailed; \
			} \
			\
			NameData calleeName = GetFunctionNameData(call->funcid); \
			if (strcmp(NameStr(calleeName), #name) != 0) \
			{ \
				MatchFailed; \
			} \
		} \
		\
		PushStack(clause); \
		FOR_EACH(InternalFunctionDispatch, __VA_ARGS__); \
		PopStack(clause); \
		lastMatch = clause; \
	}
/*
 * InternalOperationDispatch(index, matcher) is the per-argument step of
 * MatchExprNamedOperation: it makes argument number index of the OpExpr on top of
 * the clause stack the current clause and runs matcher against it.
 */
#define InternalOperationDispatch(index, matcher) \
	{ \
		{ \
			OpExpr *parentOp = (OpExpr *) PeekStack(clause); \
			clause = (Expr *) list_nth(parentOp->args, index); \
		} \
		matcher; \
		lastMatch = clause; \
	}

/*
 * MatchExprNamedOperation(name, matchers...) matches an OpExpr that has exactly as
 * many arguments as matchers were given and whose implementing function (opfuncid),
 * as reported by GetFunctionNameData, is called name (written without quotes). Each
 * matcher is then run against the argument at the same position. The OpExpr is the
 * current clause again afterwards and is what lastMatch is set to.
 */
#define MatchExprNamedOperation(name, ...) \
	{ \
		if (!IsA(clause, OpExpr)) \
		{ \
			MatchFailed; \
		} \
		\
		{ \
			OpExpr *operation = castNode(OpExpr, clause); \
			if (list_length(operation->args) != GET_ARG_COUNT(__VA_ARGS__)) \
			{ \
				MatchFailed; \
			} \
			\
			NameData operatorFunction = GetFunctionNameData(operation->opfuncid); \
			if (strcmp(NameStr(operatorFunction), #name) != 0) \
			{ \
				MatchFailed; \
			} \
		} \
		\
		PushStack(clause); \
		FOR_EACH(InternalOperationDispatch, __VA_ARGS__); \
		PopStack(clause); \
		lastMatch = clause; \
	}
/*
 * MatchVar(...) matches when the current clause is a Var. Any arguments are pasted
 * in as further statements that run after the node type test, with `clause` still
 * pointing at the Var; they can refine the match and fail it with MatchFailed.
 * On success lastMatch is set to the Var.
 */
#define MatchVar(...) \
{ \
if (!IsA(clause, Var)) \
{ \
MatchFailed; \
} \
__VA_ARGS__; \
lastMatch = clause; \
}
/*
 * MatchConstType(constType) fails the match unless the consttype of the current
 * clause equals constType. The clause is cast with castNode, so this is meant to be
 * used as an argument of MatchConst, which has already verified the node is a Const.
 */
#define MatchConstType(constType) \
	{ \
		if (castNode(Const, clause)->consttype == constType) \
		{ \
			/* type is the requested one, keep matching */ \
		} \
		else \
		{ \
			MatchFailed; \
		} \
	}
/*
 * MatchConst(...) matches when the current clause is a Const. Any arguments are
 * pasted in as further statements that run after the node type test, with `clause`
 * still pointing at the Const (for example MatchConstType); they can fail the match
 * with MatchFailed. On success lastMatch is set to the Const.
 */
#define MatchConst(...) \
{ \
if (!IsA(clause, Const)) \
{ \
MatchFailed; \
} \
__VA_ARGS__; \
lastMatch = clause; \
}
#endif //CITUS_PATTERN_MATCH_H