From 51d498e2c26c9a8d135588005ed14e05a9e4b237 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 22 Nov 2021 16:21:47 +0100 Subject: [PATCH] wip for repartition join --- .../distributed/planner/path_based_planner.c | 163 +++++++++++++++++- 1 file changed, 161 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index abc37fbf8..f04f3174a 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -49,6 +49,13 @@ typedef struct DistributedUnionPath Oid sampleRelid; } DistributedUnionPath; +typedef struct RepartitionPath +{ + CustomPath custom_path; + + uint32 targetColocationId; +} RepartitionPath; + typedef struct GeoScanPath { CustomPath custom_path; @@ -65,10 +72,12 @@ static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths); +static Path * CreateRepartitionNode(uint32 colocationId, Path *worker_path); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses, Index **varnoMapping); static List * ShardIntervalListToRelationShardList(List *shardIntervalList); static List * OptimizeJoinPath(PlannerInfo *root, Path *originalPath); +static List * OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath); static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath); static List * BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath); static List * GeoOverlapJoin(PlannerInfo *root, Path *originalPath); @@ -103,9 +112,10 @@ bool EnableBroadcastJoin = true; /* list of functions that will be called to optimized in the joinhook*/ static optimizeFn joinOptimizations[] = { OptimizeJoinPath, + OptimizeRepartitionInnerJoinPath, /* BroadcastOuterJoinPath, */ /* BroadcastInnerJoinPath, */ - GeoOverlapJoin, +/* GeoOverlapJoin, */ }; const CustomPathMethods geoScanMethods = { @@ -118,6 +128,10 @@ const CustomPathMethods distributedUnionMethods = { .ReparameterizeCustomPathByChild = ReparameterizeDistributedUnion }; +const CustomPathMethods repartitionMethods = { + .CustomName = "Repartition", +}; + static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, @@ -139,6 +153,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, distUnion->custom_path.methods = &distributedUnionMethods; distUnion->worker_path = originalPath; + distUnion->custom_path.custom_private = list_make1(originalPath); distUnion->colocationId = colocationId; distUnion->partitionValue = partitionValue; distUnion->sampleRelid = sampleRelid; @@ -1004,6 +1019,150 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath) } +static List * +OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) +{ + DistributedUnionPath *innerDU = NULL; + DistributedUnionPath *outerDU = NULL; + JoinPath *joinPath = NULL; + + /* + * Match the following shape: + * + * +---------+ + * | Join | + * +---------+ + * / \ + * +---------------------+ +---------------------+ + * | Collect | | Collect | + * | - ColocationID: $1 | | - ColocationID: !$1 | + * +---------------------+ +---------------------+ + * + */ + IfPathMatch( + originalPath, + MatchJoin( + &joinPath, + JOIN_INNER, + /* match on join restriction info */ + MatchAny, + /* match inner path in join */ + SkipReadThrough( + NoCapture, + MatchDistributedUnion( + &innerDU, + MatchAny)), + /* match outer path in join */ + SkipReadThrough( + NoCapture, + MatchDistributedUnion( + &outerDU, + MatchAny)))) + { + /* + * We matched the shape of our join. Next we need to verify the join is not + * already colocated, because a colocated join can always push down. To verify + * colocatededness of the join we need to verify the following: + * - the join is happening in the same colocation id + * - there is an equivalence on the list of distribution columns on both the + * inner and the outer part of the join. + * If any of the above do not satisfy the joins are not colocated + */ + + if (innerDU->colocationId == outerDU->colocationId) + { + /* TODO check skipped the equivalence check between distribution attributes */ + return NIL; + } + + /* + * We want to repartition the inner join to the colocation of the outer join. For + * this we need to understand which attribute on the inner join has an equivalence + * condition on any of the attributes in the outer part of the join. + * + * Once we know on which attribute to repartition the inner part we can create a + * new tree in the following shape: + * +------------------------------------+ + * | Collect | + * | - ColocationID: outer.ColocationID | + * +------------------------------------+ + * | + * +---------+ + * | Join | + * +---------+ + * / \ + * +-------------------+ +------------------------------------+ + * | oruter.worke_path | | Repartition | + * +-------------------+ | - ColocationID: outer.colocationID | + * +------------------------------------+ + * | + * +-------------------+ + * | inner.worker_path | + * +-------------------+ + */ + + /* create new Join node */ + JoinPath *newJoinPath = makeNode(NestPath); + *newJoinPath = *joinPath; + newJoinPath->path.type = T_NestPath; /* reset type after copied join data */ + + /* populate outer path*/ + newJoinPath->outerjoinpath = outerDU->worker_path; + + /* TODO understand how to describe on which attribute the Repartition needs to happen */ + newJoinPath->innerjoinpath = CreateRepartitionNode(outerDU->colocationId, + innerDU->worker_path); + + /* TODO find a good way to calculate join costs based on its inner/outer paths */ + /* subtract the double collect cost */ + newJoinPath->path.startup_cost -= 2000; + newJoinPath->path.total_cost -= 2000; + + /* add the costs for the repartition */ + newJoinPath->path.startup_cost += 500; + newJoinPath->path.total_cost += 500; + + /* create Collect on top of new join, base Collect on matched outer Collect */ + const DistributedUnionPath *baseDistUnion = outerDU; + Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + (Path *) newJoinPath, + baseDistUnion->colocationId, + baseDistUnion->partitionValue, + baseDistUnion->sampleRelid, + baseDistUnion->custom_path.custom_paths); + + return list_make1(newPath); + } + + return NIL; +} + + +static Path * +CreateRepartitionNode(uint32 colocationId, Path *worker_path) +{ + RepartitionPath *repartition = (RepartitionPath *) + newNode(sizeof(RepartitionPath), T_CustomPath); + + repartition->custom_path.path.pathtype = T_CustomScan; + repartition->custom_path.path.parent = worker_path->parent; + repartition->custom_path.path.pathtarget = worker_path->pathtarget; + repartition->custom_path.path.param_info = worker_path->param_info; + + /* TODO use a better cost model */ + repartition->custom_path.path.rows = worker_path->rows; + repartition->custom_path.path.startup_cost = worker_path->startup_cost + 500; + repartition->custom_path.path.total_cost = worker_path->total_cost + 500; + + repartition->custom_path.methods = &repartitionMethods; + + repartition->custom_path.custom_private = list_make1(worker_path); + repartition->targetColocationId = colocationId; + + return (Path *) repartition; +} + + static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath) { @@ -1618,7 +1777,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) /* * TODO verify whats the purpose of the list, if we find any of the distribution - * colums somewhere in this we optimize, might be wrong + * columns somewhere in this we optimize, might be wrong */ foreach_ptr(sgc, apath->groupClause) {