mirror of https://github.com/citusdata/citus.git
add paths for joins instead of rewriting them
parent
84068f79b2
commit
23b11b470b
|
@ -18,6 +18,7 @@
|
||||||
#include "nodes/pathnodes.h"
|
#include "nodes/pathnodes.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
#include "nodes/plannodes.h"
|
#include "nodes/plannodes.h"
|
||||||
|
#include "optimizer/pathnode.h"
|
||||||
#include "optimizer/restrictinfo.h"
|
#include "optimizer/restrictinfo.h"
|
||||||
|
|
||||||
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
|
static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, struct CustomPath *best_path, List *tlist, List *clauses, List *custom_plans);
|
||||||
|
@ -26,7 +27,7 @@ static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint
|
||||||
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
|
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses);
|
||||||
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
|
static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
|
||||||
static Path * OptimizeJoinPath(Path *originalPath);
|
static Path * OptimizeJoinPath(Path *originalPath);
|
||||||
static bool CanOptimizeJoinPath(JoinPath *jpath);
|
static bool CanOptimizeJoinPath(const JoinPath *jpath);
|
||||||
static bool IsDistributedUnion(Path *path);
|
static bool IsDistributedUnion(Path *path);
|
||||||
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
|
static Expr * ExtractPartitionValue(List *restrictionList, Var *partitionKey);
|
||||||
static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue);
|
static List * ShardIntervalListForRelationPartitionValue(Oid relationId, Expr *partitionValue);
|
||||||
|
@ -300,7 +301,7 @@ ExtractPartitionValue(List *restrictionList, Var *partitionKey)
|
||||||
|
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
CanOptimizeJoinPath(JoinPath *jpath)
|
CanOptimizeJoinPath(const JoinPath *jpath)
|
||||||
{
|
{
|
||||||
if (!(IsDistributedUnion(jpath->innerjoinpath) &&
|
if (!(IsDistributedUnion(jpath->innerjoinpath) &&
|
||||||
IsDistributedUnion(jpath->outerjoinpath)))
|
IsDistributedUnion(jpath->outerjoinpath)))
|
||||||
|
@ -336,21 +337,30 @@ OptimizeJoinPath(Path *originalPath)
|
||||||
case T_NestLoop:
|
case T_NestLoop:
|
||||||
case T_HashJoin:
|
case T_HashJoin:
|
||||||
{
|
{
|
||||||
JoinPath *jpath = (JoinPath *) originalPath;
|
const JoinPath *jpath = (JoinPath *) originalPath;
|
||||||
if (CanOptimizeJoinPath(jpath))
|
if (CanOptimizeJoinPath(jpath))
|
||||||
{
|
{
|
||||||
/* we can only optimize the Distributed union if the colocationId's are the same, taking any would suffice */
|
/* we can only optimize the Distributed union if the colocationId's are the same, taking any would suffice */
|
||||||
DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath;
|
DistributedUnionPath *baseDistUnion = (DistributedUnionPath *) jpath->innerjoinpath;
|
||||||
|
|
||||||
jpath->innerjoinpath = ((DistributedUnionPath *) jpath->innerjoinpath)->worker_path;
|
/*
|
||||||
jpath->outerjoinpath = ((DistributedUnionPath *) jpath->outerjoinpath)->worker_path;
|
* Shallow copy of any join node, this does not imply executing a nested
|
||||||
|
* join, but the nested join contains all the information we need to send
|
||||||
|
* the join to the worker
|
||||||
|
*/
|
||||||
|
JoinPath *jcpath = makeNode(NestPath);
|
||||||
|
*jcpath = *jpath;
|
||||||
|
jcpath->path.type = T_NestPath;
|
||||||
|
|
||||||
|
jcpath->innerjoinpath = ((DistributedUnionPath *) jpath->innerjoinpath)->worker_path;
|
||||||
|
jcpath->outerjoinpath = ((DistributedUnionPath *) jpath->outerjoinpath)->worker_path;
|
||||||
|
|
||||||
/* TODO update costs of hashjoin, very naife removal of DU cost for now */
|
/* TODO update costs of hashjoin, very naife removal of DU cost for now */
|
||||||
jpath->path.startup_cost -= 2000; /* remove the double dist union cost */
|
jcpath->path.startup_cost -= 2000; /* remove the double dist union cost */
|
||||||
jpath->path.total_cost -= 2000; /* remove the double dist union cost */
|
jcpath->path.total_cost -= 2000; /* remove the double dist union cost */
|
||||||
|
|
||||||
return (Path *) WrapTableAccessWithDistributedUnion(
|
return (Path *) WrapTableAccessWithDistributedUnion(
|
||||||
(Path *) jpath,
|
(Path *) jcpath,
|
||||||
baseDistUnion->colocationId,
|
baseDistUnion->colocationId,
|
||||||
baseDistUnion->partitionValue,
|
baseDistUnion->partitionValue,
|
||||||
baseDistUnion->sampleRelid);
|
baseDistUnion->sampleRelid);
|
||||||
|
@ -359,7 +369,7 @@ OptimizeJoinPath(Path *originalPath)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
return originalPath;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -379,7 +389,11 @@ PathBasedPlannerJoinHook(PlannerInfo *root,
|
||||||
foreach(pathCell, joinrel->pathlist)
|
foreach(pathCell, joinrel->pathlist)
|
||||||
{
|
{
|
||||||
Path *originalPath = lfirst(pathCell);
|
Path *originalPath = lfirst(pathCell);
|
||||||
pathCell->data.ptr_value = OptimizeJoinPath(originalPath);
|
Path *optimizedPath = OptimizeJoinPath(originalPath);
|
||||||
|
if (optimizedPath)
|
||||||
|
{
|
||||||
|
add_path(joinrel, optimizedPath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue