diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index eba3960e7..2efb91ffc 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -3,10 +3,12 @@ // #include "postgres.h" +#include "catalog/pg_type_d.h" #include "distributed/citus_custom_scan.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/deparse_shard_query.h" +#include "distributed/intermediate_result_pruning.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_physical_planner.h" @@ -20,15 +22,19 @@ #include "nodes/plannodes.h" #include "optimizer/pathnode.h" #include "optimizer/restrictinfo.h" +#include "utils/builtins.h" -typedef Path * (*optimizeFn)(Path *originalPath); +typedef List * (*optimizeFn)(Path *originalPath); static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); -static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid); +static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses); static List * ShardIntervalListToRelationShardList(List *shardIntervalList); -static Path * OptimizeJoinPath(Path *originalPath); +static List * OptimizeJoinPath(Path *originalPath); +static List * BroadcastOuterJoinPath(Path *originalPath); +static List * BroadcastInnerJoinPath(Path *originalPath); +static Path * CreateReadIntermediateResultPath(const Path *originalPath); static bool CanOptimizeJoinPath(const JoinPath *jpath); static bool IsDistributedUnion(Path *path); static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey); @@ -37,9 +43,20 @@ static void PathBasedPlannerGroupAgg(PlannerInfo *root, RelOptInfo *input_rel, R static Path * OptimizeGroupAgg(PlannerInfo *root, Path *originalPath); static bool CanOptimizeAggPath(PlannerInfo *root, AggPath *apath); + +/* + * TODO some optimizations are useless if others are already provided. This might cause + * excessive path creation causing performance problems. Depending on the amount of + * optimizations to be added we can keep a bitmask indicating for every entry to skip if + * the index of a preceding successful optimization is in the bitmap. + */ +bool EnableBroadcastJoin = true; + /* list of functions that will be called to optimized in the joinhook*/ static optimizeFn joinOptimizations[] = { OptimizeJoinPath, + BroadcastOuterJoinPath, + BroadcastInnerJoinPath, }; typedef struct DistributedUnionPath @@ -68,7 +85,7 @@ const CustomPathMethods distributedUnionMethods = { static CustomPath * -WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid) +WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue, Oid sampleRelid, List *subPaths) { DistributedUnionPath *distUnion = (DistributedUnionPath *) newNode(sizeof(DistributedUnionPath), T_CustomPath); @@ -89,6 +106,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Exp distUnion->colocationId = colocationId; distUnion->partitionValue = partitionValue; distUnion->sampleRelid = sampleRelid; + distUnion->custom_path.custom_paths = subPaths; return (CustomPath *) distUnion; } @@ -139,6 +157,7 @@ CreateDistributedUnionPlan(PlannerInfo *root, workerJob->taskList = lappend(workerJob->taskList, sqlTask); i++; } + workerJob->jobQuery = q; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); distributedPlan->workerJob = workerJob; @@ -146,12 +165,40 @@ CreateDistributedUnionPlan(PlannerInfo *root, distributedPlan->relationIdList = list_make1_oid(distUnion->sampleRelid); distributedPlan->hasReturning = true; + Plan *subPlan = NULL; + int subPlanCount = 0; + foreach_ptr(subPlan, custom_plans) + { + PlannedStmt *result = makeNode(PlannedStmt); + result->commandType = CMD_SELECT; + result->planTree = subPlan; + List *rtable = NIL; + for (i = 1; i < root->simple_rel_array_size; i++) + { + RangeTblEntry *rte = root->simple_rte_array[i]; + rtable = lappend(rtable, rte); + } + rtable = lappend(rtable, root->simple_rte_array[1]); + result->rtable = rtable; + + /* 1 indexed */ + subPlanCount++; + DistributedSubPlan *dsubPlan = CitusMakeNode(DistributedSubPlan); + dsubPlan->plan = result; + dsubPlan->subPlanId = subPlanCount; + + distributedPlan->subPlanList = lappend(distributedPlan->subPlanList, dsubPlan); + } + + distributedPlan->usedSubPlanNodeList = FindSubPlanUsages(distributedPlan); + CustomScan *plan = makeNode(CustomScan); plan->scan.scanrelid = 0; plan->custom_scan_tlist = tlist; plan->flags = best_path->flags; plan->methods = &AdaptiveExecutorCustomScanMethods; plan->custom_private = list_make1(distributedPlan); + plan->custom_plans = custom_plans; plan->scan.plan.targetlist = tlist; /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ @@ -259,7 +306,8 @@ PathBasedPlannerRelationHook(PlannerInfo *root, WrapTableAccessWithDistributedUnion(originalPath, TableColocationId(rte->relid), partitionValue, - rte->relid); + rte->relid, + NIL); } } @@ -336,7 +384,7 @@ CanOptimizeJoinPath(const JoinPath *jpath) } -static Path * +static List * OptimizeJoinPath(Path *originalPath) { switch (originalPath->pathtype) @@ -366,22 +414,158 @@ OptimizeJoinPath(Path *originalPath) jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */ jcpath->path.total_cost -= 2000; /* remove the double dist union cost */ - return (Path *) WrapTableAccessWithDistributedUnion( + Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( (Path *) jcpath, baseDistUnion->colocationId, baseDistUnion->partitionValue, - baseDistUnion->sampleRelid); + baseDistUnion->sampleRelid, + baseDistUnion->custom_path.custom_paths); + + return list_make1(newPath); } } default: { - return NULL; + return NIL; } } } +static List * +BroadcastOuterJoinPath(Path *originalPath) +{ + if (!EnableBroadcastJoin) + { + return NIL; + } + + switch (originalPath->pathtype) + { + case T_NestLoop: + case T_HashJoin: + { + const JoinPath *jpath = (JoinPath *) originalPath; + List *newPaths = NIL; + + if (IsDistributedUnion(jpath->outerjoinpath)) + { + /* broadcast inner join path */ + DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->outerjoinpath; + + /* + * Shallow copy of any join node, this does not imply executing a nested + * join, but the nested join contains all the information we need to send + * the join to the worker + */ + JoinPath *jcpath = makeNode(NestPath); + *jcpath = *jpath; + jcpath->path.type = T_NestPath; + + jcpath->outerjoinpath = baseDistUnion->worker_path; + Path *subPath = jcpath->innerjoinpath; + jcpath->innerjoinpath = CreateReadIntermediateResultPath(subPath); + + /* TODO update costs of hashjoin, very naife removal of DU cost for now */ + jcpath->path.startup_cost -= 1500; + jcpath->path.total_cost -= 1500; + + Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + (Path *) jcpath, + baseDistUnion->colocationId, + baseDistUnion->partitionValue, + baseDistUnion->sampleRelid, + lappend(list_copy(baseDistUnion->custom_path.custom_paths), subPath)); + newPaths = lappend(newPaths, newPath); + } + + return newPaths; + } + + default: + { + return NIL; + } + } +} + + +static List * +BroadcastInnerJoinPath(Path *originalPath) +{ + if (!EnableBroadcastJoin) + { + return NIL; + } + + switch (originalPath->pathtype) + { + case T_NestLoop: + case T_HashJoin: + { + const JoinPath *jpath = (JoinPath *) originalPath; + List *newPaths = NIL; + + if (IsDistributedUnion(jpath->innerjoinpath)) + { + /* broadcast inner join path */ + DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath; + + /* + * Shallow copy of any join node, this does not imply executing a nested + * join, but the nested join contains all the information we need to send + * the join to the worker + */ + JoinPath *jcpath = makeNode(NestPath); + *jcpath = *jpath; + jcpath->path.type = T_NestPath; + + jcpath->innerjoinpath = baseDistUnion->worker_path; + Path *subPath = jcpath->outerjoinpath; + jcpath->outerjoinpath = CreateReadIntermediateResultPath(subPath); + + /* TODO update costs of hashjoin, very naife removal of DU cost for now */ + jcpath->path.startup_cost -= 1500; + jcpath->path.total_cost -= 1500; + + Path *newPath = (Path *) WrapTableAccessWithDistributedUnion( + (Path *) jcpath, + baseDistUnion->colocationId, + baseDistUnion->partitionValue, + baseDistUnion->sampleRelid, + lappend(list_copy(baseDistUnion->custom_path.custom_paths), subPath)); + newPaths = lappend(newPaths, newPath); + } + + return newPaths; + } + + default: + { + return NIL; + } + } +} + + +static Path * +CreateReadIntermediateResultPath(const Path *originalPath) +{ + /* TODO might require a custom path for read intermediate result */ + Path *path = makeNode(Path); + path->pathtype = T_FunctionScan; + path->parent = originalPath->parent; + path->pathtarget = originalPath->pathtarget; + + /* TODO some network cost to be modelled */ + path->total_cost = originalPath->total_cost + 500; + path->startup_cost = originalPath->startup_cost + 500; + + return path; +} + + void PathBasedPlannerJoinHook(PlannerInfo *root, RelOptInfo *joinrel, @@ -403,11 +587,8 @@ PathBasedPlannerJoinHook(PlannerInfo *root, Path *originalPath = lfirst(pathCell); for (int i=0; i < sizeof(joinOptimizations)/sizeof(joinOptimizations[1]); i++) { - Path *alternativePath = joinOptimizations[i](originalPath); - if (alternativePath) - { - newPaths = lappend(newPaths, alternativePath); - } + List *alternativePaths = joinOptimizations[i](originalPath); + newPaths = list_concat(newPaths, alternativePaths); } } @@ -552,6 +733,93 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf break; } + /* TODO temporary placeholder for read_intermediate_result*/ + case T_FunctionScan: + { + Oid functionOid = CitusReadIntermediateResultFuncId(); + + /* result_id text */ + Const *resultIdConst = makeNode(Const); + resultIdConst->consttype = TEXTOID; + resultIdConst->consttypmod = -1; + resultIdConst->constlen = -1; + resultIdConst->constvalue = CStringGetTextDatum("0_1"); + resultIdConst->constbyval = false; + resultIdConst->constisnull = false; + resultIdConst->location = -1; + + /* format citus_copy_format DEFAULT 'csv'::citus_copy_format */ + Oid copyFormatId = BinaryCopyFormatId(); + Const *resultFormatConst = makeNode(Const); + resultFormatConst->consttype = CitusCopyFormatTypeId(); + resultFormatConst->consttypmod = -1; + resultFormatConst->constlen = 4; + resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId); + resultFormatConst->constbyval = true; + resultFormatConst->constisnull = false; + resultFormatConst->location = -1; + + /* build the call to read_intermediate_result */ + FuncExpr *funcExpr = makeNode(FuncExpr); + funcExpr->funcid = functionOid; + funcExpr->funcretset = true; + funcExpr->funcvariadic = false; + funcExpr->funcformat = 0; + funcExpr->funccollid = 0; + funcExpr->inputcollid = 0; + funcExpr->location = -1; + funcExpr->args = list_make2(resultIdConst, resultFormatConst); + + List *funcColNames = NIL; + List *funcColTypes = NIL; + List *funcColTypMods = NIL; + List *funcColCollations = NIL; + Node *expr = NULL; + foreach_ptr(expr, path->pathtarget->exprs) + { + Oid colType = exprType(expr); + Oid colCollation = exprCollation(expr); + int32 colTypeMod = exprTypmod(expr); + + funcColNames = lappend(funcColNames, makeString("t1.b")); /* TODO resolve actual name */ + funcColTypes = lappend_oid(funcColTypes, colType); + funcColTypMods = lappend_oid(funcColTypMods, colTypeMod); + funcColCollations = lappend_int(funcColCollations, colCollation); + } + + /* build the RTE for the call to read_intermediate_result */ + RangeTblFunction *rangeTableFunction = makeNode(RangeTblFunction); + rangeTableFunction->funccolcount = list_length(funcColNames); + rangeTableFunction->funccolnames = funcColNames; + rangeTableFunction->funccoltypes = funcColTypes; + rangeTableFunction->funccoltypmods = funcColTypMods; + rangeTableFunction->funccolcollations = funcColCollations; + rangeTableFunction->funcparams = NULL; + rangeTableFunction->funcexpr = (Node *) funcExpr; + + Alias *funcAlias = makeNode(Alias); + funcAlias->aliasname = "Distributed Subplan 0_1"; + funcAlias->colnames = funcColNames; + + RangeTblEntry *rangeTableEntry = makeNode(RangeTblEntry); + rangeTableEntry->rtekind = RTE_FUNCTION; + rangeTableEntry->functions = list_make1(rangeTableFunction); + rangeTableEntry->inFromCl = true; + rangeTableEntry->eref = funcAlias; + + /* add the RangeTableEntry */ + query->rtable = lappend(query->rtable, rangeTableEntry); + Index rteIndex = list_length(query->rtable); + Index scan_relid = path->parent->relid; + info->varno_mapping[scan_relid] = rteIndex; + + RangeTblRef *rr = makeNode(RangeTblRef); + rr->rtindex = rteIndex; + query->jointree->fromlist = lappend(query->jointree->fromlist, rr); + + break; + } + default: { ereport(ERROR, (errmsg("unknow path type in worker query"), @@ -682,7 +950,8 @@ OptimizeGroupAgg(PlannerInfo *root, Path *originalPath) (Path *) apath, distUnion->colocationId, distUnion->partitionValue, - distUnion->sampleRelid); + distUnion->sampleRelid, + distUnion->custom_path.custom_paths); } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 45999f53b..754882b75 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1159,6 +1159,16 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.use_custom_path_broadcast_join", + gettext_noop("Allow broadcast joins to be used during path based planning"), + NULL, + &EnableBroadcastJoin, + true, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.local_shared_pool_size", gettext_noop( diff --git a/src/include/distributed/path_based_planner.h b/src/include/distributed/path_based_planner.h index 15c04ebed..25cd5082c 100644 --- a/src/include/distributed/path_based_planner.h +++ b/src/include/distributed/path_based_planner.h @@ -8,6 +8,8 @@ #include "nodes/parsenodes.h" #include "nodes/pathnodes.h" +extern bool EnableBroadcastJoin; + extern void PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex,