Implement beginnings of join pushdown

moonshot/custom-path
Nils Dijk 2020-01-20 18:17:35 +01:00
parent b361386e05
commit f858efd2f2
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
3 changed files with 200 additions and 31 deletions

View File

@ -1692,7 +1692,7 @@ multi_join_restriction_hook(PlannerInfo *root,
{
if (UseCustomPath)
{
// TODO implement the replacement of path nodes with distributed union
PathBasedPlannerJoinHook(root, joinrel, outerrel, innerrel, jointype, extra);
return;
}

View File

@ -22,11 +22,12 @@
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(PlannerInfo *root, Path *originalPath);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId);
static Index VarnoFromFirstTargetEntry(List *tlist);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
static Path * OptimizeJoinPath(Path *originalPath);
static bool CanOptimizeHashPath(HashPath *hpath);
static bool IsDistributedUnion(Path *path);
typedef struct DistributedUnionPath
@ -47,9 +48,11 @@ const CustomPathMethods distributedUnionMethods = {
CustomPath *
WrapTableAccessWithDistributedUnion(PlannerInfo *root, Path *originalPath)
WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId)
{
DistributedUnionPath *distUnion = newNode(sizeof(DistributedUnionPath), T_CustomPath);
DistributedUnionPath *distUnion = (DistributedUnionPath *)
newNode(sizeof(DistributedUnionPath), T_CustomPath);
distUnion->custom_path.path.pathtype = T_CustomScan;
distUnion->custom_path.path.parent = originalPath->parent;
distUnion->custom_path.path.pathtarget = originalPath->pathtarget;
@ -63,10 +66,7 @@ WrapTableAccessWithDistributedUnion(PlannerInfo *root, Path *originalPath)
distUnion->custom_path.methods = &distributedUnionMethods;
distUnion->worker_path = originalPath;
/* collect the colocation group of the table */
RangeTblEntry *rte = root->simple_rte_array[originalPath->parent->relid];
distUnion->colocationId = TableColocationId(rte->relid);
distUnion->colocationId = colocationId;
return (CustomPath *) distUnion;
}
@ -184,7 +184,10 @@ IsDistributedUnion(Path *path)
void
PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte)
PathBasedPlannerRelationHook(PlannerInfo *root,
RelOptInfo *relOptInfo,
Index restrictionIndex,
RangeTblEntry *rte)
{
if (!IsDistributedTable(rte->relid))
{
@ -197,11 +200,79 @@ PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index re
foreach(pathCell, relOptInfo->pathlist)
{
Path *originalPath = lfirst(pathCell);
pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(root, originalPath);
pathCell->data.ptr_value =
WrapTableAccessWithDistributedUnion(originalPath,
TableColocationId(rte->relid));
}
}
/*
 * CanOptimizeHashPath decides whether the distributed unions below a hash join
 * can be collapsed into a single distributed union on top of the join.
 *
 * This is the case when both the inner and the outer side of the join are
 * distributed unions and both report the same colocationId. Whether the join
 * is actually on the distribution column is not verified yet.
 */
static bool
CanOptimizeHashPath(HashPath *hpath)
{
	Path *innerPath = hpath->jpath.innerjoinpath;
	Path *outerPath = hpath->jpath.outerjoinpath;

	/* can only optimize joins when both inner and outer are a distributed union */
	if (!IsDistributedUnion(innerPath) || !IsDistributedUnion(outerPath))
	{
		return false;
	}

	uint32 innerColocationId = ((DistributedUnionPath *) innerPath)->colocationId;
	uint32 outerColocationId = ((DistributedUnionPath *) outerPath)->colocationId;

	/*
	 * The distributed unions need to be on the same colocation group.
	 *
	 * TODO check if join is on distribution column, assume for now
	 */
	return innerColocationId == outerColocationId;
}
/*
 * OptimizeJoinPath tries to push a join below the distributed union.
 *
 * When originalPath is a hash join for which CanOptimizeHashPath returns true,
 * the distributed unions on both sides of the join are replaced by the worker
 * paths they wrap, and the (modified in place) hash join is wrapped in a single
 * new distributed union that is returned to the caller.
 *
 * Any other path is returned unchanged.
 */
static Path *
OptimizeJoinPath(Path *originalPath)
{
	if (!IsA(originalPath, HashPath))
	{
		return originalPath;
	}

	HashPath *hpath = castNode(HashPath, originalPath);
	if (!CanOptimizeHashPath(hpath))
	{
		return originalPath;
	}

	/* CanOptimizeHashPath verified both sides are distributed unions */
	DistributedUnionPath *innerDU = (DistributedUnionPath *) hpath->jpath.innerjoinpath;
	DistributedUnionPath *outerDU = (DistributedUnionPath *) hpath->jpath.outerjoinpath;

	/*
	 * We can only optimize the distributed union if the colocationId's are the
	 * same, taking any would suffice.
	 *
	 * The colocationId has to be read before the join paths are replaced below;
	 * once replaced, the join paths point to the worker paths, which are not
	 * distributed unions and do not carry a colocationId.
	 */
	uint32 colocationId = innerDU->colocationId;

	/* strip the distributed unions from below the join */
	hpath->jpath.innerjoinpath = innerDU->worker_path;
	hpath->jpath.outerjoinpath = outerDU->worker_path;

	/* TODO update costs of hashjoin, very naive removal of DU cost for now */
	hpath->jpath.path.startup_cost -= 2000; /* remove the double dist union cost */
	hpath->jpath.path.total_cost -= 2000; /* remove the double dist union cost */

	return (Path *) WrapTableAccessWithDistributedUnion((Path *) hpath, colocationId);
}
/*
 * PathBasedPlannerJoinHook runs every path currently in the pathlist of joinrel
 * through OptimizeJoinPath and stores the returned path back in the same list
 * position.
 *
 * Only inner joins are considered; for any other join type the pathlist is left
 * untouched. The outerrel, innerrel and extra arguments are currently not used.
 */
void
PathBasedPlannerJoinHook(PlannerInfo *root,
						 RelOptInfo *joinrel,
						 RelOptInfo *outerrel,
						 RelOptInfo *innerrel,
						 JoinType jointype,
						 JoinPathExtraData *extra)
{
	if (jointype != JOIN_INNER)
	{
		return;
	}

	ListCell *joinPathCell = NULL;
	foreach(joinPathCell, joinrel->pathlist)
	{
		Path *joinPath = (Path *) lfirst(joinPathCell);

		/* replace the path in place with its (possibly) optimized version */
		lfirst(joinPathCell) = OptimizeJoinPath(joinPath);
	}
}
/*
* varno_mapping is an array where the index is the varno in the original query, or 0 if
@ -210,6 +281,11 @@ PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index re
static Node *
VarNoMutator(Node *expr, Index *varno_mapping)
{
if (expr == NULL)
{
return NULL;
}
switch (nodeTag(expr))
{
case T_Var:
@ -240,34 +316,113 @@ VarNoMutator(Node *expr, Index *varno_mapping)
}
/*
 * PathQueryInfo holds the state that is shared while a path is turned into a
 * query by ApplyPathToQuery.
 */
typedef struct PathQueryInfo
{
	/*
	 * Keep track of the mapping of varno's from the original query to the new query.
	 * This will be used to update the Varno attributes of Var's in the quals and target
	 * list.
	 *
	 * The array is indexed by the relid of the scanned relation in the original
	 * query. The value is the 1-based position of the range table entry in the
	 * rtable of the new query, or 0 when the relation has not been added yet.
	 */
	Index *varno_mapping;
} PathQueryInfo;
static void
ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *info)
{
switch (path->pathtype)
{
case T_IndexScan:
case T_SeqScan:
{
/*
* Add table as source to the range table and keep track of the mapping with
* the original query
*/
Index scan_relid = path->parent->relid;
Index rteIndex = info->varno_mapping[scan_relid];
if (rteIndex == 0)
{
/* not added before, add and keep reference to which entry it has been added */
RangeTblEntry *rte = root->simple_rte_array[scan_relid];
query->rtable = lappend(query->rtable, rte);
rteIndex = list_length(query->rtable);
info->varno_mapping[scan_relid] = rteIndex;
}
/* add to from list */
RangeTblRef *rr = makeNode(RangeTblRef);
rr->rtindex = rteIndex;
query->jointree->fromlist = lappend(query->jointree->fromlist, rr);
List *quals = NIL;
RestrictInfo *rinfo = NULL;
foreach_ptr(rinfo, path->parent->baserestrictinfo)
{
Node *clause = (Node *) rinfo->clause;
quals = lappend(quals, clause);
}
if (list_length(quals) > 0)
{
Node *qualsAnd = (Node *) make_ands_explicit(quals);
query->jointree->quals = make_and_qual(query->jointree->quals, qualsAnd);
}
break;
}
case T_HashJoin:
{
JoinPath *jpath = (JoinPath *) path;
/* add both join paths to the query */
ApplyPathToQuery(root, query, jpath->outerjoinpath, info);
ApplyPathToQuery(root, query, jpath->innerjoinpath, info);
List *quals = NIL;
RestrictInfo *rinfo = NULL;
foreach_ptr(rinfo, jpath->joinrestrictinfo)
{
Node *clause = (Node *) rinfo->clause;
quals = lappend(quals, clause);
}
if (list_length(quals) > 0)
{
Node *qualsAnd = (Node *) make_ands_explicit(quals);
query->jointree->quals = make_and_qual(query->jointree->quals, qualsAnd);
}
break;
}
default:
{
ereport(ERROR, (errmsg("unknow path type in worker query"),
errdetail("cannot turn worker path into query due to unknown "
"path type in plan. pathtype: %d", path->pathtype))
);
}
}
}
static Query *
GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses)
{
Index scan_relid = path->parent->relid;
RangeTblEntry *rte = root->simple_rte_array[scan_relid];
PathQueryInfo info = { 0 };
info.varno_mapping = palloc0(sizeof(Index) * root->simple_rel_array_size);
Query *q = makeNode(Query);
q->commandType = CMD_SELECT;
q->rtable = list_make1(rte);
q->jointree = makeNode(FromExpr);
ApplyPathToQuery(root, q, path, &info);
List *newTargetList = NIL;
TargetEntry *target = NULL;
Index *varno_mapping = palloc0(sizeof(Index) * root->simple_rel_array_size);
/*
* map the rte index of the table we are scanning to the range table entry as we have
* added it to the query
*/
varno_mapping[scan_relid] = 1;
/* copy the target list with mapped varno values to reflect the tables we are selecting */
newTargetList = (List *) VarNoMutator((Node *) tlist, varno_mapping);
List *newTargetList = (List *) VarNoMutator((Node *) tlist, info.varno_mapping);
q->targetList = newTargetList;
q->jointree = makeNode(FromExpr);
RangeTblRef *rr = makeNode(RangeTblRef);
rr->rtindex = 1;
q->jointree->fromlist = list_make1(rr);
List *quals = NIL;
@ -275,10 +430,15 @@ GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses)
foreach_ptr(rinfo, clauses)
{
Node *clause = (Node *) rinfo->clause;
clause = VarNoMutator(clause, varno_mapping);
quals = lappend(quals, clause);
}
q->jointree->quals = (Node *) quals;
if (list_length(quals) > 0)
{
Node *qualsAnd = (Node *) make_ands_explicit(quals);
q->jointree->quals = make_and_qual(q->jointree->quals, qualsAnd);
}
q->jointree->quals = VarNoMutator(q->jointree->quals, info.varno_mapping);
return q;
}

View File

@ -8,6 +8,15 @@
#include "nodes/parsenodes.h"
#include "nodes/pathnodes.h"
extern void PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte);
extern void PathBasedPlannerRelationHook(PlannerInfo *root,
RelOptInfo *relOptInfo,
Index restrictionIndex,
RangeTblEntry *rte);
extern void PathBasedPlannerJoinHook(PlannerInfo *root,
RelOptInfo *joinrel,
RelOptInfo *outerrel,
RelOptInfo *innerrel,
JoinType jointype,
JoinPathExtraData *extra);
#endif //CITUS_PATH_BASED_PLANNER_H