From 9acf41a9d96ac143119aa7018bed9ae2a5702339 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 20 Jan 2020 15:40:53 +0100 Subject: [PATCH] Remove RTE reference from DU --- .../distributed/planner/path_based_planner.c | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index bc6a05389..55fbef747 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -5,6 +5,7 @@ #include "distributed/citus_custom_scan.h" #include "distributed/citus_ruleutils.h" +#include "distributed/colocation_utils.h" #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -21,9 +22,10 @@ static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); -static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte); +static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath); static Index VarnoFromFirstTargetEntry(List *tlist); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses); +static List * ShardIntervalListToRelationShardList(List *shardIntervalList); static bool IsDistributedUnion(Path *path); static uint32 ColocationGroupForDistributedUnion(Path *path); @@ -36,7 +38,7 @@ const CustomPathMethods distributedUnionMethods = { CustomPath * -WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) +WrapTableAccessWithDistributedUnion(Path *originalPath) { CustomPath *distUnion = makeNode(CustomPath); distUnion->path.pathtype = T_CustomScan; @@ -51,7 +53,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) distUnion->methods = &distributedUnionMethods; - distUnion->custom_private = list_make2(rte, originalPath); + distUnion->custom_private = list_make1(originalPath); return distUnion; } @@ -68,22 +70,26 @@ CreateDistributedUnionPlan(PlannerInfo *root, Job *workerJob = CitusMakeNode(Job); workerJob->jobId = UniqueJobId(); - RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0); - Path *originalPath = (Path *) list_nth(best_path->custom_private, 1); - List* shardIntervalList = LoadShardIntervalList(rte->relid); + Path *originalPath = (Path *) list_nth(best_path->custom_private, 0); ShardInterval *shardInterval = NULL; Query *q = GetQueryFromPath(root, originalPath, tlist, clauses); + /* + * Assume shards are colocated, any shard should suffice for now to find the initial + * interval list + */ + /* TODO track colocation information on the Distributed Union node to fetch required information in a more optimal setting*/ + RangeTblEntry *rte = linitial_node(RangeTblEntry, q->rtable); + List* shardIntervalList = LoadShardIntervalList(rte->relid); int i = 0; foreach_ptr(shardInterval, shardIntervalList) { - RelationShard *rs = CitusMakeNode(RelationShard); - rs->relationId = rte->relid; - rs->shardId = shardInterval->shardId; + List *colocatedShards = ColocatedShardIntervalList(shardInterval); + List *relationShardList = ShardIntervalListToRelationShardList(colocatedShards); Query *qc = copyObject(q); - UpdateRelationToShardNames((Node *) qc, list_make1(rs)); + UpdateRelationToShardNames((Node *) qc, relationShardList); StringInfoData buf; initStringInfo(&buf); @@ -119,6 +125,25 @@ CreateDistributedUnionPlan(PlannerInfo *root, } +static List * +ShardIntervalListToRelationShardList(List *shardIntervalList) +{ + List *shardRelationList = NIL; + ShardInterval *shardInterval = NULL; + + /* map the shard intervals to RelationShard */ + foreach_ptr(shardInterval, shardIntervalList) + { + RelationShard *rs = CitusMakeNode(RelationShard); + rs->relationId = shardInterval->relationId; + rs->shardId = shardInterval->shardId; + shardRelationList = lappend(shardRelationList, rs); + } + + return shardRelationList; +} + + static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, @@ -168,7 +193,7 @@ PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index re foreach(pathCell, relOptInfo->pathlist) { Path *originalPath = lfirst(pathCell); - pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, rte); + pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath); } }