From 8f7037da63e536ac9f12fc2eeb062aaedfdc9406 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 7 Jan 2020 17:57:09 +0100 Subject: [PATCH] beginning of set_rel_pathlist_hook to plan query on distributed table --- .../distributed/planner/distributed_planner.c | 117 ++++++++++++++++++ src/backend/distributed/shared_library_init.c | 10 ++ src/include/distributed/distributed_planner.h | 2 +- 3 files changed, 128 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 639b22f47..261381175 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -15,6 +15,7 @@ #include #include +#include #include "access/htup_details.h" #include "catalog/pg_class.h" @@ -70,6 +71,7 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; +bool UseCustomPath = false; static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); @@ -134,6 +136,11 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { + if (UseCustomPath) + { + return standard_planner(parse, cursorOptions, boundParams); + } + bool needsDistributedPlanning = false; bool fastPathRouterQuery = false; Node *distributionKeyValue = NULL; @@ -1680,6 +1687,12 @@ multi_join_restriction_hook(PlannerInfo *root, JoinType jointype, JoinPathExtraData *extra) { + if (UseCustomPath) + { + // TODO implement the replacement of path nodes with distributed union + return; + } + if (bms_is_empty(innerrel->relids) || bms_is_empty(outerrel->relids)) { /* @@ -1734,6 +1747,91 @@ multi_join_restriction_hook(PlannerInfo *root, } +static Plan * +CreateDistributedUnionPlan(PlannerInfo *root, + RelOptInfo *rel, + struct CustomPath *best_path, + List *tlist, + List *clauses, + List *custom_plans) +{ + Job *workerJob = CitusMakeNode(Job); + workerJob->jobId = UniqueJobId(); + + RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0); + List* shardIntervalList = LoadShardIntervalList(rte->relid); + ShardInterval *shardInterval = NULL; + + int i = 0; + foreach_ptr(shardInterval, shardIntervalList) + { + StringInfo sqlQueryString = makeStringInfo(); + appendStringInfo(sqlQueryString, "SELECT a,b FROM t1_%lu", shardInterval->shardId); + Task *sqlTask = CreateBasicTask(workerJob->jobId, i, SELECT_TASK, sqlQueryString->data); + sqlTask->anchorShardId = shardInterval->shardId; + sqlTask->taskPlacementList = FinalizedShardPlacementList(shardInterval->shardId); + workerJob->taskList = lappend(workerJob->taskList, sqlTask); + i++; + } + + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); + distributedPlan->workerJob = workerJob; + distributedPlan->modLevel = ROW_MODIFY_READONLY; + distributedPlan->relationIdList = list_make1_oid(rte->relid); + distributedPlan->hasReturning = true; + + CustomScan *plan = makeNode(CustomScan); + plan->scan.scanrelid = 1; + plan->flags = best_path->flags; + plan->methods = &AdaptiveExecutorCustomScanMethods; + plan->custom_private = list_make1(distributedPlan); + + plan->scan.plan.targetlist = list_make2( + makeTargetEntry((Expr *) makeVar(1, 1, INT4OID, -1, 0, 0), 1, "a", false), + makeTargetEntry((Expr *) makeVar(1, 2, INT4OID, -1, 0, 0), 2, "b", false) + ); + + return (Plan *) plan; +} + + +static List * +ReparameterizeDistributedUnion(PlannerInfo *root, + List *custom_private, + RelOptInfo *child_rel) +{ + return NIL; +} + +const CustomPathMethods distributedUnionMethods = { + .CustomName = "Distributed Union", + .PlanCustomPath = CreateDistributedUnionPlan, + .ReparameterizeCustomPathByChild = ReparameterizeDistributedUnion +}; + +static CustomPath * +WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) +{ + CustomPath *distUnion = makeNode(CustomPath); + distUnion->path.pathtype = T_CustomScan; + distUnion->path.parent = originalPath->parent; + distUnion->path.pathtarget = originalPath->pathtarget; + distUnion->path.param_info = originalPath->param_info; + + + /* TODO use a better cost model */ + distUnion->path.rows = originalPath->rows; + distUnion->path.startup_cost = originalPath->startup_cost; + distUnion->path.total_cost = originalPath->total_cost; + + distUnion->methods = &distributedUnionMethods; + + distUnion->custom_private = list_make1(rte); + + return distUnion; +} + + /* * multi_relation_restriction_hook is a hook called by postgresql standard planner * to notify us about various planning information regarding a relation. We use @@ -1743,6 +1841,25 @@ void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte) { + if (UseCustomPath) + { + if (!IsDistributedTable(rte->relid)) + { + /* table accessed is not distributed, no paths to change */ + return; + } + + /* wrap every path with a distirbuted union custom path */ + ListCell *pathCell = NULL; + foreach(pathCell, relOptInfo->pathlist) + { + Path *originalPath = lfirst(pathCell); + pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, rte); + } + // TODO implement the replacement of path nodes with distributed union + return; + } + CitusTableCacheEntry *cacheEntry = NULL; if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte)) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index d6a1005e9..2b4a09266 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1133,6 +1133,16 @@ RegisterCitusConfigVariables(void) GUC_UNIT_BYTE | GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.use_custom_path", + gettext_noop("Replaces citus' planner with a custom path in the standard planner"), + NULL, + &UseCustomPath, + false, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomStringVariable( "citus.local_hostname", gettext_noop("Sets the hostname when connecting back to itself."), diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index c74679e7f..0a03b8a5c 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -32,7 +32,7 @@ /* level of planner calls */ extern int PlannerLevel; - +extern bool UseCustomPath; typedef struct RelationRestrictionContext {