diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index d63c9139c..65a12a4fa 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -22,13 +22,15 @@ static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans); static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel); -static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId); +static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue); static Index VarnoFromFirstTargetEntry(List *tlist); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses); static List * ShardIntervalListToRelationShardList(List *shardIntervalList); static Path * OptimizeJoinPath(Path *originalPath); static bool CanOptimizeJoinPath(JoinPath *jpath); static bool IsDistributedUnion(Path *path); +static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey); +static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue); typedef struct DistributedUnionPath { @@ -38,6 +40,7 @@ typedef struct DistributedUnionPath Path *worker_path; uint32 colocationId; + Expr *partitionValue; } DistributedUnionPath; const CustomPathMethods distributedUnionMethods = { @@ -47,8 +50,8 @@ const CustomPathMethods distributedUnionMethods = { }; -CustomPath * -WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId) +static CustomPath * +WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue) { DistributedUnionPath *distUnion = (DistributedUnionPath *) newNode(sizeof(DistributedUnionPath), T_CustomPath); @@ -67,6 +70,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId) distUnion->worker_path = originalPath; distUnion->colocationId = colocationId; + distUnion->partitionValue = partitionValue; return (CustomPath *) distUnion; } @@ -92,9 +96,13 @@ CreateDistributedUnionPlan(PlannerInfo *root, * Assume shards are colocated, any shard should suffice for now to find the initial * interval list */ + /* TODO track colocation information on the Distributed Union node to fetch required information in a more optimal setting*/ - RangeTblEntry *rte = linitial_node(RangeTblEntry, q->rtable); - List* shardIntervalList = LoadShardIntervalList(rte->relid); + int relIndex = bms_next_member(rel->relids, -1) -1; /* read the first range table entry used in this relation */ + RangeTblEntry *rte = list_nth_node(RangeTblEntry, q->rtable, relIndex); + List *shardIntervalList = ShardIntervalListForRelationPartitionValue( + rte->relid, + distUnion->partitionValue); int i = 0; foreach_ptr(shardInterval, shardIntervalList) @@ -139,6 +147,23 @@ CreateDistributedUnionPlan(PlannerInfo *root, } +static List * +ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue) +{ + if (partitionValue && IsA(partitionValue, Const)) + { + /* prune shard list to target */ + Const *partitionValueConst = castNode(Const, partitionValue); + /* TODO assert the constant is of the correct value */ + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + return list_make1(FindShardInterval(partitionValueConst->constvalue, cacheEntry)); + } + + /* all shards */ + return LoadShardIntervalList(relationId); +} + + static List * ShardIntervalListToRelationShardList(List *shardIntervalList) { @@ -195,6 +220,18 @@ PathBasedPlannerRelationHook(PlannerInfo *root, return; } + Var *partitionKey = DistPartitionKey(rte->relid); + Expr *partitionValue = NULL; + + /* the distirbuted table has a partition key, lets check filters if there is a value */ + if (partitionKey != NULL) + { + /* TODO set correct varattno for multi table queries*/ + partitionKey->varattno = 1; + + partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo, partitionKey); + } + /* wrap every path with a distirbuted union custom path */ ListCell *pathCell = NULL; foreach(pathCell, relOptInfo->pathlist) @@ -202,10 +239,55 @@ PathBasedPlannerRelationHook(PlannerInfo *root, Path *originalPath = lfirst(pathCell); pathCell->data.ptr_value = WrapTableAccessWithDistributedUnion(originalPath, - TableColocationId(rte->relid)); + TableColocationId(rte->relid), + partitionValue); } } + +static Expr * +ExtractPartitionValue(List *restrictionList, Var *partitionKey) +{ + RestrictInfo *info = NULL; + foreach_ptr(info, restrictionList) + { + if (!NodeIsEqualsOpExpr((Node *) info->clause)) + { + continue; + } + + /* equality operator, check for partition column */ + OpExpr *eq = castNode(OpExpr, info->clause); + Expr *left = list_nth(eq->args, 0); + Expr *right = list_nth(eq->args, 1); + + if (IsA(left, Var)) + { + Var *leftVar = castNode(Var, left); + if (leftVar->varno == partitionKey->varno && + leftVar->varattno == partitionKey->varattno) + { + /* partition column, return right*/ + return right; + } + } + + if (IsA(right, Var)) + { + Var *rightVar = castNode(Var, left); + if (rightVar->varno == partitionKey->varno && + rightVar->varattno == partitionKey->varattno) + { + /* partition column, return left */ + return left; + } + } + } + + return NULL; +} + + static bool CanOptimizeJoinPath(JoinPath *jpath) { @@ -251,7 +333,7 @@ OptimizeJoinPath(Path *originalPath) jpath->path.startup_cost -= 2000; /* remove the double dist union cost */ jpath->path.total_cost -= 2000; /* remove the double dist union cost */ - return (Path *) WrapTableAccessWithDistributedUnion((Path *) jpath, colocationId); + return (Path *) WrapTableAccessWithDistributedUnion((Path *) jpath, colocationId, NULL); } }