diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index c6bc8f224..08662531a 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -1692,7 +1692,7 @@ multi_join_restriction_hook(PlannerInfo *root,
 {
 	if (UseCustomPath)
 	{
-		// TODO implement the replacement of path nodes with distributed union
+		PathBasedPlannerJoinHook(root, joinrel, outerrel, innerrel, jointype, extra);
 		return;
 	}
 
diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c
index 7977e682c..844fe490e 100644
--- a/src/backend/distributed/planner/path_based_planner.c
+++ b/src/backend/distributed/planner/path_based_planner.c
@@ -22,11 +22,12 @@ static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel,
 										 struct CustomPath *best_path,
 										 List *tlist, List *clauses, List *custom_plans);
 static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
-static CustomPath * WrapTableAccessWithDistributedUnion(PlannerInfo *root, Path *originalPath);
+static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId);
 static Index VarnoFromFirstTargetEntry(List *tlist);
 static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
 static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
-
+static Path * OptimizeJoinPath(Path *originalPath);
+static bool CanOptimizeHashPath(HashPath *hpath);
 static bool IsDistributedUnion(Path *path);
 
 typedef struct DistributedUnionPath
@@ -47,9 +48,11 @@ const CustomPathMethods distributedUnionMethods = {
 
 
 CustomPath *
-WrapTableAccessWithDistributedUnion(PlannerInfo *root, Path *originalPath)
+WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId)
 {
-	DistributedUnionPath *distUnion = newNode(sizeof(DistributedUnionPath), T_CustomPath);
+	DistributedUnionPath *distUnion = (DistributedUnionPath *)
+									  newNode(sizeof(DistributedUnionPath), T_CustomPath);
+
 	distUnion->custom_path.path.pathtype = T_CustomScan;
 	distUnion->custom_path.path.parent = originalPath->parent;
 	distUnion->custom_path.path.pathtarget = originalPath->pathtarget;
@@ -63,10 +66,7 @@ WrapTableAccessWithDistributedUnion(PlannerInfo *root, Path *originalPath)
 	distUnion->custom_path.methods = &distributedUnionMethods;
 
 	distUnion->worker_path = originalPath;
-
-	/* collect the colocation group of the table */
-	RangeTblEntry *rte = root->simple_rte_array[originalPath->parent->relid];
-	distUnion->colocationId = TableColocationId(rte->relid);
+	distUnion->colocationId = colocationId;
 
 	return (CustomPath *) distUnion;
 }
@@ -184,7 +184,10 @@ IsDistributedUnion(Path *path)
 
 
 void
-PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte)
+PathBasedPlannerRelationHook(PlannerInfo *root,
+							 RelOptInfo *relOptInfo,
+							 Index restrictionIndex,
+							 RangeTblEntry *rte)
 {
 	if (!IsDistributedTable(rte->relid))
 	{
@@ -197,11 +200,109 @@ PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index re
 	foreach(pathCell, relOptInfo->pathlist)
 	{
 		Path *originalPath = lfirst(pathCell);
-		pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(root, originalPath);
+		pathCell->data.ptr_value =
+			WrapTableAccessWithDistributedUnion(originalPath,
+												TableColocationId(rte->relid));
 	}
 }
 
 
+/*
+ * CanOptimizeHashPath returns true when both sides of the hash join are distributed
+ * unions belonging to the same colocation group. Whether the join is on the
+ * distribution column is not verified yet.
+ */
+static bool
+CanOptimizeHashPath(HashPath *hpath)
+{
+	if (!(IsDistributedUnion(hpath->jpath.innerjoinpath) &&
+		  IsDistributedUnion(hpath->jpath.outerjoinpath)))
+	{
+		/* can only optimize joins when both inner and outer are a distributed union */
+		return false;
+	}
+	DistributedUnionPath *innerDU = (DistributedUnionPath *) hpath->jpath.innerjoinpath;
+	DistributedUnionPath *outerDU = (DistributedUnionPath *) hpath->jpath.outerjoinpath;
+
+	if (innerDU->colocationId != outerDU->colocationId)
+	{
+		/* Distributed Union is not on the same colocation group */
+		return false;
+	}
+
+	/* TODO check if join is on distribution column, assume for now */
+	return true;
+}
+
+
+/*
+ * OptimizeJoinPath rewrites a hash join between two distributed unions of the same
+ * colocation group into a single distributed union on top of a hash join between
+ * their worker paths. Any other path is returned unchanged.
+ */
+static Path *
+OptimizeJoinPath(Path *originalPath)
+{
+	if (IsA(originalPath, HashPath))
+	{
+		HashPath *hpath = castNode(HashPath, originalPath);
+		if (CanOptimizeHashPath(hpath))
+		{
+			/*
+			 * Grab the distributed unions before unlinking them from the join. Once
+			 * the join points at the worker paths, its children are no longer
+			 * DistributedUnionPath nodes and must not be read as such.
+			 */
+			DistributedUnionPath *innerDU =
+				(DistributedUnionPath *) hpath->jpath.innerjoinpath;
+			DistributedUnionPath *outerDU =
+				(DistributedUnionPath *) hpath->jpath.outerjoinpath;
+
+			/*
+			 * we can only optimize the Distributed union if the colocationId's are
+			 * the same, taking any would suffice
+			 */
+			uint32 colocationId = innerDU->colocationId;
+
+			hpath->jpath.innerjoinpath = innerDU->worker_path;
+			hpath->jpath.outerjoinpath = outerDU->worker_path;
+
+			/* TODO update costs of hashjoin, very naive removal of DU cost for now */
+			hpath->jpath.path.startup_cost -= 2000; /* remove the double dist union cost */
+			hpath->jpath.path.total_cost -= 2000; /* remove the double dist union cost */
+
+			return (Path *) WrapTableAccessWithDistributedUnion((Path *) hpath,
+																colocationId);
+		}
+	}
+
+	return originalPath;
+}
+
+
+/*
+ * PathBasedPlannerJoinHook replaces every path in the pathlist of joinrel with the
+ * result of OptimizeJoinPath. Only inner joins are considered; for any other join
+ * type the pathlist is left untouched.
+ */
+void
+PathBasedPlannerJoinHook(PlannerInfo *root,
+						 RelOptInfo *joinrel,
+						 RelOptInfo *outerrel,
+						 RelOptInfo *innerrel,
+						 JoinType jointype,
+						 JoinPathExtraData *extra)
+{
+	if (jointype == JOIN_INNER)
+	{
+		ListCell *pathCell = NULL;
+		foreach(pathCell, joinrel->pathlist)
+		{
+			Path *originalPath = lfirst(pathCell);
+			pathCell->data.ptr_value = OptimizeJoinPath(originalPath);
+		}
+	}
+}
 
 /*
  * varno_mapping is an array where the index is the varno in the original query, or 0 if
@@ -210,6 +311,11 @@ PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index re
 static Node *
 VarNoMutator(Node *expr, Index *varno_mapping)
 {
+	if (expr == NULL)
+	{
+		return NULL;
+	}
+
 	switch (nodeTag(expr))
 	{
 		case T_Var:
@@ -240,34 +346,126 @@ VarNoMutator(Node *expr, Index *varno_mapping)
 }
 
 
+/*
+ * PathQueryInfo carries the state that is shared while a path is turned into a query.
+ */
+typedef struct PathQueryInfo
+{
+	/*
+	 * Keep track of the mapping of varno's from the original query to the new query.
+	 * This will be used to update the Varno attributes of Var's in the quals and target
+	 * list.
+	 */
+	Index *varno_mapping;
+} PathQueryInfo;
+
+/*
+ * ApplyPathToQuery adds the relations and quals described by path to query. Scan paths
+ * add their range table entry, a from-list reference and their base restrictions; hash
+ * joins recursively add their outer and inner path followed by the join restrictions.
+ * Any other path type raises an error.
+ */
+static void
+ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *info)
+{
+	switch (path->pathtype)
+	{
+		case T_IndexScan:
+		case T_SeqScan:
+		{
+			/*
+			 * Add table as source to the range table and keep track of the mapping with
+			 * the original query
+			 */
+			Index scan_relid = path->parent->relid;
+			Index rteIndex = info->varno_mapping[scan_relid];
+
+			if (rteIndex == 0)
+			{
+				/* not added before, add and keep reference to which entry it has been added */
+				RangeTblEntry *rte = root->simple_rte_array[scan_relid];
+				query->rtable = lappend(query->rtable, rte);
+				rteIndex = list_length(query->rtable);
+				info->varno_mapping[scan_relid] = rteIndex;
+			}
+
+			/* add to from list */
+			RangeTblRef *rr = makeNode(RangeTblRef);
+			rr->rtindex = rteIndex;
+			query->jointree->fromlist = lappend(query->jointree->fromlist, rr);
+
+			List *quals = NIL;
+			RestrictInfo *rinfo = NULL;
+			foreach_ptr(rinfo, path->parent->baserestrictinfo)
+			{
+				Node *clause = (Node *) rinfo->clause;
+				quals = lappend(quals, clause);
+			}
+			if (list_length(quals) > 0)
+			{
+				Node *qualsAnd = (Node *) make_ands_explicit(quals);
+				query->jointree->quals = make_and_qual(query->jointree->quals, qualsAnd);
+			}
+
+			break;
+		}
+
+		case T_HashJoin:
+		{
+			JoinPath *jpath = (JoinPath *) path;
+
+			/* add both join paths to the query */
+			ApplyPathToQuery(root, query, jpath->outerjoinpath, info);
+			ApplyPathToQuery(root, query, jpath->innerjoinpath, info);
+
+
+			List *quals = NIL;
+			RestrictInfo *rinfo = NULL;
+			foreach_ptr(rinfo, jpath->joinrestrictinfo)
+			{
+				Node *clause = (Node *) rinfo->clause;
+				quals = lappend(quals, clause);
+			}
+			if (list_length(quals) > 0)
+			{
+				Node *qualsAnd = (Node *) make_ands_explicit(quals);
+				query->jointree->quals = make_and_qual(query->jointree->quals, qualsAnd);
+			}
+
+			break;
+		}
+
+		default:
+		{
+			ereport(ERROR, (errmsg("unknow path type in worker query"),
+							errdetail("cannot turn worker path into query due to unknown "
+									  "path type in plan. pathtype: %d", path->pathtype))
+					);
+		}
+	}
+}
+
+
+/*
+ * GetQueryFromPath builds a SELECT query from path, with the varno's in tlist and
+ * clauses remapped to the range table of the newly built query.
+ */
 static Query *
 GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses)
 {
-	Index scan_relid = path->parent->relid;
-	RangeTblEntry *rte = root->simple_rte_array[scan_relid];
+	PathQueryInfo info = { 0 };
+	info.varno_mapping = palloc0(sizeof(Index) * root->simple_rel_array_size);
 
 	Query *q = makeNode(Query);
 	q->commandType = CMD_SELECT;
-	q->rtable = list_make1(rte);
+	q->jointree = makeNode(FromExpr);
+	ApplyPathToQuery(root, q, path, &info);
 
-	List *newTargetList = NIL;
-	TargetEntry *target = NULL;
-
-	Index *varno_mapping = palloc0(sizeof(Index) * root->simple_rel_array_size);
-	/*
-	 * map the rte index of the table we are scanning to the range table entry as we have
-	 * added it to the query
-	 */
-	varno_mapping[scan_relid] = 1;
 
 	/* copy the target list with mapped varno values to reflect the tables we are selecting */
-	newTargetList = (List *) VarNoMutator((Node *) tlist, varno_mapping);
+	List *newTargetList = (List *) VarNoMutator((Node *) tlist, info.varno_mapping);
 	q->targetList = newTargetList;
 
-	q->jointree = makeNode(FromExpr);
-	RangeTblRef *rr = makeNode(RangeTblRef);
-	rr->rtindex = 1;
-	q->jointree->fromlist = list_make1(rr);
 
 	List *quals = NIL;
 
@@ -275,10 +473,15 @@ GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses)
 	foreach_ptr(rinfo, clauses)
 	{
 		Node *clause = (Node *) rinfo->clause;
-		clause = VarNoMutator(clause, varno_mapping);
 		quals = lappend(quals, clause);
 	}
-	q->jointree->quals = (Node *) quals;
+
+	if (list_length(quals) > 0)
+	{
+		Node *qualsAnd = (Node *) make_ands_explicit(quals);
+		q->jointree->quals = make_and_qual(q->jointree->quals, qualsAnd);
+	}
+	q->jointree->quals = VarNoMutator(q->jointree->quals, info.varno_mapping);
 
 	return q;
 }
diff --git a/src/include/distributed/path_based_planner.h b/src/include/distributed/path_based_planner.h
index 9e81ee3d1..8719b6f97 100644
--- a/src/include/distributed/path_based_planner.h
+++ b/src/include/distributed/path_based_planner.h
@@ -8,6 +8,15 @@
 #include "nodes/parsenodes.h"
 #include "nodes/pathnodes.h"
 
-extern void PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte);
+extern void PathBasedPlannerRelationHook(PlannerInfo *root,
+										 RelOptInfo *relOptInfo,
+										 Index restrictionIndex,
+										 RangeTblEntry *rte);
+extern void PathBasedPlannerJoinHook(PlannerInfo *root,
+									 RelOptInfo *joinrel,
+									 RelOptInfo *outerrel,
+									 RelOptInfo *innerrel,
+									 JoinType jointype,
+									 JoinPathExtraData *extra);
 
 #endif //CITUS_PATH_BASED_PLANNER_H