mirror of https://github.com/citusdata/citus.git
implement beginning of shard pruning
parent
ffa33b096a
commit
30d9380f46
|
@ -22,13 +22,15 @@
|
|||
|
||||
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
|
||||
static List * ReparameterizeDistributedUnion(PlannerInfo *root, List *custom_private, RelOptInfo *child_rel);
|
||||
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId);
|
||||
static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue);
|
||||
static Index VarnoFromFirstTargetEntry(List *tlist);
|
||||
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
|
||||
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
|
||||
static Path * OptimizeJoinPath(Path *originalPath);
|
||||
static bool CanOptimizeJoinPath(JoinPath *jpath);
|
||||
static bool IsDistributedUnion(Path *path);
|
||||
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
|
||||
static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue);
|
||||
|
||||
typedef struct DistributedUnionPath
|
||||
{
|
||||
|
@ -38,6 +40,7 @@ typedef struct DistributedUnionPath
|
|||
Path *worker_path;
|
||||
|
||||
uint32 colocationId;
|
||||
Expr *partitionValue;
|
||||
} DistributedUnionPath;
|
||||
|
||||
const CustomPathMethods distributedUnionMethods = {
|
||||
|
@ -47,8 +50,8 @@ const CustomPathMethods distributedUnionMethods = {
|
|||
};
|
||||
|
||||
|
||||
CustomPath *
|
||||
WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId)
|
||||
static CustomPath *
|
||||
WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, Expr *partitionValue)
|
||||
{
|
||||
DistributedUnionPath *distUnion = (DistributedUnionPath *)
|
||||
newNode(sizeof(DistributedUnionPath), T_CustomPath);
|
||||
|
@ -67,6 +70,7 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId)
|
|||
|
||||
distUnion->worker_path = originalPath;
|
||||
distUnion->colocationId = colocationId;
|
||||
distUnion->partitionValue = partitionValue;
|
||||
|
||||
return (CustomPath *) distUnion;
|
||||
}
|
||||
|
@ -92,9 +96,13 @@ CreateDistributedUnionPlan(PlannerInfo *root,
|
|||
* Assume shards are colocated, any shard should suffice for now to find the initial
|
||||
* interval list
|
||||
*/
|
||||
|
||||
/* TODO track colocation information on the Distributed Union node to fetch required information in a more optimal setting*/
|
||||
RangeTblEntry *rte = linitial_node(RangeTblEntry, q->rtable);
|
||||
List* shardIntervalList = LoadShardIntervalList(rte->relid);
|
||||
int relIndex = bms_next_member(rel->relids, -1) -1; /* read the first range table entry used in this relation */
|
||||
RangeTblEntry *rte = list_nth_node(RangeTblEntry, q->rtable, relIndex);
|
||||
List *shardIntervalList = ShardIntervalListForRelationPartitionValue(
|
||||
rte->relid,
|
||||
distUnion->partitionValue);
|
||||
|
||||
int i = 0;
|
||||
foreach_ptr(shardInterval, shardIntervalList)
|
||||
|
@ -139,6 +147,23 @@ CreateDistributedUnionPlan(PlannerInfo *root,
|
|||
}
|
||||
|
||||
|
||||
static List *
|
||||
ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue)
|
||||
{
|
||||
if (partitionValue && IsA(partitionValue, Const))
|
||||
{
|
||||
/* prune shard list to target */
|
||||
Const *partitionValueConst = castNode(Const, partitionValue);
|
||||
/* TODO assert the constant is of the correct value */
|
||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
||||
return list_make1(FindShardInterval(partitionValueConst->constvalue, cacheEntry));
|
||||
}
|
||||
|
||||
/* all shards */
|
||||
return LoadShardIntervalList(relationId);
|
||||
}
|
||||
|
||||
|
||||
static List *
|
||||
ShardIntervalListToRelationShardList(List *shardIntervalList)
|
||||
{
|
||||
|
@ -195,6 +220,18 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
|
|||
return;
|
||||
}
|
||||
|
||||
Var *partitionKey = DistPartitionKey(rte->relid);
|
||||
Expr *partitionValue = NULL;
|
||||
|
||||
/* the distirbuted table has a partition key, lets check filters if there is a value */
|
||||
if (partitionKey != NULL)
|
||||
{
|
||||
/* TODO set correct varattno for multi table queries*/
|
||||
partitionKey->varattno = 1;
|
||||
|
||||
partitionValue = ExtractPartitionValue(relOptInfo->baserestrictinfo, partitionKey);
|
||||
}
|
||||
|
||||
/* wrap every path with a distirbuted union custom path */
|
||||
ListCell *pathCell = NULL;
|
||||
foreach(pathCell, relOptInfo->pathlist)
|
||||
|
@ -202,10 +239,55 @@ PathBasedPlannerRelationHook(PlannerInfo *root,
|
|||
Path *originalPath = lfirst(pathCell);
|
||||
pathCell->data.ptr_value =
|
||||
WrapTableAccessWithDistributedUnion(originalPath,
|
||||
TableColocationId(rte->relid));
|
||||
TableColocationId(rte->relid),
|
||||
partitionValue);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static Expr *
|
||||
ExtractPartitionValue(List *restrictionList, Var *partitionKey)
|
||||
{
|
||||
RestrictInfo *info = NULL;
|
||||
foreach_ptr(info, restrictionList)
|
||||
{
|
||||
if (!NodeIsEqualsOpExpr((Node *) info->clause))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* equality operator, check for partition column */
|
||||
OpExpr *eq = castNode(OpExpr, info->clause);
|
||||
Expr *left = list_nth(eq->args, 0);
|
||||
Expr *right = list_nth(eq->args, 1);
|
||||
|
||||
if (IsA(left, Var))
|
||||
{
|
||||
Var *leftVar = castNode(Var, left);
|
||||
if (leftVar->varno == partitionKey->varno &&
|
||||
leftVar->varattno == partitionKey->varattno)
|
||||
{
|
||||
/* partition column, return right*/
|
||||
return right;
|
||||
}
|
||||
}
|
||||
|
||||
if (IsA(right, Var))
|
||||
{
|
||||
Var *rightVar = castNode(Var, left);
|
||||
if (rightVar->varno == partitionKey->varno &&
|
||||
rightVar->varattno == partitionKey->varattno)
|
||||
{
|
||||
/* partition column, return left */
|
||||
return left;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
CanOptimizeJoinPath(JoinPath *jpath)
|
||||
{
|
||||
|
@ -251,7 +333,7 @@ OptimizeJoinPath(Path *originalPath)
|
|||
jpath->path.startup_cost -= 2000; /* remove the double dist union cost */
|
||||
jpath->path.total_cost -= 2000; /* remove the double dist union cost */
|
||||
|
||||
return (Path *) WrapTableAccessWithDistributedUnion((Path *) jpath, colocationId);
|
||||
return (Path *) WrapTableAccessWithDistributedUnion((Path *) jpath, colocationId, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue