diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index 2fe83a278..aa3edaecb 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -22,7 +22,7 @@ static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); -static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue); +static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid); static Index VarnoFromFirstTargetEntry(List *tlist); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses); static List * ShardIntervalListToRelationShardList(List *shardIntervalList); @@ -31,6 +31,9 @@ static bool CanOptimizeJoinPath(JoinPath *jpath); static bool IsDistributedUnion(Path *path); static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey); static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue); +static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); +static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath); typedef struct DistributedUnionPath { @@ -41,6 +44,13 @@ typedef struct DistributedUnionPath uint32 colocationId; Expr *partitionValue; + + /* + * Due to a misabstraction in Citus we need to keep track of a relation id that this + * union maps to. Ideally we would perform our pruning actions on the colocation id but + * we need a shard. 
+ */ + Oid sampleRelid; } DistributedUnionPath; const CustomPathMethods distributedUnionMethods = { @@ -51,7 +61,7 @@ const CustomPathMethods distributedUnionMethods = { static CustomPath * -WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue) +WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid) { DistributedUnionPath *distUnion = (DistributedUnionPath *) newNode(sizeof(DistributedUnionPath), T_CustomPath); @@ -71,6 +81,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Exp distUnion->worker_path = originalPath; distUnion->colocationId = colocationId; distUnion->partitionValue = partitionValue; + distUnion->sampleRelid = sampleRelid; return (CustomPath *) distUnion; } @@ -98,10 +109,8 @@ CreateDistributedUnionPlan(PlannerInfo *root, */ /* TODO track colocation information on the Distributed Union node to fetch required information in a more optimal setting*/ - int relIndex = bms_next_member(rel->relids, -1) -1; /* read the first range table entry used in this relation */ - RangeTblEntry *rte = list_nth_node(RangeTblEntry, q->rtable, relIndex); List *shardIntervalList = ShardIntervalListForRelationPartitionValue( - rte->relid, + distUnion->sampleRelid, distUnion->partitionValue); int i = 0; @@ -127,7 +136,7 @@ CreateDistributedUnionPlan(PlannerInfo *root, DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); distributedPlan->workerJob = workerJob; distributedPlan->modLevel = ROW_MODIFY_READONLY; - distributedPlan->relationIdList = list_make1_oid(rte->relid); + distributedPlan->relationIdList = list_make1_oid(distUnion->sampleRelid); distributedPlan->hasReturning = true; CustomScan *plan = makeNode(CustomScan); @@ -241,7 +250,8 @@ PathBasedPlannerRelationHook(PlannerInfo *root, pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, TableColocationId(rte->relid), - partitionValue); + 
partitionValue, + rte->relid); } } @@ -331,8 +341,6 @@ OptimizeJoinPath(Path *originalPath) { /* we can only optimize the Distributed union if the colocationId's are the same, taking any would suffice */ DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath; - uint32 colocationId = baseDistUnion->colocationId; - Expr *partitionValue = baseDistUnion->partitionValue; jpath->innerjoinpath = ((DistributedUnionPath *) jpath->innerjoinpath)->worker_path; jpath->outerjoinpath = ((DistributedUnionPath *) jpath->outerjoinpath)->worker_path; @@ -343,8 +351,9 @@ OptimizeJoinPath(Path *originalPath) return (Path *) WrapTableAccessWithDistributedUnion( (Path *) jpath, - colocationId, - partitionValue); + baseDistUnion->colocationId, + baseDistUnion->partitionValue, + baseDistUnion->sampleRelid); } } @@ -432,7 +441,18 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf { switch (path->pathtype) { + case T_Agg: + { + AggPath *apath = castNode(AggPath, path); + /* the subpath needs to be applied before we can apply the grouping clause */ + ApplyPathToQuery(root, query, apath->subpath, info); + + query->groupClause = apath->groupClause; + break; + } + case T_IndexScan: + case T_IndexOnlyScan: case T_SeqScan: { /* @@ -553,3 +573,179 @@ VarnoFromFirstTargetEntry(List *tlist) Var *var = castNode(Var, entry->expr); return var->varno; } + + +/* + * PathBasedPlannedUpperPathHook is installed as create_upper_paths_hook. When + * UseCustomPath is enabled it dispatches the UPPERREL_GROUP_AGG stage to + * PathBasedPlannerGroupAgg; every other stage is left untouched. + */ +void +PathBasedPlannedUpperPathHook(PlannerInfo *root, + UpperRelationKind stage, + RelOptInfo *input_rel, + RelOptInfo *output_rel, + void *extra) +{ + if (!UseCustomPath) + { + /* path based planner is turned off, don't do anything here */ + return; + } + + switch (stage) + { + case UPPERREL_GROUP_AGG: + { + PathBasedPlannerGroupAgg(root, input_rel, output_rel, extra); + return; + } + + default: + { + /* no optimizations implemented, beers for the one that removes this due to being unreachable */ + return; + } + } +} + + +/* + * PathBasedPlannerGroupAgg replaces every path in output_rel->pathlist with the + * result of OptimizeGroupAgg. + */ +static void +PathBasedPlannerGroupAgg(PlannerInfo *root, 
RelOptInfo *input_rel, + RelOptInfo *output_rel, + void *extra) +{ + /* + * Here we want to find proof that the group by is right above a distributed union + * that is partitioned by the grouping key. If that is the case we can pull the + * distributed union above the aggregate which causes it to optimize the plan. + * + * TODO we just replace the plans for now, but during development we have encountered + * a plan that would be better if the grouping would not be pushed down. When the + * grouping is solely on a primary key the number of rows will stay the same, while + * the width will increase due to any aggregates that could be performed on the data. + * This plan has lower network traffic if the grouping would not be pushed down. + * Instead of replacing it would benefit the planner to add a new path according to + * the potential optimization of pushing down. If network traffic would be + * taken into account in the cost of the plan this would cause magic to happen which + * we currently could not support. 
+ */ + + ListCell *pathCell = NULL; + foreach(pathCell, output_rel->pathlist) + { + Path *originalPath = pathCell->data.ptr_value; + pathCell->data.ptr_value = OptimizeGroupAgg(root, originalPath); + } +} + + +/* + * OptimizeGroupAgg pulls a distributed union from directly below an aggregate + * to above it when CanOptimizeAggPath allows. The AggPath is modified in place + * and wrapped in a new distributed union; any other path is returned unchanged. + */ +static Path * +OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) +{ + switch (originalPath->pathtype) + { + case T_Agg: + { + AggPath *apath = castNode(AggPath, originalPath); + if (CanOptimizeAggPath(root, apath)) + { + DistributedUnionPath *distUnion = (DistributedUnionPath *) apath->subpath; + apath->subpath = distUnion->worker_path; + + /* TODO better cost model, for now subtract the DU costs */ + apath->path.startup_cost -= 1000; + apath->path.total_cost -= 1000; + + return (Path *) WrapTableAccessWithDistributedUnion( + (Path *) apath, + distUnion->colocationId, + distUnion->partitionValue, + distUnion->sampleRelid); + } + + /* nothing to pull up, return explicitly instead of falling into default */ + return originalPath; + } + + default: + { + /* no optimisations to be performed */ + return originalPath; + } + } +} + + +/* + * CanOptimizeAggPath returns true when the aggregate groups directly on top of + * a distributed union and one of the grouping equivalence classes contains the + * distribution column of the table that the member Var references. + */ +static bool +CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) +{ + if (apath->groupClause == NULL) + { + return false; + } + + if (!IsDistributedUnion(apath->subpath)) + { + /* + * we only can optimize if the path below is a distributed union that we can pull + * up, if the path below is not a distributed union we cannot optimize + */ + return false; + } + + SortGroupClause *sgc = NULL; + /* + * TODO verify what's the purpose of the list, if we find any of the distribution + * columns somewhere in this we optimize, might be wrong + */ + foreach_ptr(sgc, apath->groupClause) + { + PathKey *groupKey = list_nth_node(PathKey, root->group_pathkeys, sgc->tleSortGroupRef - 1); + EquivalenceMember *ec_member = NULL; + foreach_ptr(ec_member, groupKey->pk_eclass->ec_members) + { + if (!IsA(ec_member->em_expr, Var)) + { + continue; + } + + Var *ec_var = castNode(Var, ec_member->em_expr); + Index rteIndex = ec_var->varno; + RangeTblEntry *rte = root->simple_rte_array[rteIndex]; + + DistTableCacheEntry *cacheEntry = 
DistributedTableCacheEntry(rte->relid); + if (cacheEntry->partitionColumn == NULL) + { + /* a table that is not distributed by a particular column, reference table? */ + continue; + } + + if (cacheEntry->partitionColumn->varattno == ec_var->varattno) + { + /* + * grouping column contains the distribution column of a distributed + * table, safe to optimize + */ + return true; + } + } + } + + return false; +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 2b4a09266..45999f53b 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -55,6 +55,7 @@ #include "distributed/combine_query_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" +#include "distributed/path_based_planner.h" #include "distributed/pg_dist_partition.h" #include "distributed/placement_connection.h" #include "distributed/recursive_planning.h" @@ -296,6 +297,8 @@ _PG_init(void) /* register for planner hook */ set_rel_pathlist_hook = multi_relation_restriction_hook; set_join_pathlist_hook = multi_join_restriction_hook; + create_upper_paths_hook = PathBasedPlannedUpperPathHook; + ExecutorStart_hook = CitusExecutorStart; ExecutorRun_hook = CitusExecutorRun; ExplainOneQuery_hook = CitusExplainOneQuery; diff --git a/src/include/distributed/path_based_planner.h b/src/include/distributed/path_based_planner.h index 8719b6f97..15c04ebed 100644 --- a/src/include/distributed/path_based_planner.h +++ b/src/include/distributed/path_based_planner.h @@ -18,5 +18,10 @@ extern void PathBasedPlannerJoinHook(PlannerInfo *root, RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra); +extern void PathBasedPlannedUpperPathHook(PlannerInfo *root, + UpperRelationKind stage, + RelOptInfo *input_rel, + RelOptInfo *output_rel, + void *extra); #endif //CITUS_PATH_BASED_PLANNER_H