mirror of https://github.com/citusdata/citus.git
use patternmatcher for join pushdown
parent
f4c5f9e2ea
commit
7ee4646e96
|
@ -72,7 +72,6 @@ static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath);
|
||||||
static List * BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath);
|
static List * BroadcastInnerJoinPath(PlannerInfo *root, Path *originalPath);
|
||||||
static List * GeoOverlapJoin(PlannerInfo *root, Path *originalPath);
|
static List * GeoOverlapJoin(PlannerInfo *root, Path *originalPath);
|
||||||
static Path * CreateReadIntermediateResultPath(const Path *originalPath);
|
static Path * CreateReadIntermediateResultPath(const Path *originalPath);
|
||||||
static bool CanOptimizeJoinPath(const JoinPath *jpath);
|
|
||||||
static bool IsDistributedUnion(Path *path, bool recurseTransparent,
|
static bool IsDistributedUnion(Path *path, bool recurseTransparent,
|
||||||
DistributedUnionPath **out);
|
DistributedUnionPath **out);
|
||||||
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
|
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
|
||||||
|
@ -684,35 +683,6 @@ ExtractPartitionValue(List *restrictionList, Var *partitionKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static bool
|
|
||||||
CanOptimizeJoinPath(const JoinPath *jpath)
|
|
||||||
{
|
|
||||||
if (!(IsDistributedUnion(jpath->innerjoinpath, false, NULL) &&
|
|
||||||
IsDistributedUnion(jpath->outerjoinpath, false, NULL)))
|
|
||||||
{
|
|
||||||
/* can only optimize joins when both inner and outer are a distributed union */
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
DistributedUnionPath *innerDU = (DistributedUnionPath *) jpath->innerjoinpath;
|
|
||||||
DistributedUnionPath *outerDU = (DistributedUnionPath *) jpath->outerjoinpath;
|
|
||||||
|
|
||||||
if (innerDU->colocationId != outerDU->colocationId)
|
|
||||||
{
|
|
||||||
/* Distributed Union is not on the same colocation group */
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!equal(innerDU->partitionValue, outerDU->partitionValue))
|
|
||||||
{
|
|
||||||
/* TODO this is most likely too strict, but if the values are strictly the same we can easily take one during merging */
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static NameData
|
static NameData
|
||||||
GetFunctionNameData(Oid funcid)
|
GetFunctionNameData(Oid funcid)
|
||||||
{
|
{
|
||||||
|
@ -889,33 +859,52 @@ GeoOverlapJoin(PlannerInfo *root, Path *originalPath)
|
||||||
static List *
|
static List *
|
||||||
OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
|
OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
|
||||||
{
|
{
|
||||||
switch (originalPath->pathtype)
|
DistributedUnionPath *innerDU = NULL;
|
||||||
{
|
DistributedUnionPath *outerDU = NULL;
|
||||||
case T_NestLoop:
|
JoinPath *jpath = NULL;
|
||||||
case T_HashJoin:
|
|
||||||
{
|
IfPathMatch(
|
||||||
const JoinPath *jpath = (JoinPath *) originalPath;
|
originalPath,
|
||||||
if (jpath->jointype == JOIN_INNER && CanOptimizeJoinPath(jpath))
|
MatchJoin(
|
||||||
{
|
&jpath,
|
||||||
/* we can only optimize the Distributed union if the colocationId's are the same, taking any would suffice */
|
JOIN_INNER,
|
||||||
DistributedUnionPath *baseDistUnion =
|
/* match on join restriction info */
|
||||||
(DistributedUnionPath *) jpath->innerjoinpath;
|
MatchAny,
|
||||||
|
/* match inner path in join */
|
||||||
|
SkipReadThrough(
|
||||||
|
NoCapture,
|
||||||
|
MatchDistributedUnion(
|
||||||
|
&innerDU,
|
||||||
|
MatchAny)),
|
||||||
|
/* match outer path in join */
|
||||||
|
SkipReadThrough(
|
||||||
|
NoCapture,
|
||||||
|
MatchDistributedUnion(
|
||||||
|
&outerDU,
|
||||||
|
MatchAny))))
|
||||||
|
{
|
||||||
|
if (innerDU->colocationId != outerDU->colocationId)
|
||||||
|
{
|
||||||
|
/* Distributed Union is not on the same colocation group */
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!equal(innerDU->partitionValue, outerDU->partitionValue))
|
||||||
|
{
|
||||||
|
/* TODO this is most likely too strict, but if the values are strictly the same we can easily take one during merging */
|
||||||
|
return NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
const DistributedUnionPath *baseDistUnion = innerDU;
|
||||||
|
|
||||||
/*
|
|
||||||
* Shallow copy of any join node, this does not imply executing a nested
|
|
||||||
* join, but the nested join contains all the information we need to send
|
|
||||||
* the join to the worker
|
|
||||||
*/
|
|
||||||
JoinPath *jcpath = makeNode(NestPath);
|
JoinPath *jcpath = makeNode(NestPath);
|
||||||
*jcpath = *jpath;
|
*jcpath = *jpath;
|
||||||
jcpath->path.type = T_NestPath;
|
jcpath->path.type = T_NestPath;
|
||||||
|
|
||||||
jcpath->innerjoinpath =
|
jcpath->innerjoinpath = innerDU->worker_path;
|
||||||
((DistributedUnionPath *) jpath->innerjoinpath)->worker_path;
|
jcpath->outerjoinpath = outerDU->worker_path;
|
||||||
jcpath->outerjoinpath =
|
|
||||||
((DistributedUnionPath *) jpath->outerjoinpath)->worker_path;
|
|
||||||
|
|
||||||
/* TODO update costs of hashjoin, very naife removal of DU cost for now */
|
/* TODO update costs of hashjoin, very naive removal of DU cost for now */
|
||||||
jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */
|
jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */
|
||||||
jcpath->path.total_cost -= 2000; /* remove the double dist union cost */
|
jcpath->path.total_cost -= 2000; /* remove the double dist union cost */
|
||||||
|
|
||||||
|
@ -928,13 +917,8 @@ OptimizeJoinPath(PlannerInfo *root, Path *originalPath)
|
||||||
|
|
||||||
return list_make1(newPath);
|
return list_make1(newPath);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
{
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue