From 285f1b1d1e3ca36e9648b5ade28b45ee1d50106c Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Fri, 17 Jan 2020 17:30:08 +0100 Subject: [PATCH] move path based planner to its own files --- .../distributed/planner/distributed_planner.c | 194 +------------ .../distributed/planner/path_based_planner.c | 254 ++++++++++++++++++ src/include/distributed/path_based_planner.h | 13 + 3 files changed, 269 insertions(+), 192 deletions(-) create mode 100644 src/backend/distributed/planner/path_based_planner.c create mode 100644 src/include/distributed/path_based_planner.h diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 5811031e4..c6bc8f224 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -42,6 +42,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/combine_query_planner.h" #include "distributed/multi_router_planner.h" +#include "distributed/path_based_planner.h" #include "distributed/query_utils.h" #include "distributed/recursive_planning.h" #include "distributed/shardinterval_utils.h" @@ -1749,184 +1750,6 @@ multi_join_restriction_hook(PlannerInfo *root, } -static Node * -VarNoMutator(Node *expr, Index *varno) -{ - switch (nodeTag(expr)) - { - case T_Var: - { - Var *var = castNode(Var, expr); - if (var->varno == *varno) - { - /*nothing to change */ - return (Node *) var; - } - - return (Node *) makeVar( - *varno, - var->varattno, - var->vartype, - var->vartypmod, - var->varcollid, - var->varlevelsup - ); - } - - default: - { - return expression_tree_mutator(expr, (void*) VarNoMutator, varno); - } - } -} - - -static Query * -GetQueryFromPath(List *tlist, RangeTblEntry *rte, List *clauses) -{ - Query *q = makeNode(Query); - q->commandType = CMD_SELECT; - q->rtable = list_make1(rte); - - List *newTargetList = NIL; - TargetEntry *target = NULL; - foreach_ptr(target, tlist) - { - Index varno = 1; - 
TargetEntry *newTarget = (TargetEntry *) VarNoMutator((Node *)target, &varno); - newTargetList = lappend(newTargetList, newTarget); - } - - q->targetList = newTargetList; - q->jointree = makeNode(FromExpr); - RangeTblRef *rr = makeNode(RangeTblRef); - rr->rtindex = 1; - q->jointree->fromlist = list_make1(rr); - - List *quals = NIL; - - RestrictInfo *rinfo = NULL; - foreach_ptr(rinfo, clauses) - { - Node *clause = (Node *) rinfo->clause; - Index varno = 1; - clause = VarNoMutator(clause, &varno); - quals = lappend(quals, clause); - } - q->jointree->quals = (Node *) quals; - - return q; -} - - -static Index -VarnoFromFirstTargetEntry(List *tlist) -{ - TargetEntry *entry = linitial_node(TargetEntry, tlist); - Var *var = castNode(Var, entry->expr); - return var->varno; -} - -#include "optimizer/restrictinfo.h" - -static Plan * -CreateDistributedUnionPlan(PlannerInfo *root, - RelOptInfo *rel, - struct CustomPath *best_path, - List *tlist, - List *clauses, - List *custom_plans) -{ - Job *workerJob = CitusMakeNode(Job); - workerJob->jobId = UniqueJobId(); - - RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0); - Path *originalPath = (Path *) list_nth(best_path->custom_private, 1); - List* shardIntervalList = LoadShardIntervalList(rte->relid); - ShardInterval *shardInterval = NULL; - - Query *q = GetQueryFromPath(tlist, rte, clauses); - - int i = 0; - foreach_ptr(shardInterval, shardIntervalList) - { - RelationShard *rs = CitusMakeNode(RelationShard); - rs->relationId = rte->relid; - rs->shardId = shardInterval->shardId; - - Query *qc = copyObject(q); - UpdateRelationToShardNames((Node *) qc, list_make1(rs)); - - StringInfoData buf; - initStringInfo(&buf); - pg_get_query_def(qc, &buf); - - Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, buf.data); - sqlTask->anchorShardId = shardInterval->shardId; - sqlTask->taskPlacementList = FinalizedShardPlacementList(shardInterval->shardId); - workerJob->taskList = 
lappend(workerJob->taskList, sqlTask); - i++; - } - - DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - distributedPlan->workerJob = workerJob; - distributedPlan->modLevel = ROW_MODIFY_READONLY; - distributedPlan->relationIdList = list_make1_oid(rte->relid); - distributedPlan->hasReturning = true; - - CustomScan *plan = makeNode(CustomScan); - plan->scan.scanrelid = VarnoFromFirstTargetEntry(tlist); - plan->flags = best_path->flags; - plan->methods = &AdaptiveExecutorCustomScanMethods; - plan->custom_private = list_make1(distributedPlan); - - plan->scan.plan.targetlist = tlist; - /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ - clauses = extract_actual_clauses(clauses, false); - - plan->scan.plan.qual = clauses; - plan->custom_exprs = clauses; - - return (Plan *) plan; -} - - -static List * -ReparameterizeDistributedUnion(PlannerInfo *root, - List *custom_private, - RelOptInfo *child_rel) -{ - return NIL; -} - -const CustomPathMethods distributedUnionMethods = { - .CustomName = "Distributed Union", - .PlanCustomPath = CreateDistributedUnionPlan, - .ReparameterizeCustomPathByChild = ReparameterizeDistributedUnion -}; - -static CustomPath * -WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) -{ - CustomPath *distUnion = makeNode(CustomPath); - distUnion->path.pathtype = T_CustomScan; - distUnion->path.parent = originalPath->parent; - distUnion->path.pathtarget = originalPath->pathtarget; - distUnion->path.param_info = originalPath->param_info; - - /* TODO use a better cost model */ - distUnion->path.rows = originalPath->rows; - distUnion->path.startup_cost = originalPath->startup_cost+1000; - distUnion->path.total_cost = originalPath->total_cost+1000; - - distUnion->methods = &distributedUnionMethods; - - distUnion->custom_private = list_make2(rte, originalPath); - - return distUnion; -} - - /* * multi_relation_restriction_hook is a hook called by postgresql standard planner * to notify us about 
various planning information regarding a relation. We use @@ -1938,20 +1761,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, { if (UseCustomPath) { - if (!IsDistributedTable(rte->relid)) - { - /* table accessed is not distributed, no paths to change */ - return; - } - - /* wrap every path with a distirbuted union custom path */ - ListCell *pathCell = NULL; - foreach(pathCell, relOptInfo->pathlist) - { - Path *originalPath = lfirst(pathCell); - pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, rte); - } - // TODO implement the replacement of path nodes with distributed union + PathBasedPlannerRelationHook(root, relOptInfo, restrictionIndex, rte); return; } diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c new file mode 100644 index 000000000..ab515f1f2 --- /dev/null +++ b/src/backend/distributed/planner/path_based_planner.c @@ -0,0 +1,254 @@ +// +// Created by Nils Dijk on 17/01/2020. 
+// +#include "postgres.h" + +#include "distributed/citus_custom_scan.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/deparse_shard_query.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/path_based_planner.h" +#include "nodes/extensible.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/pathnodes.h" +#include "nodes/pg_list.h" +#include "nodes/plannodes.h" +#include "optimizer/restrictinfo.h" + +static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); +static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); +static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte); +static Index VarnoFromFirstTargetEntry(List *tlist); +static Query * GetQueryFromPath(List *tlist, RangeTblEntry *rte, List *clauses); + +static bool IsDistributedUnion(Path *path); +static uint32 ColocationGroupForDistributedUnion(Path *path); + +const CustomPathMethods distributedUnionMethods = { + .CustomName = "Distributed Union", + .PlanCustomPath = CreateDistributedUnionPlan, + .ReparameterizeCustomPathByChild = ReparameterizeDistributedUnion +}; + + +CustomPath * +WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) +{ + CustomPath *distUnion = makeNode(CustomPath); + distUnion->path.pathtype = T_CustomScan; + distUnion->path.parent = originalPath->parent; + distUnion->path.pathtarget = originalPath->pathtarget; + distUnion->path.param_info = originalPath->param_info; + + /* TODO use a better cost model */ + distUnion->path.rows = originalPath->rows; + distUnion->path.startup_cost = originalPath->startup_cost+1000; + distUnion->path.total_cost = originalPath->total_cost+1000; + + distUnion->methods = 
&distributedUnionMethods; + + distUnion->custom_private = list_make2(rte, originalPath); + + return distUnion; +} + + +static Plan * +CreateDistributedUnionPlan(PlannerInfo *root, + RelOptInfo *rel, + struct CustomPath *best_path, + List *tlist, + List *clauses, + List *custom_plans) +{ + Job *workerJob = CitusMakeNode(Job); + workerJob->jobId = UniqueJobId(); + + RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0); + Path *originalPath = (Path *) list_nth(best_path->custom_private, 1); + List* shardIntervalList = LoadShardIntervalList(rte->relid); + ShardInterval *shardInterval = NULL; + + Query *q = GetQueryFromPath(tlist, rte, clauses); + + int i = 0; + foreach_ptr(shardInterval, shardIntervalList) + { + RelationShard *rs = CitusMakeNode(RelationShard); + rs->relationId = rte->relid; + rs->shardId = shardInterval->shardId; + + Query *qc = copyObject(q); + UpdateRelationToShardNames((Node *) qc, list_make1(rs)); + + StringInfoData buf; + initStringInfo(&buf); + pg_get_query_def(qc, &buf); + + Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, buf.data); + sqlTask->anchorShardId = shardInterval->shardId; + sqlTask->taskPlacementList = FinalizedShardPlacementList(shardInterval->shardId); + workerJob->taskList = lappend(workerJob->taskList, sqlTask); + i++; + } + + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); + distributedPlan->workerJob = workerJob; + distributedPlan->modLevel = ROW_MODIFY_READONLY; + distributedPlan->relationIdList = list_make1_oid(rte->relid); + distributedPlan->hasReturning = true; + + CustomScan *plan = makeNode(CustomScan); + plan->scan.scanrelid = VarnoFromFirstTargetEntry(tlist); + plan->flags = best_path->flags; + plan->methods = &AdaptiveExecutorCustomScanMethods; + plan->custom_private = list_make1(distributedPlan); + + plan->scan.plan.targetlist = tlist; + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ + clauses = extract_actual_clauses(clauses, 
false); + + plan->scan.plan.qual = clauses; + plan->custom_exprs = clauses; + + return (Plan *) plan; +} + + +static List * +ReparameterizeDistributedUnion(PlannerInfo *root, + List *custom_private, + RelOptInfo *child_rel) +{ + return NIL; +} + + +/* + * IsDistributedUnion returns whether the path node is a distributed union + */ +static bool +IsDistributedUnion(Path *path) +{ + if (!IsA(path, CustomPath)) + { + return false; + } + + CustomPath *cpath = castNode(CustomPath, path); + return cpath->methods == &distributedUnionMethods; +} + + +static uint32 +ColocationGroupForDistributedUnion(Path *path) +{ + Assert(IsDistributedUnion(path)); + CustomPath *distUnion = castNode(CustomPath, path); + /* TODO actually retrieve the right colocation id for the Distributed Union */ + return 1; +} + + +void +PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte) +{ + if (!IsDistributedTable(rte->relid)) + { + /* table accessed is not distributed, no paths to change */ + return; + } + + /* wrap every path with a distributed union custom path */ + ListCell *pathCell = NULL; + foreach(pathCell, relOptInfo->pathlist) + { + Path *originalPath = lfirst(pathCell); + pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, rte); + } +} + + + + +static Node * +VarNoMutator(Node *expr, Index *varno) +{ + switch (nodeTag(expr)) + { + case T_Var: + { + Var *var = castNode(Var, expr); + if (var->varno == *varno) + { + /*nothing to change */ + return (Node *) var; + } + + return (Node *) makeVar( + *varno, + var->varattno, + var->vartype, + var->vartypmod, + var->varcollid, + var->varlevelsup + ); + } + + default: + { + return expression_tree_mutator(expr, (void*) VarNoMutator, varno); + } + } +} + + +static Query * +GetQueryFromPath(List *tlist, RangeTblEntry *rte, List *clauses) +{ + Query *q = makeNode(Query); + q->commandType = CMD_SELECT; + q->rtable = list_make1(rte); + + List *newTargetList = NIL; + 
TargetEntry *target = NULL; + foreach_ptr(target, tlist) + { + Index varno = 1; + TargetEntry *newTarget = (TargetEntry *) VarNoMutator((Node *)target, &varno); + newTargetList = lappend(newTargetList, newTarget); + } + + q->targetList = newTargetList; + q->jointree = makeNode(FromExpr); + RangeTblRef *rr = makeNode(RangeTblRef); + rr->rtindex = 1; + q->jointree->fromlist = list_make1(rr); + + List *quals = NIL; + + RestrictInfo *rinfo = NULL; + foreach_ptr(rinfo, clauses) + { + Node *clause = (Node *) rinfo->clause; + Index varno = 1; + clause = VarNoMutator(clause, &varno); + quals = lappend(quals, clause); + } + q->jointree->quals = (Node *) quals; + + return q; +} + + +static Index +VarnoFromFirstTargetEntry(List *tlist) +{ + TargetEntry *entry = linitial_node(TargetEntry, tlist); + Var *var = castNode(Var, entry->expr); + return var->varno; +} diff --git a/src/include/distributed/path_based_planner.h b/src/include/distributed/path_based_planner.h new file mode 100644 index 000000000..9e81ee3d1 --- /dev/null +++ b/src/include/distributed/path_based_planner.h @@ -0,0 +1,13 @@ +// +// Created by Nils Dijk on 17/01/2020. +// + +#ifndef CITUS_PATH_BASED_PLANNER_H +#define CITUS_PATH_BASED_PLANNER_H + +#include "nodes/parsenodes.h" +#include "nodes/pathnodes.h" + +extern void PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte); + +#endif //CITUS_PATH_BASED_PLANNER_H