diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index f38d46011..8c4bc00ed 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -1,6 +1,6 @@ -// -// Created by Nils Dijk on 17/01/2020. -// +/* */ +/* Created by Nils Dijk on 17/01/2020. */ +/* */ #include "postgres.h" #include "catalog/pg_aggregate.h" @@ -28,7 +28,7 @@ #include "utils/builtins.h" #include "utils/syscache.h" -typedef List * (*optimizeFn)(PlannerInfo *root, Path *originalPath); +typedef List *(*optimizeFn)(PlannerInfo *root, Path *originalPath); typedef struct DistributedUnionPath { @@ -55,10 +55,17 @@ typedef struct GeoScanPath RangeTblEntry *rte; } GeoScanPath; -static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); -static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); -static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths); -static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses); +static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct + CustomPath *best_path, List *tlist, + List *clauses, List *custom_plans); +static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, + RelOptInfo *child_rel); +static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 + colocationId, + Expr *partitionValue, Oid + sampleRelid, List *subPaths); +static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, + List *clauses); static List * ShardIntervalListToRelationShardList(List *shardIntervalList); static List * OptimizeJoinPath(PlannerInfo *root, Path *originalPath); static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath); @@ -66,20 +73,23 @@ static List * BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath); static List * GeoOverlapJoin(PlannerInfo *root, Path *originalPath); static Path * CreateReadIntermediateResultPath(const Path *originalPath); static bool CanOptimizeJoinPath(const JoinPath *jpath); -static bool IsDistributedUnion(Path *path, bool recurseTransparent, DistributedUnionPath **out); +static bool IsDistributedUnion(Path *path, bool recurseTransparent, + DistributedUnionPath **out); static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey); -static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue); -static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static List * ShardIntervalListForRelationPartitionValue(Oid relationId, + Expr *partitionValue); +static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *output_rel, void *extra); static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath); -static GeoScanPath *makeGeoScanPath(Relation rel, RelOptInfo *parent, - PathTarget *pathtarget, double rows); +static GeoScanPath * makeGeoScanPath(Relation rel, RelOptInfo *parent, + PathTarget *pathtarget, double rows); static bool IsGeoScanPath(CustomPath *path); -static RangeTblEntry *makeRangeTableEntryForRelation(Relation rel, - int lockmode, - Alias *alias, - bool inh, - bool inFromCl); +static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel, + int lockmode, + Alias *alias, + bool inh, + bool inFromCl); /* @@ -93,8 +103,8 @@ bool EnableBroadcastJoin = true; /* list of functions that will be called to optimized in the joinhook*/ static optimizeFn joinOptimizations[] = { OptimizeJoinPath, -// BroadcastOuterJoinPath, -// BroadcastInnerJoinPath, +/* BroadcastOuterJoinPath, */ +/* BroadcastInnerJoinPath, */ GeoOverlapJoin, }; @@ -110,10 +120,11 @@ const CustomPathMethods distributedUnionMethods = { static CustomPath * -WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths) +WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, + Expr *partitionValue, Oid sampleRelid, List *subPaths) { DistributedUnionPath *distUnion = (DistributedUnionPath *) - newNode(sizeof(DistributedUnionPath), T_CustomPath); + newNode(sizeof(DistributedUnionPath), T_CustomPath); distUnion->custom_path.path.pathtype = T_CustomScan; distUnion->custom_path.path.parent = originalPath->parent; @@ -122,8 +133,8 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Exp /* TODO use a better cost model */ distUnion->custom_path.path.rows = originalPath->rows; - distUnion->custom_path.path.startup_cost = originalPath->startup_cost+1000; - distUnion->custom_path.path.total_cost = originalPath->total_cost+1000; + distUnion->custom_path.path.startup_cost = originalPath->startup_cost + 1000; + distUnion->custom_path.path.total_cost = originalPath->total_cost + 1000; distUnion->custom_path.methods = &distributedUnionMethods; @@ -153,6 +164,7 @@ CreateDistributedUnionPlan(PlannerInfo *root, ShardInterval *shardInterval = NULL; Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses); + /* * Assume shards are colocated, any shard should suffice for now to find the initial * interval list @@ -226,6 +238,7 @@ CreateDistributedUnionPlan(PlannerInfo *root, plan->custom_plans = custom_plans; plan->scan.plan.targetlist = tlist; + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ clauses = extract_actual_clauses(clauses, false); @@ -243,6 +256,7 @@ ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue) { /* prune shard list to target */ Const *partitionValueConst = castNode(Const, partitionValue); + /* TODO assert the constant is of the correct value */ CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); return list_make1(FindShardInterval(partitionValueConst->constvalue, cacheEntry)); @@ -315,7 +329,8 @@ IsDistributedUnion(Path *path, bool recurseTransparent, DistributedUnionPath **o } CustomPath *cpath = castNode(CustomPath, path); - if (cpath->methods != &distributedUnionMethods) { + if (cpath->methods != &distributedUnionMethods) + { return false; } @@ -350,7 +365,8 @@ PathBasedPlannerRelationHook(PlannerInfo *root, partitionKey->varno = bms_next_member(relOptInfo->relids, -1); Assert(bms_num_members(relOptInfo->relids) == 1); - partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo, partitionKey); + partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo, + partitionKey); } /* wrap every path with a distributed union custom path */ @@ -369,7 +385,8 @@ PathBasedPlannerRelationHook(PlannerInfo *root, /* hardcoded hack for adding geo distributed tables as an alternative path */ Relation rel = relation_open(rte->relid, AccessShareLock); - if (UseGeoPartitioning && strcmp(RelationGetRelationName(rel), "belgium_planet_osm_roads_dist") == 0) + if (UseGeoPartitioning && strcmp(RelationGetRelationName(rel), + "belgium_planet_osm_roads_dist") == 0) { if (OnlyGeoPartitioning) { @@ -386,11 +403,11 @@ PathBasedPlannerRelationHook(PlannerInfo *root, relOptInfo->reltarget, relOptInfo->rows); geoPath = (Path *) - WrapTableAccessWithDistributedUnion(geoPath, - TableColocationId(geoRelid), - NULL, - geoRelid, - NIL); + WrapTableAccessWithDistributedUnion(geoPath, + TableColocationId(geoRelid), + NULL, + geoRelid, + NIL); if (EnableGeoPartitioningGrouping) { @@ -537,7 +554,7 @@ makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double if (IsA(expr, Var)) { /* TODO assume the first attribute of a relation as its PK */ - Var *var = (Var *)expr; + Var *var = (Var *) expr; isPrimaryKey = var->varattno == 1; } @@ -559,7 +576,8 @@ makeGeoScanPath(Relation rel, RelOptInfo *parent, PathTarget *pathtarget, double cpath->methods = &geoScanMethods; - geoPath->rte = makeRangeTableEntryForRelation(rel, AccessShareLock, NULL, false, true); + geoPath->rte = makeRangeTableEntryForRelation(rel, AccessShareLock, NULL, false, + true); return geoPath; } @@ -580,7 +598,7 @@ makeRangeTableEntryForRelation(Relation rel, bool inFromCl) { RangeTblEntry *rte = makeNode(RangeTblEntry); - char *refname = alias ? alias->aliasname : RelationGetRelationName(rel); + char *refname = alias ? alias->aliasname : RelationGetRelationName(rel); Assert(lockmode == AccessShareLock || lockmode == RowShareLock || @@ -613,7 +631,7 @@ makeRangeTableEntryForRelation(Relation rel, rte->inFromCl = inFromCl; rte->requiredPerms = ACL_SELECT; - rte->checkAsUser = InvalidOid; /* not set-uid by default, either */ + rte->checkAsUser = InvalidOid; /* not set-uid by default, either */ rte->selectedCols = NULL; rte->insertedCols = NULL; rte->updatedCols = NULL; @@ -694,15 +712,18 @@ CanOptimizeJoinPath(const JoinPath *jpath) return true; } + static NameData GetFunctionNameData(Oid funcid) { - HeapTuple proctup = NULL; + HeapTuple proctup = NULL; Form_pg_proc procform = NULL; proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); if (!HeapTupleIsValid(proctup)) + { elog(ERROR, "cache lookup failed for function %u", funcid); + } procform = (Form_pg_proc) GETSTRUCT(proctup); /* copy name by value */ @@ -714,96 +735,9 @@ GetFunctionNameData(Oid funcid) } -static bool -MatchSTExpandExpression(Expr *expr, float8 *distance) -{ - if (!IsA(expr, FuncExpr)) - { - return false; - } - - FuncExpr *funcexpr = castNode(FuncExpr, expr); - - NameData funcNameData = GetFunctionNameData(funcexpr->funcid); - if (strcmp(NameStr(funcNameData), "st_expand") != 0) - { - /* expected an expansion of the geometry */ - return false; - } - - if (list_length(funcexpr->args) != 2) - { - /* expected 2 arguments */ - return false; - } - - if (distance) - { - Const *distanceConst = lsecond_node(Const, funcexpr->args); - Assert(distanceConst->consttype == FLOAT8OID); - - *distance = DatumGetFloat8(distanceConst->constvalue); - } - - - return true; -} - - -static bool -IsGeoOverlapJoin(Expr *expr) -{ - /* find a geometry_overlaps expression */ - if (!IsA(expr, OpExpr)) - { - return false; - } - - OpExpr *opexpr = castNode(OpExpr, expr); - if (!OidIsValid(opexpr->opfuncid)) - { - return false; - } - - /* check the name of the operation */ - NameData opname = GetFunctionNameData(opexpr->opfuncid); - if (strcmp(NameStr(opname), "geometry_overlaps") != 0) - { - return false; - } - - /* we expect exactly 2 arguments */ - if (list_length(opexpr->args) != 2) - { - return false; - } - - Expr *leftArg = linitial(opexpr->args); - Expr *rightArg = lsecond(opexpr->args); - - if (IsA(rightArg, Var) && IsA(leftArg, FuncExpr)) - { - /* swap the args around to work on them in expected fashion */ - Expr *tmp = leftArg; - leftArg = rightArg; - rightArg = tmp; - } - - float8 distance = 0; - if (IsA(leftArg, Var) && MatchSTExpandExpression(rightArg, &distance)) - { - return false; - } - - return true; -} - - typedef struct GeoJoinPathMatch { - RestrictInfo *stdwithRestrictInfo; Const *stdwithinDistanceConst; - double stdwithinDistance; AggPath *innerGrouping; DistributedUnionPath *innerDistUnion; @@ -833,178 +767,14 @@ SkipTransparentPaths(Path *path) } -static bool -MatchGeoScan(Path *path, - AggPath **matchedGrouping, - DistributedUnionPath **matchedDistUnion, - GeoScanPath **matchedPath) -{ - /* skip transparent paths */ - path = SkipTransparentPaths(path); - - if (EnableGeoPartitioningGrouping) - { - if (!IsA(path, AggPath)) - { - return false; - } - - AggPath *aggPath = castNode(AggPath, path); - if (matchedGrouping) - { - *matchedGrouping = aggPath; - } - - path = aggPath->subpath; - } - - DistributedUnionPath *distUnion = NULL; - if (!IsDistributedUnion(path, true, &distUnion)) - { - return false; - } - if (matchedDistUnion) - { - *matchedDistUnion = distUnion; - } - - path = distUnion->worker_path; - if (!IsGeoScanPath(castNode(CustomPath, path))) - { - return false; - } - - GeoScanPath *geoPath = (GeoScanPath *) path; - - if (matchedPath) - { - /* capture the matched path */ - *matchedPath = geoPath; - } - - return true; -} - - -static bool -MatchSTDWithinRestrictInfo(RestrictInfo *restrictInfo, - RestrictInfo **stdwithinQual, - double *stdwithinDistance) -{ - if (!IsA(restrictInfo->clause, OpExpr)) - { - return false; - } - OpExpr *opexpr = castNode(OpExpr, restrictInfo->clause); - - /* match for a (geometry && geometry) expression */ - if (list_length(opexpr->args) != 2) - { - /* not exactly 2 arguments, no need for further checks */ - return false; - } - - NameData opexprNameData = GetFunctionNameData(opexpr->opfuncid); - if (strcmp(NameStr(opexprNameData), "geometry_overlaps") != 0) - { - return false; - } - - Expr *arg1 = linitial(opexpr->args); - Expr *arg2 = lsecond(opexpr->args); - - /* match (geometry && st_expand(geometry, distance)) or (st_expand(geometry, distance) && var)*/ - if (!( - (IsA(arg1, Var) && MatchSTExpandExpression(arg2, stdwithinDistance)) || - (MatchSTExpandExpression(arg1, stdwithinDistance) && IsA(arg2, Var)) - )) - { - return false; - } - - /* matched */ - if (stdwithinQual) - { - *stdwithinQual = restrictInfo; - } - return true; -} - - -static bool -MatchSTDWithinJoin(List *restrictionInfos, RestrictInfo **stdwithinQual, - double *stdwithinDistance) -{ - RestrictInfo *restrictInfo = NULL; - foreach_ptr(restrictInfo, restrictionInfos) - { - Assert(IsA(restrictInfo, RestrictInfo)); - if (MatchSTDWithinRestrictInfo(restrictInfo, stdwithinQual, stdwithinDistance)) - { - return true; - } - } - - return false; -} - -static bool -MathGeoJoinPath(JoinPath *path, GeoJoinPathMatch *match) -{ - /* - * Tests are performed in fastest test to slowest test to have quick escapes when we - * don't match. - */ - - if (!MatchGeoScan(path->innerjoinpath, - match ? &match->innerGrouping : NULL, - match ? &match->innerDistUnion : NULL, - match ? &match->innerPath : NULL)) - { - /* innerjoinpath is not a geo scan */ - return false; - } - - if (!MatchGeoScan(path->outerjoinpath, - match ? &match->outerGrouping : NULL, - match ? &match->outerDistUnion : NULL, - match ? &match->outerPath : NULL)) - { - /* outerjoinpath is not a geo scan */ - return false; - } - - /* verify this is an innerjoin on ST_DWithin */ - if (path->jointype == JOIN_INNER - && !MatchSTDWithinJoin(path->joinrestrictinfo, - match ? &match->stdwithRestrictInfo : NULL, - match ? &match->stdwithinDistance : NULL)) - { - /* not a distance join */ - return false; - } - - /* all tests matched and if `match` was not NULL the value's have been captured */ - return true; -} - #include "distributed/planner/pattern_match.h" static List * GeoOverlapJoin(PlannerInfo *root, Path *originalPath) { - JoinPath *joinPath = (JoinPath *) originalPath; GeoJoinPathMatch match = { 0 }; - GeoJoinPathMatch match2 = { 0 }; - if (!MathGeoJoinPath(joinPath, &match)) - { - /* no match */ - return NIL; - } - - ereport(DEBUG1, (errmsg("matched pattern for geooverlap"))); IfPathMatch( - joinPath, + originalPath, MatchJoin( JOIN_INNER, MatchJoinRestrictions( @@ -1015,46 +785,41 @@ GeoOverlapJoin(PlannerInfo *root, Path *originalPath) st_expand, MatchVar(), CaptureMatch( - &match2.stdwithinDistanceConst, + &match.stdwithinDistanceConst, MatchConst(MatchConstType(FLOAT8OID)) )) )), - SkipReadthrough( - CaptureMatch( - &match2.innerGrouping, - MatchGrouping( - CaptureMatch( - &match2.innerDistUnion, - MatchDistributedUnion( - CaptureMatch( - &match2.innerPath, - MatchGeoScan - ))) - )) + SkipReadthrough(CaptureMatch( + &match.innerGrouping, + MatchGrouping(CaptureMatch( + &match.innerDistUnion, + MatchDistributedUnion(CaptureMatch( + &match.innerPath, + MatchGeoScan + ))) + )) ), - SkipReadthrough( - CaptureMatch( - &match2.outerGrouping, - MatchGrouping( - CaptureMatch( - &match2.outerDistUnion, - MatchDistributedUnion( - CaptureMatch( - &match2.innerPath, - MatchGeoScan - ))) - )) + SkipReadthrough(CaptureMatch( + &match.outerGrouping, + MatchGrouping(CaptureMatch( + &match.outerDistUnion, + MatchDistributedUnion(CaptureMatch( + &match.outerPath, + MatchGeoScan + ))) + )) ) ) ) { + /* have a match on the geo join pattern, all fields are stored in `match` */ ereport(DEBUG1, (errmsg("my custom code %p: %f", - match2.innerGrouping, - DatumGetFloat8(match2.stdwithinDistanceConst->constvalue) - ))); + match.innerGrouping, + DatumGetFloat8(match.stdwithinDistanceConst->constvalue) + ))); } - /* have a match on the geo join pattern, all fields are stored in `match` */ + (void) &match; return NIL; } @@ -1072,7 +837,8 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath) if (jpath->jointype == JOIN_INNER && CanOptimizeJoinPath(jpath)) { /* we can only optimize the Distributed union if the colocationId's are the same, taking any would suffice */ - DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath; + DistributedUnionPath *baseDistUnion = + (DistributedUnionPath *) jpath->innerjoinpath; /* * Shallow copy of any join node, this does not imply executing a nested @@ -1083,8 +849,10 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath) *jcpath = *jpath; jcpath->path.type = T_NestPath; - jcpath->innerjoinpath = ((DistributedUnionPath *) jpath->innerjoinpath)->worker_path; - jcpath->outerjoinpath = ((DistributedUnionPath *) jpath->outerjoinpath)->worker_path; + jcpath->innerjoinpath = + ((DistributedUnionPath *) jpath->innerjoinpath)->worker_path; + jcpath->outerjoinpath = + ((DistributedUnionPath *) jpath->outerjoinpath)->worker_path; /* TODO update costs of hashjoin, very naife removal of DU cost for now */ jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */ @@ -1128,7 +896,8 @@ BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath) if (IsDistributedUnion(jpath->outerjoinpath, false, NULL)) { /* broadcast inner join path */ - DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->outerjoinpath; + DistributedUnionPath *baseDistUnion = + (DistributedUnionPath *) jpath->outerjoinpath; /* * Shallow copy of any join node, this does not imply executing a nested @@ -1186,7 +955,8 @@ BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath) if (IsDistributedUnion(jpath->innerjoinpath, false, NULL)) { /* broadcast inner join path */ - DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath; + DistributedUnionPath *baseDistUnion = + (DistributedUnionPath *) jpath->innerjoinpath; /* * Shallow copy of any join node, this does not imply executing a nested @@ -1261,7 +1031,7 @@ PathBasedPlannerJoinHook(PlannerInfo *root, foreach(pathCell, joinrel->pathlist) { Path *originalPath = lfirst(pathCell); - for (int i=0; i < sizeof(joinOptimizations)/sizeof(joinOptimizations[1]); i++) + for (int i = 0; i < sizeof(joinOptimizations) / sizeof(joinOptimizations[1]); i++) { List *alternativePaths = joinOptimizations[i](root, originalPath); newPaths = list_concat(newPaths, alternativePaths); @@ -1275,6 +1045,7 @@ PathBasedPlannerJoinHook(PlannerInfo *root, } } + /* * varno_mapping is an array where the index is the varno in the original query, or 0 if * no mapping is required. @@ -1306,12 +1077,12 @@ VarNoMutator(Node *expr, Index *varno_mapping) var->vartypmod, var->varcollid, var->varlevelsup - ); + ); } default: { - return expression_tree_mutator(expr, (void*) VarNoMutator, varno_mapping); + return expression_tree_mutator(expr, (void *) VarNoMutator, varno_mapping); } } } @@ -1335,6 +1106,7 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf case T_Agg: { AggPath *apath = castNode(AggPath, path); + /* the subpath needs to be applied before we can apply the grouping clause */ ApplyPathToQuery(root, query, apath->subpath, info); @@ -1514,7 +1286,7 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf if (rteIndex == 0) { - RangeTblEntry* rte = geoPath->rte; + RangeTblEntry *rte = geoPath->rte; query->rtable = lappend(query->rtable, rte); rteIndex = list_length(query->rtable); info->varno_mapping[scan_relid] = rteIndex; @@ -1536,7 +1308,7 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf ereport(ERROR, (errmsg("unknown path type in worker query"), errdetail("cannot turn worker path into query due to unknown " "path type in plan. pathtype: %d", path->pathtype)) - ); + ); } } } @@ -1694,6 +1466,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) } SortGroupClause *sgc = NULL; + /* * TODO verify whats the purpose of the list, if we find any of the distribution * colums somewhere in this we optimize, might be wrong