Remove RTE reference from DU

moonshot/custom-path
Nils Dijk 2020-01-20 15:40:53 +01:00
parent 00083afeaa
commit 9acf41a9d9
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
1 changed files with 36 additions and 11 deletions

View File

@ -5,6 +5,7 @@
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -21,9 +22,10 @@
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte); static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath);
static Index VarnoFromFirstTargetEntry(List *tlist); static Index VarnoFromFirstTargetEntry(List *tlist);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
static bool IsDistributedUnion(Path *path); static bool IsDistributedUnion(Path *path);
static uint32 ColocationGroupForDistributedUnion(Path *path); static uint32 ColocationGroupForDistributedUnion(Path *path);
@ -36,7 +38,7 @@ const CustomPathMethods distributedUnionMethods = {
CustomPath * CustomPath *
WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte) WrapTableAccessWithDistributedUnion(Path *originalPath)
{ {
CustomPath *distUnion = makeNode(CustomPath); CustomPath *distUnion = makeNode(CustomPath);
distUnion->path.pathtype = T_CustomScan; distUnion->path.pathtype = T_CustomScan;
@ -51,7 +53,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte)
distUnion->methods = &distributedUnionMethods; distUnion->methods = &distributedUnionMethods;
distUnion->custom_private = list_make2(rte, originalPath); distUnion->custom_private = list_make1(originalPath);
return distUnion; return distUnion;
} }
@ -68,22 +70,26 @@ CreateDistributedUnionPlan(PlannerInfo *root,
Job *workerJob = CitusMakeNode(Job); Job *workerJob = CitusMakeNode(Job);
workerJob->jobId = UniqueJobId(); workerJob->jobId = UniqueJobId();
RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0); Path *originalPath = (Path *) list_nth(best_path->custom_private, 0);
Path *originalPath = (Path *) list_nth(best_path->custom_private, 1);
List* shardIntervalList = LoadShardIntervalList(rte->relid);
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
Query *q = GetQueryFromPath(root, originalPath, tlist, clauses); Query *q = GetQueryFromPath(root, originalPath, tlist, clauses);
/*
* Assume shards are colocated, any shard should suffice for now to find the initial
* interval list
*/
/* TODO track colocation information on the Distributed Union node to fetch required information in a more optimal setting*/
RangeTblEntry *rte = linitial_node(RangeTblEntry, q->rtable);
List* shardIntervalList = LoadShardIntervalList(rte->relid);
int i = 0; int i = 0;
foreach_ptr(shardInterval, shardIntervalList) foreach_ptr(shardInterval, shardIntervalList)
{ {
RelationShard *rs = CitusMakeNode(RelationShard); List *colocatedShards = ColocatedShardIntervalList(shardInterval);
rs->relationId = rte->relid; List *relationShardList = ShardIntervalListToRelationShardList(colocatedShards);
rs->shardId = shardInterval->shardId;
Query *qc = copyObject(q); Query *qc = copyObject(q);
UpdateRelationToShardNames((Node *) qc, list_make1(rs)); UpdateRelationToShardNames((Node *) qc, relationShardList);
StringInfoData buf; StringInfoData buf;
initStringInfo(&buf); initStringInfo(&buf);
@ -119,6 +125,25 @@ CreateDistributedUnionPlan(PlannerInfo *root,
} }
static List *
ShardIntervalListToRelationShardList(List *shardIntervalList)
{
List *shardRelationList = NIL;
ShardInterval *shardInterval = NULL;
/* map the shard intervals to RelationShard */
foreach_ptr(shardInterval, shardIntervalList)
{
RelationShard *rs = CitusMakeNode(RelationShard);
rs->relationId = shardInterval->relationId;
rs->shardId = shardInterval->shardId;
shardRelationList = lappend(shardRelationList, rs);
}
return shardRelationList;
}
static List * static List *
ReparameterizeDistributedUnion(PlannerInfo *root, ReparameterizeDistributedUnion(PlannerInfo *root,
List *custom_private, List *custom_private,
@ -168,7 +193,7 @@ PathBasedPlannerRelationHook(PlannerInfo *root, RelOptInfo *relOptInfo, Index re
foreach(pathCell, relOptInfo->pathlist) foreach(pathCell, relOptInfo->pathlist)
{ {
Path *originalPath = lfirst(pathCell); Path *originalPath = lfirst(pathCell);
pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, rte); pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath);
} }
} }