Enable non equi joins in subquery pushdown

Subquery pushdown planning is based on relation restriction
equivalnce. This brings us the opportuneatly to allow any
other joins as long as there is an already equi join between
the distributed tables.

We already allow that for joins with reference tables and
this commit allows that for joins among distributed tables.
pull/1827/head
Onder Kalaci 2017-11-23 16:07:03 +02:00
parent ae2c86dbdd
commit 48f96bf3e5
5 changed files with 354 additions and 62 deletions

View File

@ -122,7 +122,6 @@ static bool RelationInfoContainsRecurringTuples(PlannerInfo *plannerInfo,
RecurringTuplesType *recurType);
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
static void ValidateClauseList(List *clauseList);
static void ValidateSubqueryPushdownClauseList(List *clauseList);
static bool ExtractFromExpressionWalker(Node *node,
QualifierWalkerContext *walkerContext);
static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList);
@ -132,7 +131,6 @@ static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList,
static MultiCollect * CollectNodeForTable(List *collectTableList, uint32 rangeTableId);
static MultiSelect * MultiSelectNode(List *whereClauseList);
static bool IsSelectClause(Node *clause);
static bool IsSublinkClause(Node *clause);
static MultiProject * MultiProjectNode(List *targetEntryList);
static MultiExtendedOp * MultiExtendedOpNode(Query *queryTree);
@ -2535,12 +2533,6 @@ ValidateClauseList(List *clauseList)
{
Node *clause = (Node *) lfirst(clauseCell);
/*
* There could never be sublinks here given that it is handled
* in subquery pushdown code-path.
*/
Assert(!IsSublinkClause(clause));
if (!(IsSelectClause(clause) || IsJoinClause(clause) || or_clause(clause)))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -2550,33 +2542,6 @@ ValidateClauseList(List *clauseList)
}
/*
* ValidateSubqueryPushdownClauseList walks over the given list of clauses,
* and checks that we can recognize all the clauses. This function ensures
* that we do not drop an unsupported clause type on the floor, and thus
* prevents erroneous results.
*
* Note that this function is slightly different than ValidateClauseList(),
* additionally allowing sublinks.
*/
static void
ValidateSubqueryPushdownClauseList(List *clauseList)
{
ListCell *clauseCell = NULL;
foreach(clauseCell, clauseList)
{
Node *clause = (Node *) lfirst(clauseCell);
if (!(IsSublinkClause(clause) || IsSelectClause(clause) ||
IsJoinClause(clause) || or_clause(clause)))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported clause type")));
}
}
}
/*
* JoinClauseList finds the join clauses from the given where clause expression
* list, and returns them. The function does not iterate into nested OR clauses
@ -3056,23 +3021,6 @@ IsSelectClause(Node *clause)
}
/*
* IsSublinkClause determines if the given node is a sublink or subplan.
*/
static bool
IsSublinkClause(Node *clause)
{
NodeTag nodeTag = nodeTag(clause);
if (nodeTag == T_SubLink || nodeTag == T_SubPlan)
{
return true;
}
return false;
}
/*
* MultiProjectNode builds the project node using the target entry information
* from the query tree. The project node only encapsulates projected columns,
@ -3722,7 +3670,6 @@ static MultiNode *
SubqueryPushdownMultiNodeTree(Query *queryTree)
{
List *targetEntryList = queryTree->targetList;
List *qualifierList = NIL;
List *columnList = NIL;
List *targetColumnList = NIL;
MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect);
@ -3737,14 +3684,6 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
/* verify we can perform distributed planning on this query */
ErrorIfQueryNotSupported(queryTree);
/*
* Extract qualifiers and verify we can plan for them. Note that since
* subquery pushdown join planning is based on restriction equivalence,
* checking for these qualifiers may not be necessary.
*/
qualifierList = QualifierList(queryTree->jointree);
ValidateSubqueryPushdownClauseList(qualifierList);
/*
* We would be creating a new Query and pushing down top level query's
* contents down to it. Join and filter clauses in higher level query would

View File

@ -2040,6 +2040,21 @@ CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT run_command_on_workers($f$
CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
AS 'select $1 > $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
run_command_on_workers
---------------------------------------
(localhost,57637,t,"CREATE FUNCTION")
(localhost,57638,t,"CREATE FUNCTION")
(2 rows)
-- we don't support joins via functions
SELECT user_id, array_length(events_table, 1)
FROM (
@ -2074,6 +2089,125 @@ FROM
WHERE
users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id);
ERROR: unsupported clause type
-- we do support the following since there is already an equality on the partition
-- key and we have an additional join via a function
SELECT
temp.user_id, users_table.value_1, prob
FROM
users_table
JOIN
(SELECT
ma.user_id, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob, random()
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 100 AND
test_join_function_2(ma.value_1, short_list.value_2)
) temp
ON users_table.user_id = temp.user_id
WHERE
users_table.value_1 < 50
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
user_id | value_1 | prob
---------+---------+------------------------
37 | 49 | 0.50000000000000000000
37 | 49 | 0.50000000000000000000
37 | 49 | 0.50000000000000000000
32 | 49 | 0.50000000000000000000
32 | 49 | 0.50000000000000000000
98 | 48 | 0.50000000000000000000
98 | 48 | 0.50000000000000000000
98 | 48 | 0.50000000000000000000
96 | 48 | 0.50000000000000000000
96 | 48 | 0.50000000000000000000
(10 rows)
-- similarly we do support non equi joins on columns if there is aready
-- an equality join
SELECT
count(*)
FROM
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.time > users_table.time AND
events_table.value_2 IN (10, 100)
) as foo;
count
-------
1015
(1 row)
-- the other way around is not supported
SELECT
count(*)
FROM
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id > users_table.user_id AND
events_table.time = users_table.time AND
events_table.value_2 IN (10, 100)
) as foo;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- we can even allow that on top level joins
SELECT
count(*)
FROM
(SELECT
event_type, random(), events_table.user_id
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (10, 100)
) as foo,
(SELECT
event_type, random(), events_table.user_id
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (20, 200)
) as bar
WHERE foo.event_type > bar.event_type
AND foo.user_id = bar.user_id;
count
-------
14641
(1 row)
-- note that the following is not supported
-- since the top level join is not on the distribution key
SELECT
count(*)
FROM
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (10, 100)
) as foo,
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (20, 200)
) as bar
WHERE foo.event_type = bar.event_type;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- DISTINCT in the outer query and DISTINCT in the subquery
SELECT
DISTINCT users_ids.user_id
@ -2152,5 +2286,16 @@ FROM
(5 rows)
DROP FUNCTION test_join_function_2(integer, integer);
SELECT run_command_on_workers($f$
DROP FUNCTION test_join_function_2(integer, integer);
$f$);
run_command_on_workers
-------------------------------------
(localhost,57637,t,"DROP FUNCTION")
(localhost,57638,t,"DROP FUNCTION")
(2 rows)
SET citus.enable_router_execution TO TRUE;
SET citus.subquery_pushdown to OFF;

View File

@ -23,7 +23,7 @@ WHERE
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
LIMIT 5;
user_id
---------
49
@ -32,6 +32,57 @@ LIMIT 5;
63
(4 rows)
-- same query with one additional join on non distribution column
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_table
WHERE
users_table.user_id = events_table.user_id AND event_type = 50 AND
users_table.time > events_table.time
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
user_id
---------
55
56
63
(3 rows)
-- the other way around is not supported
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_table
WHERE
users_table.user_id > events_table.user_id AND event_type = 50 AND
users_table.time = events_table.time
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- subqueries in where with ALL operator
SELECT
user_id

View File

@ -1626,6 +1626,16 @@ CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
IMMUTABLE
RETURNS NULL ON NULL INPUT;
SELECT run_command_on_workers($f$
CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
AS 'select $1 > $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
$f$);
-- we don't support joins via functions
SELECT user_id, array_length(events_table, 1)
FROM (
@ -1659,6 +1669,102 @@ FROM
WHERE
users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id);
-- we do support the following since there is already an equality on the partition
-- key and we have an additional join via a function
SELECT
temp.user_id, users_table.value_1, prob
FROM
users_table
JOIN
(SELECT
ma.user_id, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob, random()
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 100 AND
test_join_function_2(ma.value_1, short_list.value_2)
) temp
ON users_table.user_id = temp.user_id
WHERE
users_table.value_1 < 50
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
-- similarly we do support non equi joins on columns if there is aready
-- an equality join
SELECT
count(*)
FROM
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.time > users_table.time AND
events_table.value_2 IN (10, 100)
) as foo;
-- the other way around is not supported
SELECT
count(*)
FROM
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id > users_table.user_id AND
events_table.time = users_table.time AND
events_table.value_2 IN (10, 100)
) as foo;
-- we can even allow that on top level joins
SELECT
count(*)
FROM
(SELECT
event_type, random(), events_table.user_id
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (10, 100)
) as foo,
(SELECT
event_type, random(), events_table.user_id
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (20, 200)
) as bar
WHERE foo.event_type > bar.event_type
AND foo.user_id = bar.user_id;
-- note that the following is not supported
-- since the top level join is not on the distribution key
SELECT
count(*)
FROM
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (10, 100)
) as foo,
(SELECT
event_type, random()
FROM
events_table, users_table
WHERE
events_table.user_id = users_table.user_id AND
events_table.value_2 IN (20, 200)
) as bar
WHERE foo.event_type = bar.event_type;
-- DISTINCT in the outer query and DISTINCT in the subquery
SELECT
DISTINCT users_ids.user_id
@ -1712,7 +1818,14 @@ FROM
ORDER BY 1,2
LIMIT 5;
DROP FUNCTION test_join_function_2(integer, integer);
SELECT run_command_on_workers($f$
DROP FUNCTION test_join_function_2(integer, integer);
$f$);
SET citus.enable_router_execution TO TRUE;
SET citus.subquery_pushdown to OFF;

View File

@ -24,6 +24,50 @@ WHERE
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
-- same query with one additional join on non distribution column
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_table
WHERE
users_table.user_id = events_table.user_id AND event_type = 50 AND
users_table.time > events_table.time
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
-- the other way around is not supported
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_table
WHERE
users_table.user_id > events_table.user_id AND event_type = 50 AND
users_table.time = events_table.time
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
-- subqueries in where with ALL operator