From a55331d93464e4564ce42674b419aac4bc9e5f2f Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 29 Nov 2021 14:06:18 +0100 Subject: [PATCH] iterate on cost model --- .../distributed/planner/path_based_planner.c | 256 ++++++++++++++---- src/backend/distributed/shared_library_init.c | 61 +++++ src/include/distributed/path_based_planner.h | 8 + 3 files changed, 266 insertions(+), 59 deletions(-) diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index c55d8e320..b88bb26ce 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -23,6 +23,7 @@ #include "nodes/pathnodes.h" #include "nodes/pg_list.h" #include "nodes/plannodes.h" +#include "nodes/print.h" #include "optimizer/paramassign.h" #include "optimizer/pathnode.h" #include "optimizer/restrictinfo.h" @@ -40,6 +41,7 @@ typedef struct DistributedUnionPath Path *worker_path; uint32 colocationId; + List *distributionAttrs; Expr *partitionValue; /* @@ -64,15 +66,26 @@ typedef struct GeoScanPath RangeTblEntry *rte; } GeoScanPath; + +/* + * CitusOperationCosts captures the cost overhead of citus operations. + * The total_cost includes the startup_cost, hence it can simply be added/subtracted. + */ +typedef struct CitusOperationCosts +{ + Cost startup_cost; + Cost total_cost; +} CitusOperationCosts; +static const CitusOperationCosts EmptyCitusOperationCosts = { 0 }; + static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); -static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 - colocationId, - Expr *partitionValue, Oid - sampleRelid, List *subPaths); +static void AddCollectCostOverhead(CitusOperationCosts *overhead, Path *originalPath); +static Path * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, List *distributionAttrs, Expr *partitionValue, Oid sampleRelid, List *subPaths); +static void AddRepartitionCostOverhead(CitusOperationCosts *overhead, Path *originalPath); static Path * CreateRepartitionNode(uint32 colocationId, Path *worker_path); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses, Index **varnoMapping); @@ -110,6 +123,14 @@ static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel, */ bool EnableBroadcastJoin = true; +Cost CollectStartupCost = 1000; +Cost CollectPerRowCost = .01; +Cost CollectPerMBCost = .1; + +Cost RepartitionStartupCost = 1000; +Cost RepartitionPerRowCost = .000; +Cost RepartitionPerMBCost = .01; + /* list of functions that will be called to optimized in the joinhook*/ static optimizeFn joinOptimizations[] = { OptimizeJoinPath, @@ -135,9 +156,47 @@ const CustomPathMethods repartitionMethods = { }; -static CustomPath * +/* + * AddCollectCostOverhead adds the overhead costs for the Collect Operation. The values + * are added tot the CitusOperationCosts datastructure. This allows for compounding the + * cost of multiple operations before using the cost overhead in calculations. + * + * TODO understand if we should take the global rowcount or the per shardgroup rowcount + */ +static void +AddCollectCostOverhead(CitusOperationCosts *overhead, Path *originalPath) +{ + const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0); + const double shards = 4; /* TODO get these from planner colocation information */ + + + + overhead->startup_cost += CollectStartupCost; + overhead->total_cost += + CollectStartupCost + + (CollectPerRowCost * originalPath->rows) + + (CollectPerMBCost * originalPath->rows * mbPerRow); +} + + +static void +AddRepartitionCostOverhead(CitusOperationCosts *overhead, Path *originalPath) +{ + const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0); + const double shards = 4; /* TODO get these from planner colocation information */ + + /* TODO scaling crosss transfer simply by number of shards is probably not correct */ + overhead->startup_cost += RepartitionStartupCost; + overhead->total_cost += RepartitionStartupCost + + (RepartitionPerRowCost * originalPath->rows / shards) + + (RepartitionPerMBCost * originalPath->rows / shards * mbPerRow); +}; + + +static Path * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, - Expr *partitionValue, Oid sampleRelid, List *subPaths) + List *distributionAttrs, Expr *partitionValue, + Oid sampleRelid, List *subPaths) { DistributedUnionPath *distUnion = (DistributedUnionPath *) newNode(sizeof(DistributedUnionPath), T_CustomPath); @@ -147,21 +206,33 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, distUnion->custom_path.path.pathtarget = originalPath->pathtarget; distUnion->custom_path.path.param_info = originalPath->param_info; - /* TODO use a better cost model */ - distUnion->custom_path.path.rows = originalPath->rows; - distUnion->custom_path.path.startup_cost = originalPath->startup_cost + 1000; - distUnion->custom_path.path.total_cost = originalPath->total_cost + 1000; + /* + * TODO if pathkeys are set on the Collect we might need to add a sort clause to the + * worker query and implement mergesort in the executor. + */ + distUnion->custom_path.path.pathkeys = originalPath->pathkeys; + /* cost is based on overhead calculations estimated in AddCollectCostOverhead */ + CitusOperationCosts costOverhead = {0}; + AddCollectCostOverhead(&costOverhead, originalPath); + distUnion->custom_path.path.startup_cost = + originalPath->startup_cost + costOverhead.startup_cost; + distUnion->custom_path.path.total_cost = + originalPath->total_cost + costOverhead.total_cost; + distUnion->custom_path.path.rows = originalPath->rows; + + /* for rows we trust the estimates of postgres */ distUnion->custom_path.methods = &distributedUnionMethods; distUnion->worker_path = originalPath; - distUnion->custom_path.custom_private = list_make1(originalPath); + distUnion->custom_path.custom_private = list_make2(distributionAttrs, originalPath); distUnion->colocationId = colocationId; + distUnion->distributionAttrs = distributionAttrs; distUnion->partitionValue = partitionValue; distUnion->sampleRelid = sampleRelid; distUnion->custom_path.custom_paths = subPaths; - return (CustomPath *) distUnion; + return (Path *) distUnion; } @@ -256,6 +327,8 @@ CreateDistributedUnionPlan(PlannerInfo *root, ShardInterval *shardInterval = NULL; + pprint(distUnion); + Index *varnoMapping = NULL; /* store mapping back for outerrel checks */ Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses, &varnoMapping); @@ -472,9 +545,10 @@ PathBasedPlannerRelationHook(PlannerInfo *root, foreach(pathCell, relOptInfo->pathlist) { Path *originalPath = lfirst(pathCell); - CustomPath *wrappedPath = WrapTableAccessWithDistributedUnion( + Path *wrappedPath = WrapTableAccessWithDistributedUnion( originalPath, TableColocationId(rte->relid), + list_make1(partitionKey), partitionValue, rte->relid, NIL); @@ -503,6 +577,7 @@ PathBasedPlannerRelationHook(PlannerInfo *root, geoPath = (Path *) WrapTableAccessWithDistributedUnion(geoPath, TableColocationId(geoRelid), + NIL, NULL, geoRelid, NIL); @@ -949,9 +1024,10 @@ GeoOverlapJoin(PlannerInfo *root, Path *originalPath) /* TODO add grouping */ - Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + Path *newPath = WrapTableAccessWithDistributedUnion( (Path *) jpath, match.innerDistUnion->colocationId, + match.innerDistUnion->distributionAttrs, match.innerDistUnion->partitionValue, match.innerDistUnion->sampleRelid, NIL); /* TODO is this ok? */ @@ -1014,13 +1090,19 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath) jcpath->innerjoinpath = innerDU->worker_path; jcpath->outerjoinpath = outerDU->worker_path; - /* TODO update costs of hashjoin, very naive removal of DU cost for now */ - jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */ - jcpath->path.total_cost -= 2000; /* remove the double dist union cost */ + CitusOperationCosts costOverhead = {0}; + AddCollectCostOverhead(&costOverhead, innerDU->worker_path); + AddCollectCostOverhead(&costOverhead, outerDU->worker_path); - Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + /* TODO update costs of hashjoin, very naive removal of DU cost for now */ + jcpath->path.startup_cost -= costOverhead.startup_cost; + jcpath->path.total_cost -= costOverhead.total_cost; + + Path *newPath = WrapTableAccessWithDistributedUnion( (Path *) jcpath, baseDistUnion->colocationId, + list_concat(list_copy(outerDU->distributionAttrs), + innerDU->distributionAttrs), baseDistUnion->partitionValue, baseDistUnion->sampleRelid, baseDistUnion->custom_path.custom_paths); @@ -1098,23 +1180,25 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) * * Once we know on which attribute to repartition the inner part we can create a * new tree in the following shape: - * +------------------------------------+ - * | Collect | - * | - ColocationID: outer.ColocationID | - * +------------------------------------+ - * | - * +---------+ - * | Join | - * +---------+ - * / \ - * +-------------------+ +------------------------------------+ - * | oruter.worke_path | | Repartition | - * +-------------------+ | - ColocationID: outer.colocationID | - * +------------------------------------+ + * +------------------------------------+ + * | Collect | + * | - ColocationID: outer.ColocationID | + * +------------------------------------+ + * | + * +---------+ + * | Join | + * +---------+ + * / \ + * +--------------------+ +------------------------------------+ + * | outer.worker_path | | Repartition | + * +--------------------+ | - ColocationID: outer.colocationID | + * +------------------------------------+ * | * +-------------------+ * | inner.worker_path | * +-------------------+ + * + * And with the inner and outer paths swapped */ /* create new Join node */ @@ -1131,23 +1215,69 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath) /* TODO find a good way to calculate join costs based on its inner/outer paths */ /* subtract the double collect cost */ - newJoinPath->path.startup_cost -= 2000; - newJoinPath->path.total_cost -= 2000; + CitusOperationCosts overhead = EmptyCitusOperationCosts; + AddCollectCostOverhead(&overhead, outerDU->worker_path); + AddCollectCostOverhead(&overhead, innerDU->worker_path); + newJoinPath->path.startup_cost -= overhead.startup_cost; + newJoinPath->path.total_cost -= overhead.total_cost; /* add the costs for the repartition */ - newJoinPath->path.startup_cost += 500; - newJoinPath->path.total_cost += 500; + overhead = EmptyCitusOperationCosts; + AddRepartitionCostOverhead(&overhead, innerDU->worker_path); + newJoinPath->path.startup_cost += overhead.startup_cost; + newJoinPath->path.total_cost += overhead.total_cost; /* create Collect on top of new join, base Collect on matched outer Collect */ - const DistributedUnionPath *baseDistUnion = outerDU; - Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + DistributedUnionPath *baseDistUnion = outerDU; + Path *newPath = WrapTableAccessWithDistributedUnion( (Path *) newJoinPath, baseDistUnion->colocationId, + baseDistUnion->distributionAttrs, baseDistUnion->partitionValue, baseDistUnion->sampleRelid, baseDistUnion->custom_path.custom_paths); - return list_make1(newPath); + List *newPaths = NIL; + newPaths = lappend(newPaths, newPath); + + /* now with the inner and outer swapped */ + /* create new Join node */ + newJoinPath = makeNode(NestPath); + *newJoinPath = *joinPath; + newJoinPath->path.type = T_NestPath; /* reset type after copied join data */ + + /* TODO understand how to describe on which attribute the Repartition needs to happen */ + newJoinPath->outerjoinpath = CreateRepartitionNode(innerDU->colocationId, + outerDU->worker_path); + + newJoinPath->innerjoinpath = innerDU->worker_path; + + /* TODO find a good way to calculate join costs based on its inner/outer paths */ + /* subtract the double collect cost */ + overhead = EmptyCitusOperationCosts; + AddCollectCostOverhead(&overhead, outerDU->worker_path); + AddCollectCostOverhead(&overhead, innerDU->worker_path); + newJoinPath->path.startup_cost -= overhead.startup_cost; + newJoinPath->path.total_cost -= overhead.total_cost; + + /* add the costs for the repartition */ + overhead = EmptyCitusOperationCosts; + AddRepartitionCostOverhead(&overhead, outerDU->worker_path); + newJoinPath->path.startup_cost += overhead.startup_cost; + newJoinPath->path.total_cost += overhead.total_cost; + + /* create Collect on top of new join, base Collect on matched outer Collect */ + baseDistUnion = innerDU; + newPath = WrapTableAccessWithDistributedUnion( + (Path *) newJoinPath, + baseDistUnion->colocationId, + baseDistUnion->distributionAttrs, + baseDistUnion->partitionValue, + baseDistUnion->sampleRelid, + baseDistUnion->custom_path.custom_paths); + newPaths = lappend(newPaths, newPath); + + return newPaths; } return NIL; @@ -1165,10 +1295,13 @@ CreateRepartitionNode(uint32 colocationId, Path *worker_path) repartition->custom_path.path.pathtarget = worker_path->pathtarget; repartition->custom_path.path.param_info = worker_path->param_info; - /* TODO use a better cost model */ + CitusOperationCosts overhead = { 0 }; + AddRepartitionCostOverhead(&overhead, worker_path); repartition->custom_path.path.rows = worker_path->rows; - repartition->custom_path.path.startup_cost = worker_path->startup_cost + 500; - repartition->custom_path.path.total_cost = worker_path->total_cost + 500; + repartition->custom_path.path.startup_cost = + worker_path->startup_cost + overhead.startup_cost; + repartition->custom_path.path.total_cost = + worker_path->total_cost + overhead.total_cost; repartition->custom_path.methods = &repartitionMethods; @@ -1218,9 +1351,10 @@ BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath) jcpath->path.startup_cost -= 1500; jcpath->path.total_cost -= 1500; - Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + Path *newPath = WrapTableAccessWithDistributedUnion( (Path *) jcpath, baseDistUnion->colocationId, + baseDistUnion->distributionAttrs, baseDistUnion->partitionValue, baseDistUnion->sampleRelid, lappend(list_copy(baseDistUnion->custom_path.custom_paths), subPath)); @@ -1277,9 +1411,10 @@ BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath) jcpath->path.startup_cost -= 1500; jcpath->path.total_cost -= 1500; - Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + Path *newPath = WrapTableAccessWithDistributedUnion( (Path *) jcpath, baseDistUnion->colocationId, + baseDistUnion->distributionAttrs, baseDistUnion->partitionValue, baseDistUnion->sampleRelid, lappend(list_copy(baseDistUnion->custom_path.custom_paths), subPath)); @@ -1754,9 +1889,10 @@ OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) apath->path.startup_cost -= 1000; apath->path.total_cost -= 1000; - return (Path *) WrapTableAccessWithDistributedUnion( + return WrapTableAccessWithDistributedUnion( (Path *) apath, distUnion->colocationId, + distUnion->distributionAttrs, distUnion->partitionValue, distUnion->sampleRelid, distUnion->custom_path.custom_paths); @@ -1772,6 +1908,21 @@ OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) } +static bool +VarInList(List *varList, Var *var) +{ + Var *varEntry = NULL; + foreach_ptr(varEntry, varList) + { + if (varEntry->varno == var->varno && varEntry->varattno == var->varattno) + { + return true; + } + } + return false; +} + + static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) { @@ -1788,6 +1939,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) */ return false; } + DistributedUnionPath *distUnion = (DistributedUnionPath *) apath->subpath; SortGroupClause *sgc = NULL; @@ -1816,22 +1968,8 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath) } Var *targetVar = castNode(Var, targetExpr); - Index rteIndex = targetVar->varno; - RangeTblEntry *rte = root->simple_rte_array[rteIndex]; - - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); - if (cacheEntry->partitionColumn == NULL) + if (VarInList(distUnion->distributionAttrs, targetVar)) { - /* a table that is not distributed by a particular column, reference table? */ - continue; - } - - if (cacheEntry->partitionColumn->varattno == targetVar->varattno) - { - /* - * grouping column contains the distribution column of a distributed - * table, safe to optimize - */ return true; } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 258d8ab16..ad50f72e0 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -91,6 +91,7 @@ #include "utils/guc.h" #include "utils/guc_tables.h" #include "utils/varlena.h" +#include "optimizer/cost.h" #include "columnar/mod.h" @@ -1199,6 +1200,66 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomRealVariable( + "citus.path_based_planner_cost_collect_startup", + gettext_noop("Static cost used to start a Collect operation"), + NULL, + &CollectStartupCost, + 1000, 0, disable_cost, + PGC_USERSET, + GUC_STANDARD, + NULL,NULL,NULL); + + DefineCustomRealVariable( + "citus.path_based_planner_cost_collect_row", + gettext_noop("Cost per row for Collect operation"), + NULL, + &CollectPerRowCost, + .01, 0, disable_cost, + PGC_USERSET, + GUC_STANDARD, + NULL,NULL,NULL); + + DefineCustomRealVariable( + "citus.path_based_planner_cost_collect_mb", + gettext_noop("Cost per megabyte for Collect operation"), + NULL, + &CollectPerMBCost, + .1, 0, disable_cost, + PGC_USERSET, + GUC_STANDARD, + NULL,NULL,NULL); + + DefineCustomRealVariable( + "citus.path_based_planner_cost_repartition_startup", + gettext_noop("Static cost used to start a Repartition operation"), + NULL, + &RepartitionStartupCost, + 1000, 0, disable_cost, + PGC_USERSET, + GUC_STANDARD, + NULL,NULL,NULL); + + DefineCustomRealVariable( + "citus.path_based_planner_cost_repartition_row", + gettext_noop("Cost per row for Repartition operation"), + NULL, + &RepartitionPerRowCost, + .000, 0, disable_cost, + PGC_USERSET, + GUC_STANDARD, + NULL,NULL,NULL); + + DefineCustomRealVariable( + "citus.path_based_planner_cost_repartition_mb", + gettext_noop("Cost per megabyte for Repartition operation"), + NULL, + &RepartitionPerMBCost, + .01, 0, disable_cost, + PGC_USERSET, + GUC_STANDARD, + NULL,NULL,NULL); + DefineCustomIntVariable( "citus.local_shared_pool_size", gettext_noop( diff --git a/src/include/distributed/path_based_planner.h b/src/include/distributed/path_based_planner.h index 25cd5082c..43c48cb31 100644 --- a/src/include/distributed/path_based_planner.h +++ b/src/include/distributed/path_based_planner.h @@ -10,6 +10,14 @@ extern bool EnableBroadcastJoin; +extern Cost CollectStartupCost; +extern Cost CollectPerRowCost; +extern Cost CollectPerMBCost; + +extern Cost RepartitionStartupCost; +extern Cost RepartitionPerRowCost; +extern Cost RepartitionPerMBCost; + extern void PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex,