iterate on cost model

moonshot/custom-path
Nils Dijk 2021-11-29 14:06:18 +01:00
parent 0341d9fe80
commit a55331d934
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
3 changed files with 266 additions and 59 deletions

View File

@ -23,6 +23,7 @@
#include "nodes/pathnodes.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"
#include "nodes/print.h"
#include "optimizer/paramassign.h"
#include "optimizer/pathnode.h"
#include "optimizer/restrictinfo.h"
@ -40,6 +41,7 @@ typedef struct DistributedUnionPath
Path *worker_path;
uint32 colocationId;
List *distributionAttrs;
Expr *partitionValue;
/*
@ -64,15 +66,26 @@ typedef struct GeoScanPath
RangeTblEntry *rte;
} GeoScanPath;
/*
* CitusOperationCosts captures the cost overhead of citus operations.
* The total_cost includes the startup_cost, hence it can simply be added/subtracted.
*/
typedef struct CitusOperationCosts
{
Cost startup_cost;
Cost total_cost;
} CitusOperationCosts;
static const CitusOperationCosts EmptyCitusOperationCosts = { 0 };
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct
CustomPath *best_path, List *tlist,
List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private,
RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32
colocationId,
Expr *partitionValue, Oid
sampleRelid, List *subPaths);
static void AddCollectCostOverhead(CitusOperationCosts *overhead, Path *originalPath);
static Path * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, List *distributionAttrs, Expr *partitionValue, Oid sampleRelid, List *subPaths);
static void AddRepartitionCostOverhead(CitusOperationCosts *overhead, Path *originalPath);
static Path * CreateRepartitionNode(uint32 colocationId, Path *worker_path);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist,
List *clauses, Index **varnoMapping);
@ -110,6 +123,14 @@ static RangeTblEntry * makeRangeTableEntryForRelation(Relation rel,
*/
bool EnableBroadcastJoin = true;
Cost CollectStartupCost = 1000;
Cost CollectPerRowCost = .01;
Cost CollectPerMBCost = .1;
Cost RepartitionStartupCost = 1000;
Cost RepartitionPerRowCost = .000;
Cost RepartitionPerMBCost = .01;
/* list of functions that will be called to optimized in the joinhook*/
static optimizeFn joinOptimizations[] = {
OptimizeJoinPath,
@ -135,9 +156,47 @@ const CustomPathMethods repartitionMethods = {
};
static CustomPath *
/*
* AddCollectCostOverhead adds the overhead costs for the Collect Operation. The values
* are added tot the CitusOperationCosts datastructure. This allows for compounding the
* cost of multiple operations before using the cost overhead in calculations.
*
* TODO understand if we should take the global rowcount or the per shardgroup rowcount
*/
static void
AddCollectCostOverhead(CitusOperationCosts *overhead, Path *originalPath)
{
const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0);
const double shards = 4; /* TODO get these from planner colocation information */
overhead->startup_cost += CollectStartupCost;
overhead->total_cost +=
CollectStartupCost
+ (CollectPerRowCost * originalPath->rows)
+ (CollectPerMBCost * originalPath->rows * mbPerRow);
}
static void
AddRepartitionCostOverhead(CitusOperationCosts *overhead, Path *originalPath)
{
const double mbPerRow = originalPath->pathtarget->width / (1024.0 * 1024.0);
const double shards = 4; /* TODO get these from planner colocation information */
/* TODO scaling crosss transfer simply by number of shards is probably not correct */
overhead->startup_cost += RepartitionStartupCost;
overhead->total_cost += RepartitionStartupCost
+ (RepartitionPerRowCost * originalPath->rows / shards)
+ (RepartitionPerMBCost * originalPath->rows / shards * mbPerRow);
};
static Path *
WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId,
Expr *partitionValue, Oid sampleRelid, List *subPaths)
List *distributionAttrs, Expr *partitionValue,
Oid sampleRelid, List *subPaths)
{
DistributedUnionPath *distUnion = (DistributedUnionPath *)
newNode(sizeof(DistributedUnionPath), T_CustomPath);
@ -147,21 +206,33 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId,
distUnion->custom_path.path.pathtarget = originalPath->pathtarget;
distUnion->custom_path.path.param_info = originalPath->param_info;
/* TODO use a better cost model */
distUnion->custom_path.path.rows = originalPath->rows;
distUnion->custom_path.path.startup_cost = originalPath->startup_cost + 1000;
distUnion->custom_path.path.total_cost = originalPath->total_cost + 1000;
/*
* TODO if pathkeys are set on the Collect we might need to add a sort clause to the
* worker query and implement mergesort in the executor.
*/
distUnion->custom_path.path.pathkeys = originalPath->pathkeys;
/* cost is based on overhead calculations estimated in AddCollectCostOverhead */
CitusOperationCosts costOverhead = {0};
AddCollectCostOverhead(&costOverhead, originalPath);
distUnion->custom_path.path.startup_cost =
originalPath->startup_cost + costOverhead.startup_cost;
distUnion->custom_path.path.total_cost =
originalPath->total_cost + costOverhead.total_cost;
distUnion->custom_path.path.rows = originalPath->rows;
/* for rows we trust the estimates of postgres */
distUnion->custom_path.methods = &distributedUnionMethods;
distUnion->worker_path = originalPath;
distUnion->custom_path.custom_private = list_make1(originalPath);
distUnion->custom_path.custom_private = list_make2(distributionAttrs, originalPath);
distUnion->colocationId = colocationId;
distUnion->distributionAttrs = distributionAttrs;
distUnion->partitionValue = partitionValue;
distUnion->sampleRelid = sampleRelid;
distUnion->custom_path.custom_paths = subPaths;
return (CustomPath *) distUnion;
return (Path *) distUnion;
}
@ -256,6 +327,8 @@ CreateDistributedUnionPlan(PlannerInfo *root,
ShardInterval *shardInterval = NULL;
pprint(distUnion);
Index *varnoMapping = NULL; /* store mapping back for outerrel checks */
Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses,
&varnoMapping);
@ -472,9 +545,10 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
foreach(pathCell, relOptInfo->pathlist)
{
Path *originalPath = lfirst(pathCell);
CustomPath *wrappedPath = WrapTableAccessWithDistributedUnion(
Path *wrappedPath = WrapTableAccessWithDistributedUnion(
originalPath,
TableColocationId(rte->relid),
list_make1(partitionKey),
partitionValue,
rte->relid,
NIL);
@ -503,6 +577,7 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
geoPath = (Path *)
WrapTableAccessWithDistributedUnion(geoPath,
TableColocationId(geoRelid),
NIL,
NULL,
geoRelid,
NIL);
@ -949,9 +1024,10 @@ GeoOverlapJoin(PlannerInfo *root, Path *originalPath)
/* TODO add grouping */
Path *newPath = (Path *) WrapTableAccessWithDistributedUnion(
Path *newPath = WrapTableAccessWithDistributedUnion(
(Path *) jpath,
match.innerDistUnion->colocationId,
match.innerDistUnion->distributionAttrs,
match.innerDistUnion->partitionValue,
match.innerDistUnion->sampleRelid,
NIL); /* TODO is this ok? */
@ -1014,13 +1090,19 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
jcpath->innerjoinpath = innerDU->worker_path;
jcpath->outerjoinpath = outerDU->worker_path;
/* TODO update costs of hashjoin, very naive removal of DU cost for now */
jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */
jcpath->path.total_cost -= 2000; /* remove the double dist union cost */
CitusOperationCosts costOverhead = {0};
AddCollectCostOverhead(&costOverhead, innerDU->worker_path);
AddCollectCostOverhead(&costOverhead, outerDU->worker_path);
Path *newPath = (Path *) WrapTableAccessWithDistributedUnion(
/* TODO update costs of hashjoin, very naive removal of DU cost for now */
jcpath->path.startup_cost -= costOverhead.startup_cost;
jcpath->path.total_cost -= costOverhead.total_cost;
Path *newPath = WrapTableAccessWithDistributedUnion(
(Path *) jcpath,
baseDistUnion->colocationId,
list_concat(list_copy(outerDU->distributionAttrs),
innerDU->distributionAttrs),
baseDistUnion->partitionValue,
baseDistUnion->sampleRelid,
baseDistUnion->custom_path.custom_paths);
@ -1098,23 +1180,25 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
*
* Once we know on which attribute to repartition the inner part we can create a
* new tree in the following shape:
* +------------------------------------+
* | Collect |
* | - ColocationID: outer.ColocationID |
* +------------------------------------+
* |
* +---------+
* | Join |
* +---------+
* / \
* +-------------------+ +------------------------------------+
* | oruter.worke_path | | Repartition |
* +-------------------+ | - ColocationID: outer.colocationID |
* +------------------------------------+
* +------------------------------------+
* | Collect |
* | - ColocationID: outer.ColocationID |
* +------------------------------------+
* |
* +---------+
* | Join |
* +---------+
* / \
* +--------------------+ +------------------------------------+
* | outer.worker_path | | Repartition |
* +--------------------+ | - ColocationID: outer.colocationID |
* +------------------------------------+
* |
* +-------------------+
* | inner.worker_path |
* +-------------------+
*
* And with the inner and outer paths swapped
*/
/* create new Join node */
@ -1131,23 +1215,69 @@ OptimizeRepartitionInnerJoinPath(PlannerInfo *root, Path *originalPath)
/* TODO find a good way to calculate join costs based on its inner/outer paths */
/* subtract the double collect cost */
newJoinPath->path.startup_cost -= 2000;
newJoinPath->path.total_cost -= 2000;
CitusOperationCosts overhead = EmptyCitusOperationCosts;
AddCollectCostOverhead(&overhead, outerDU->worker_path);
AddCollectCostOverhead(&overhead, innerDU->worker_path);
newJoinPath->path.startup_cost -= overhead.startup_cost;
newJoinPath->path.total_cost -= overhead.total_cost;
/* add the costs for the repartition */
newJoinPath->path.startup_cost += 500;
newJoinPath->path.total_cost += 500;
overhead = EmptyCitusOperationCosts;
AddRepartitionCostOverhead(&overhead, innerDU->worker_path);
newJoinPath->path.startup_cost += overhead.startup_cost;
newJoinPath->path.total_cost += overhead.total_cost;
/* create Collect on top of new join, base Collect on matched outer Collect */
const DistributedUnionPath *baseDistUnion = outerDU;
Path *newPath = (Path *) WrapTableAccessWithDistributedUnion(
DistributedUnionPath *baseDistUnion = outerDU;
Path *newPath = WrapTableAccessWithDistributedUnion(
(Path *) newJoinPath,
baseDistUnion->colocationId,
baseDistUnion->distributionAttrs,
baseDistUnion->partitionValue,
baseDistUnion->sampleRelid,
baseDistUnion->custom_path.custom_paths);
return list_make1(newPath);
List *newPaths = NIL;
newPaths = lappend(newPaths, newPath);
/* now with the inner and outer swapped */
/* create new Join node */
newJoinPath = makeNode(NestPath);
*newJoinPath = *joinPath;
newJoinPath->path.type = T_NestPath; /* reset type after copied join data */
/* TODO understand how to describe on which attribute the Repartition needs to happen */
newJoinPath->outerjoinpath = CreateRepartitionNode(innerDU->colocationId,
outerDU->worker_path);
newJoinPath->innerjoinpath = innerDU->worker_path;
/* TODO find a good way to calculate join costs based on its inner/outer paths */
/* subtract the double collect cost */
overhead = EmptyCitusOperationCosts;
AddCollectCostOverhead(&overhead, outerDU->worker_path);
AddCollectCostOverhead(&overhead, innerDU->worker_path);
newJoinPath->path.startup_cost -= overhead.startup_cost;
newJoinPath->path.total_cost -= overhead.total_cost;
/* add the costs for the repartition */
overhead = EmptyCitusOperationCosts;
AddRepartitionCostOverhead(&overhead, outerDU->worker_path);
newJoinPath->path.startup_cost += overhead.startup_cost;
newJoinPath->path.total_cost += overhead.total_cost;
/* create Collect on top of new join, base Collect on matched outer Collect */
baseDistUnion = innerDU;
newPath = WrapTableAccessWithDistributedUnion(
(Path *) newJoinPath,
baseDistUnion->colocationId,
baseDistUnion->distributionAttrs,
baseDistUnion->partitionValue,
baseDistUnion->sampleRelid,
baseDistUnion->custom_path.custom_paths);
newPaths = lappend(newPaths, newPath);
return newPaths;
}
return NIL;
@ -1165,10 +1295,13 @@ CreateRepartitionNode(uint32 colocationId, Path *worker_path)
repartition->custom_path.path.pathtarget = worker_path->pathtarget;
repartition->custom_path.path.param_info = worker_path->param_info;
/* TODO use a better cost model */
CitusOperationCosts overhead = { 0 };
AddRepartitionCostOverhead(&overhead, worker_path);
repartition->custom_path.path.rows = worker_path->rows;
repartition->custom_path.path.startup_cost = worker_path->startup_cost + 500;
repartition->custom_path.path.total_cost = worker_path->total_cost + 500;
repartition->custom_path.path.startup_cost =
worker_path->startup_cost + overhead.startup_cost;
repartition->custom_path.path.total_cost =
worker_path->total_cost + overhead.total_cost;
repartition->custom_path.methods = &repartitionMethods;
@ -1218,9 +1351,10 @@ BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath)
jcpath->path.startup_cost -= 1500;
jcpath->path.total_cost -= 1500;
Path *newPath = (Path *) WrapTableAccessWithDistributedUnion(
Path *newPath = WrapTableAccessWithDistributedUnion(
(Path *) jcpath,
baseDistUnion->colocationId,
baseDistUnion->distributionAttrs,
baseDistUnion->partitionValue,
baseDistUnion->sampleRelid,
lappend(list_copy(baseDistUnion->custom_path.custom_paths), subPath));
@ -1277,9 +1411,10 @@ BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath)
jcpath->path.startup_cost -= 1500;
jcpath->path.total_cost -= 1500;
Path *newPath = (Path *) WrapTableAccessWithDistributedUnion(
Path *newPath = WrapTableAccessWithDistributedUnion(
(Path *) jcpath,
baseDistUnion->colocationId,
baseDistUnion->distributionAttrs,
baseDistUnion->partitionValue,
baseDistUnion->sampleRelid,
lappend(list_copy(baseDistUnion->custom_path.custom_paths), subPath));
@ -1754,9 +1889,10 @@ OptimizeGroupAgg(PlannerInfo *root, Path *originalPath)
apath->path.startup_cost -= 1000;
apath->path.total_cost -= 1000;
return (Path *) WrapTableAccessWithDistributedUnion(
return WrapTableAccessWithDistributedUnion(
(Path *) apath,
distUnion->colocationId,
distUnion->distributionAttrs,
distUnion->partitionValue,
distUnion->sampleRelid,
distUnion->custom_path.custom_paths);
@ -1772,6 +1908,21 @@ OptimizeGroupAgg(PlannerInfo *root, Path *originalPath)
}
static bool
VarInList(List *varList, Var *var)
{
Var *varEntry = NULL;
foreach_ptr(varEntry, varList)
{
if (varEntry->varno == var->varno && varEntry->varattno == var->varattno)
{
return true;
}
}
return false;
}
static bool
CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
{
@ -1788,6 +1939,7 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
*/
return false;
}
DistributedUnionPath *distUnion = (DistributedUnionPath *) apath->subpath;
SortGroupClause *sgc = NULL;
@ -1816,22 +1968,8 @@ CanOptimizeAggPath(PlannerInfo *root, AggPath *apath)
}
Var *targetVar = castNode(Var, targetExpr);
Index rteIndex = targetVar->varno;
RangeTblEntry *rte = root->simple_rte_array[rteIndex];
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid);
if (cacheEntry->partitionColumn == NULL)
if (VarInList(distUnion->distributionAttrs, targetVar))
{
/* a table that is not distributed by a particular column, reference table? */
continue;
}
if (cacheEntry->partitionColumn->varattno == targetVar->varattno)
{
/*
* grouping column contains the distribution column of a distributed
* table, safe to optimize
*/
return true;
}
}

View File

@ -91,6 +91,7 @@
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/varlena.h"
#include "optimizer/cost.h"
#include "columnar/mod.h"
@ -1199,6 +1200,66 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomRealVariable(
"citus.path_based_planner_cost_collect_startup",
gettext_noop("Static cost used to start a Collect operation"),
NULL,
&CollectStartupCost,
1000, 0, disable_cost,
PGC_USERSET,
GUC_STANDARD,
NULL,NULL,NULL);
DefineCustomRealVariable(
"citus.path_based_planner_cost_collect_row",
gettext_noop("Cost per row for Collect operation"),
NULL,
&CollectPerRowCost,
.01, 0, disable_cost,
PGC_USERSET,
GUC_STANDARD,
NULL,NULL,NULL);
DefineCustomRealVariable(
"citus.path_based_planner_cost_collect_mb",
gettext_noop("Cost per megabyte for Collect operation"),
NULL,
&CollectPerMBCost,
.1, 0, disable_cost,
PGC_USERSET,
GUC_STANDARD,
NULL,NULL,NULL);
DefineCustomRealVariable(
"citus.path_based_planner_cost_repartition_startup",
gettext_noop("Static cost used to start a Repartition operation"),
NULL,
&RepartitionStartupCost,
1000, 0, disable_cost,
PGC_USERSET,
GUC_STANDARD,
NULL,NULL,NULL);
DefineCustomRealVariable(
"citus.path_based_planner_cost_repartition_row",
gettext_noop("Cost per row for Repartition operation"),
NULL,
&RepartitionPerRowCost,
.000, 0, disable_cost,
PGC_USERSET,
GUC_STANDARD,
NULL,NULL,NULL);
DefineCustomRealVariable(
"citus.path_based_planner_cost_repartition_mb",
gettext_noop("Cost per megabyte for Repartition operation"),
NULL,
&RepartitionPerMBCost,
.01, 0, disable_cost,
PGC_USERSET,
GUC_STANDARD,
NULL,NULL,NULL);
DefineCustomIntVariable(
"citus.local_shared_pool_size",
gettext_noop(

View File

@ -10,6 +10,14 @@
extern bool EnableBroadcastJoin;
extern Cost CollectStartupCost;
extern Cost CollectPerRowCost;
extern Cost CollectPerMBCost;
extern Cost RepartitionStartupCost;
extern Cost RepartitionPerRowCost;
extern Cost RepartitionPerMBCost;
extern void PathBasedPlannerRelationHook(PlannerInfo *root,
RelOptInfo *relOptInfo,
Index restrictionIndex,