Second case works fine

prototype_de_correlate_subqueries
Onder Kalaci 2018-05-28 09:43:44 +03:00
parent acc8c83282
commit a7c6645d3f
3 changed files with 184 additions and 105 deletions

View File

@ -840,7 +840,7 @@ DeferErrorIfQueryNotSupported(Query *queryTree)
*/ */
if (queryTree->hasSubLinks && !WhereClauseContainsSubquery(queryTree)) if (queryTree->hasSubLinks && !WhereClauseContainsSubquery(queryTree))
{ {
preconditionsSatisfied = false; //preconditionsSatisfied = false;
errorMessage = "could not run distributed query with subquery outside the " errorMessage = "could not run distributed query with subquery outside the "
"FROM and WHERE clauses"; "FROM and WHERE clauses";
errorHint = filterHint; errorHint = filterHint;

View File

@ -3241,7 +3241,8 @@ OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn)
{ {
column = (Var *) leftOperand; column = (Var *) leftOperand;
} }
else
if (IsA(rightOperand, Var))
{ {
column = (Var *) rightOperand; column = (Var *) rightOperand;
} }

View File

@ -72,6 +72,7 @@
#include "optimizer/prep.h" #include "optimizer/prep.h"
#include "optimizer/var.h" #include "optimizer/var.h"
#include "parser/parsetree.h" #include "parser/parsetree.h"
#include "parser/parse_relation.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
#include "nodes/nodes.h" #include "nodes/nodes.h"
@ -232,8 +233,9 @@ static bool ShouldDeCorrelateSubqueries(Query *query, RecursivePlanningContext *
static void ExamineSublinks(Query *query, Node *quals, RecursivePlanningContext *context); static void ExamineSublinks(Query *query, Node *quals, RecursivePlanningContext *context);
static bool SublinkSafeToDeCorrelate(SubLink *sublink); static bool SublinkSafeToDeCorrelate(SubLink *sublink);
static Expr * ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column); static Expr * ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column);
static bool OpExpressionContainsColumnAnyPlace(OpExpr *operatorExpression, Var *partitionColumn);
static bool SimpleJoinExpression(Expr *clause); static bool SimpleJoinExpression(Expr *clause);
static Node * RemoveMatchExpressionAtTopLevelConjunction(Node *quals, OpExpr *opExpr); static Node * RemoveMatchExpressionAtTopLevelConjunction(Node *quals, Node *node);
/* /*
* RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to * RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to
@ -388,9 +390,10 @@ ExamineSublinks(Query *query, Node *node, RecursivePlanningContext *context)
ColumnMatchExpressionAtTopLevelConjunction( ColumnMatchExpressionAtTopLevelConjunction(
subselect->jointree->quals, v); subselect->jointree->quals, v);
if (!IsA(expr, OpExpr)) if (expr && !IsA(expr, OpExpr))
{ {
elog(DEBUG2, "skipping since OP EXPRs condition didn't hold"); elog(DEBUG2, "skipping since OP EXPRs condition didn't hold");
return;
} }
@ -464,19 +467,24 @@ ExamineSublinks(Query *query, Node *node, RecursivePlanningContext *context)
} }
Var *correlatedVar = linitial(varList);
Var *v = linitial(varList);
Var *secondVar = NULL; Var *secondVar = NULL;
Expr *expr = Expr *expr =
ColumnMatchExpressionAtTopLevelConjunction( ColumnMatchExpressionAtTopLevelConjunction(
subselect->jointree->quals, v); subselect->jointree->quals, correlatedVar);
if (!IsA(expr, OpExpr)) if (expr && !IsA(expr, OpExpr))
{ {
elog(DEBUG2, "skipping since OP EXPRs condition didn't hold"); elog(DEBUG2, "skipping since OP EXPRs condition didn't hold");
return;
} }
else if (!expr)
{
elog(DEBUG2, "skipping since no expr");
if (equal(strip_implicit_coercions(get_leftop(expr)), v)) return;
}
if (equal(strip_implicit_coercions(get_leftop(expr)), correlatedVar))
{ {
secondVar = (Var *) strip_implicit_coercions(get_rightop(expr)); secondVar = (Var *) strip_implicit_coercions(get_rightop(expr));
} }
@ -486,149 +494,185 @@ ExamineSublinks(Query *query, Node *node, RecursivePlanningContext *context)
} }
subselect = copyObject(subselect); //subselect = copyObject(subselect);
/* /*
* The subquery must have a nonempty jointree, else we won't have a join. * The subquery must have a nonempty jointree, else we won't have a join.
*/ */
if (subselect->jointree->fromlist == NIL) if (subselect->jointree->fromlist == NIL)
{
return; return;
}
/* /*
* Separate out the WHERE clause. (We could theoretically also remove * Separate out the WHERE clause. (We could theoretically also remove
* top-level plain JOIN/ON clauses, but it's probably not worth the * top-level plain JOIN/ON clauses, but it's probably not worth the
* trouble.) * trouble.)
*/ */
Node *whereClause = subselect->jointree->quals; Node *whereClause = subselect->jointree->quals;
subselect->jointree->quals = NULL; OpExpr *opExpr = expr;
subselect->jointree->quals =
RemoveMatchExpressionAtTopLevelConjunction(subselect->jointree->quals,
opExpr);
/* /*
* The rest of the sub-select must not refer to any Vars of the parent * The rest of the sub-select must not refer to any Vars of the parent
* query. (Vars of higher levels should be okay, though.) * query. (Vars of higher levels should be okay, though.)
*/ */
if (contain_vars_of_level((Node *) subselect, 1)) if (contain_vars_of_level((Node *) subselect, 1))
{
elog(DEBUG2, "skipping since other parts of the query also has correlation");
return; return;
}
/* /*
* On the other hand, the WHERE clause must contain some Vars of the * On the other hand, the WHERE clause must contain some Vars of the
* parent query, else it's not gonna be a join. * parent query, else it's not gonna be a join.
*/ */
if (!contain_vars_of_level(whereClause, 1)) if (!contain_vars_of_level(whereClause, 1))
{
elog(DEBUG2, "not likely to skip");
return; return;
}
/* /*
* We don't risk optimizing if the WHERE clause is volatile, either. * We don't risk optimizing if the WHERE clause is volatile, either.
*/ */
if (contain_volatile_functions(whereClause)) if (contain_volatile_functions(whereClause))
{
return; return;
}
List *rightColumnNames = NIL;
List *rightColumnVars = NIL;
List *leftColumnNames = NIL;
List *leftColumnVars = NIL;
List *joinedColumnNames = NIL;
int rtoffset = list_length(query->rtable); List *joinedColumnVars = NIL;
//OffsetVarNodes((Node *) subselect, rtoffset, 0);
OffsetVarNodes(whereClause, rtoffset, 0);
/*
* Upper-level vars in subquery will now be one level closer to their
* parent than before; in particular, anything that had been level 1
* becomes level zero.
*/
IncrementVarSublevelsUp((Node *) subselect, -1, 1);
IncrementVarSublevelsUp(whereClause, -1, 1);
TargetEntry *tList = makeTargetEntry((Expr *) copyObject(secondVar), 1,
"col",
false);
subselect->targetList = list_make1(tList);
RangeTblEntry *newRte = makeNode(RangeTblEntry); RangeTblEntry *newRte = makeNode(RangeTblEntry);
newRte->rtekind = RTE_SUBQUERY; newRte->rtekind = RTE_SUBQUERY;
newRte->subquery = subselect; newRte->subquery = subselect;
newRte->alias = makeAlias("new_sub", NIL);
newRte->eref = makeAlias("new_sub", NIL);
List *subselectTList = pull_var_clause_default(subselect->targetList); TargetEntry *tList = makeTargetEntry((Expr *) copyObject(secondVar),
1,
"col",
false);
subselect->targetList = list_make1(tList);
ListCell *joinalcell1 = NULL; int subqueryOffset = list_length(query->rtable) + 1;
List *erefs2 = NIL;
foreach(joinalcell1, subselectTList)
{
erefs2 = lappend(erefs2, makeString("c1"));
}
newRte->alias = makeAlias("decorralated", erefs2); /* build the join tree using the read_intermediate_result RTE */
newRte->eref = makeAlias("decorralated", erefs2); RangeTblRef *subqueryRteRef = makeNode(RangeTblRef);
subqueryRteRef->rtindex = subqueryOffset;
/* Now we can attach the modified subquery rtable to the parent */ RangeTblEntry *l_rte = linitial(query->rtable);
query->rtable = list_concat(query->rtable, list_make1(newRte)); query->rtable = list_concat(query->rtable, list_make1(newRte));
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
newRangeTableRef->rtindex = list_length(query->rtable);
/* remove the NOT EXISTS part */
query->jointree->quals =
RemoveMatchExpressionAtTopLevelConjunction(query->jointree->quals,
node);
IncrementVarSublevelsUp(expr, -1, 1);
secondVar->varno = subqueryOffset;
secondVar->varattno = 1;
expandRTE(l_rte, 1, 0, -1, false,
&leftColumnNames, &leftColumnVars);
expandRTE(newRte, subqueryRteRef->rtindex, 0, -1, false,
&rightColumnNames, &rightColumnVars);
rightColumnNames = list_make1(makeString("onder_col"));
rightColumnVars = list_make1(copyObject(secondVar));
elog(INFO, "rightColumnNames: %s", nodeToString(rightColumnNames));
elog(INFO, "rightColumnVars: %s", nodeToString(rightColumnVars));
newRte->alias = makeAlias("decorralated", copyObject(rightColumnNames));
newRte->eref = makeAlias("decorralated", copyObject(rightColumnNames));
/* /*
* And finally, build the JoinExpr node. * And finally, build the JoinExpr node.
*/ */
JoinExpr *result = makeNode(JoinExpr); JoinExpr *result = makeNode(JoinExpr);
result->jointype = JOIN_ANTI; result->jointype = JOIN_LEFT;
result->isNatural = false; result->isNatural = false;
result->rarg = newRangeTableRef; result->rarg = subqueryRteRef;
result->usingClause = NIL;
result->quals = whereClause;
result->alias = NULL;
result->rtindex = list_length(query->rtable) + 1;
query->jointree->quals = NULL;
if (list_length(query->jointree->fromlist) == 1) if (list_length(query->jointree->fromlist) == 1)
{
result->larg = (Node *) linitial(query->jointree->fromlist); result->larg = (Node *) linitial(query->jointree->fromlist);
}
else else
{
result->larg = (Node *) query->jointree; result->larg = (Node *) query->jointree;
}
NullTest *nullTest = makeNode(NullTest);
nullTest->nulltesttype = IS_NULL;
nullTest->location = secondVar->location;
nullTest->arg = (Expr *) secondVar;
result->usingClause = NIL;
result->quals = expr;
result->alias = NULL;
result->rtindex = list_length(query->rtable) + 1;
query->jointree = makeFromExpr(list_make1(result), NULL); query->jointree = makeFromExpr(list_make1(result), NULL);
query->jointree->quals = nullTest;
joinedColumnNames = list_concat(joinedColumnNames, leftColumnNames);
joinedColumnVars = list_concat(joinedColumnVars, leftColumnVars);
joinedColumnNames = list_concat(joinedColumnNames, rightColumnNames);
joinedColumnVars = list_concat(joinedColumnVars, rightColumnVars);
RangeTblEntry *rteJoin = makeNode(RangeTblEntry); RangeTblEntry *rteJoin = makeNode(RangeTblEntry);
rteJoin->rtekind = RTE_JOIN; rteJoin->rtekind = RTE_JOIN;
rteJoin->relid = InvalidOid; rteJoin->relid = InvalidOid;
rteJoin->subquery = NULL; rteJoin->subquery = NULL;
rteJoin->jointype = JOIN_ANTI; rteJoin->jointype = JOIN_LEFT;
Var *v2 = v; rteJoin->joinaliasvars = joinedColumnVars;
v2->varlevelsup-=1;
v2->varno+=1;
RangeTblEntry *firstRel = linitial(query->rtable); rteJoin->eref = makeAlias("unnamed_citus_join", joinedColumnNames);
firstRel->alias = copyObject(firstRel->eref); rteJoin->alias = makeAlias("unnamed_citus_join", joinedColumnNames);
rteJoin->joinaliasvars = list_concat(rteJoin->joinaliasvars, pull_var_clause_default(query->targetList));
rteJoin->joinaliasvars = list_concat(rteJoin->joinaliasvars, pull_var_clause_default(subselect->targetList));
ListCell *joinalcell = NULL;
List *erefs = NIL;
foreach(joinalcell, rteJoin->joinaliasvars)
{
erefs = lappend(erefs, makeString("c1"));
}
rteJoin->eref = makeAlias("join", erefs);
rteJoin->alias = makeAlias("join", erefs);
query->rtable = lappend(query->rtable, rteJoin); query->rtable = lappend(query->rtable, rteJoin);
StringInfo str = makeStringInfo();
StringInfo buf = makeStringInfo(); //deparse_shard_query(query, 0, 0, str);
deparse_shard_query(query, 0, 0, buf); //elog(INFO, "Current subquery: %s", str->data);
elog(INFO, "buf:%s", buf->data);
RecursivelyPlanSubquery(query, context); RecursivelyPlanSubquery(query, context);
} }
} }
} }
@ -694,7 +738,7 @@ ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column)
return NULL; return NULL;
} }
columnInExpr = OpExpressionContainsColumn(opExpr, column); columnInExpr = OpExpressionContainsColumnAnyPlace(opExpr, column);
if (!columnInExpr) if (!columnInExpr)
{ {
return NULL; return NULL;
@ -729,6 +773,35 @@ ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column)
return NULL; return NULL;
} }
static bool
OpExpressionContainsColumnAnyPlace(OpExpr *operatorExpression, Var *partitionColumn)
{
Node *leftOperand = get_leftop((Expr *) operatorExpression);
Node *rightOperand = get_rightop((Expr *) operatorExpression);
Var *column = NULL;
/* strip coercions before doing check */
leftOperand = strip_implicit_coercions(leftOperand);
rightOperand = strip_implicit_coercions(rightOperand);
if (IsA(leftOperand, Var))
{
column = (Var *) leftOperand;
if (equal(column, partitionColumn))
return true;
}
if (IsA(rightOperand, Var))
{
column = (Var *) rightOperand;
if (equal(column, partitionColumn))
return true;
}
return equal(column, partitionColumn);
}
/* /*
* RemoveMatchExpressionAtTopLevelConjunction returns true if the query contains an exact * RemoveMatchExpressionAtTopLevelConjunction returns true if the query contains an exact
@ -736,21 +809,26 @@ ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column)
* if the match expression has an AND relation with the rest of the expression tree. * if the match expression has an AND relation with the rest of the expression tree.
*/ */
static Node * static Node *
RemoveMatchExpressionAtTopLevelConjunction(Node *quals, OpExpr *opExpr) RemoveMatchExpressionAtTopLevelConjunction(Node *quals, Node *node)
{ {
if (quals == NULL) if (quals == NULL)
{ {
return NULL; return NULL;
} }
if (IsA(quals, OpExpr) && equal(quals, opExpr)) if (IsA(quals, OpExpr) && equal(quals, node))
{
return makeBoolConst(true, false);
}
if (not_clause(node) && equal(quals, node))
{ {
return makeBoolConst(true, false); return makeBoolConst(true, false);
} }
return expression_tree_mutator(quals, return expression_tree_mutator(quals,
RemoveMatchExpressionAtTopLevelConjunction, RemoveMatchExpressionAtTopLevelConjunction,
(void *) opExpr); (void *) node);
} }