mirror of https://github.com/citusdata/citus.git
hacky hack for double distributed join
parent
aa3f283d02
commit
fc0e6a1094
|
@ -1749,8 +1749,40 @@ multi_join_restriction_hook(PlannerInfo *root,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static Node *
|
||||||
|
VarNoMutator(Node *expr, Index *varno)
|
||||||
|
{
|
||||||
|
switch (nodeTag(expr))
|
||||||
|
{
|
||||||
|
case T_Var:
|
||||||
|
{
|
||||||
|
Var *var = castNode(Var, expr);
|
||||||
|
if (var->varno == *varno)
|
||||||
|
{
|
||||||
|
/*nothing to change */
|
||||||
|
return (Node *) var;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (Node *) makeVar(
|
||||||
|
*varno,
|
||||||
|
var->varattno,
|
||||||
|
var->vartype,
|
||||||
|
var->vartypmod,
|
||||||
|
var->varcollid,
|
||||||
|
var->varlevelsup
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
return expression_tree_mutator(expr, (void*) VarNoMutator, varno);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static Query *
|
static Query *
|
||||||
GetQueryFromPath(List *tlist, RangeTblEntry *rte)
|
GetQueryFromPath(List *tlist, RangeTblEntry *rte, List *clauses)
|
||||||
{
|
{
|
||||||
Query *q = makeNode(Query);
|
Query *q = makeNode(Query);
|
||||||
q->commandType = CMD_SELECT;
|
q->commandType = CMD_SELECT;
|
||||||
|
@ -1760,20 +1792,8 @@ GetQueryFromPath(List *tlist, RangeTblEntry *rte)
|
||||||
TargetEntry *target = NULL;
|
TargetEntry *target = NULL;
|
||||||
foreach_ptr(target, tlist)
|
foreach_ptr(target, tlist)
|
||||||
{
|
{
|
||||||
Var *var = castNode(Var, target->expr);
|
Index varno = 1;
|
||||||
TargetEntry *newTarget = makeTargetEntry(
|
TargetEntry *newTarget = (TargetEntry *) VarNoMutator((Node *)target, &varno);
|
||||||
(Expr *) makeVar(
|
|
||||||
1,
|
|
||||||
var->varattno,
|
|
||||||
var->vartype,
|
|
||||||
var->vartypmod,
|
|
||||||
var->varcollid,
|
|
||||||
var->varlevelsup
|
|
||||||
),
|
|
||||||
target->resno,
|
|
||||||
target->resname,
|
|
||||||
target->resjunk
|
|
||||||
);
|
|
||||||
newTargetList = lappend(newTargetList, newTarget);
|
newTargetList = lappend(newTargetList, newTarget);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1782,6 +1802,19 @@ GetQueryFromPath(List *tlist, RangeTblEntry *rte)
|
||||||
RangeTblRef *rr = makeNode(RangeTblRef);
|
RangeTblRef *rr = makeNode(RangeTblRef);
|
||||||
rr->rtindex = 1;
|
rr->rtindex = 1;
|
||||||
q->jointree->fromlist = list_make1(rr);
|
q->jointree->fromlist = list_make1(rr);
|
||||||
|
|
||||||
|
List *quals = NIL;
|
||||||
|
|
||||||
|
RestrictInfo *rinfo = NULL;
|
||||||
|
foreach_ptr(rinfo, clauses)
|
||||||
|
{
|
||||||
|
Node *clause = (Node *) rinfo->clause;
|
||||||
|
Index varno = 1;
|
||||||
|
clause = VarNoMutator(clause, &varno);
|
||||||
|
quals = lappend(quals, clause);
|
||||||
|
}
|
||||||
|
q->jointree->quals = (Node *) quals;
|
||||||
|
|
||||||
return q;
|
return q;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1794,6 +1827,7 @@ VarnoFromFirstTargetEntry(List *tlist)
|
||||||
return var->varno;
|
return var->varno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#include "optimizer/restrictinfo.h"
|
||||||
|
|
||||||
static Plan *
|
static Plan *
|
||||||
CreateDistributedUnionPlan(PlannerInfo *root,
|
CreateDistributedUnionPlan(PlannerInfo *root,
|
||||||
|
@ -1807,10 +1841,11 @@ CreateDistributedUnionPlan(PlannerInfo *root,
|
||||||
workerJob->jobId = UniqueJobId();
|
workerJob->jobId = UniqueJobId();
|
||||||
|
|
||||||
RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0);
|
RangeTblEntry *rte = list_nth_node(RangeTblEntry, best_path->custom_private, 0);
|
||||||
|
Path *originalPath = (Path *) list_nth(best_path->custom_private, 1);
|
||||||
List* shardIntervalList = LoadShardIntervalList(rte->relid);
|
List* shardIntervalList = LoadShardIntervalList(rte->relid);
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
Query *q = GetQueryFromPath(tlist, rte);
|
Query *q = GetQueryFromPath(tlist, rte, clauses);
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
foreach_ptr(shardInterval, shardIntervalList)
|
||||||
|
@ -1846,6 +1881,11 @@ CreateDistributedUnionPlan(PlannerInfo *root,
|
||||||
plan->custom_private = list_make1(distributedPlan);
|
plan->custom_private = list_make1(distributedPlan);
|
||||||
|
|
||||||
plan->scan.plan.targetlist = tlist;
|
plan->scan.plan.targetlist = tlist;
|
||||||
|
/* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
|
||||||
|
clauses = extract_actual_clauses(clauses, false);
|
||||||
|
|
||||||
|
plan->scan.plan.qual = clauses;
|
||||||
|
plan->custom_exprs = clauses;
|
||||||
|
|
||||||
return (Plan *) plan;
|
return (Plan *) plan;
|
||||||
}
|
}
|
||||||
|
@ -1876,8 +1916,8 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, RangeTblEntry *rte)
|
||||||
|
|
||||||
/* TODO use a better cost model */
|
/* TODO use a better cost model */
|
||||||
distUnion->path.rows = originalPath->rows;
|
distUnion->path.rows = originalPath->rows;
|
||||||
distUnion->path.startup_cost = originalPath->startup_cost;
|
distUnion->path.startup_cost = originalPath->startup_cost+1000;
|
||||||
distUnion->path.total_cost = originalPath->total_cost;
|
distUnion->path.total_cost = originalPath->total_cost+1000;
|
||||||
|
|
||||||
distUnion->methods = &distributedUnionMethods;
|
distUnion->methods = &distributedUnionMethods;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue