mirror of https://github.com/citusdata/citus.git
Merge pull request #1827 from citusdata/allow_non_equi_joins
Enable non equi joins in subquery pushdownpull/1824/merge
commit
8877a68a1f
|
@ -122,7 +122,6 @@ static bool RelationInfoContainsRecurringTuples(PlannerInfo *plannerInfo,
|
|||
RecurringTuplesType *recurType);
|
||||
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
|
||||
static void ValidateClauseList(List *clauseList);
|
||||
static void ValidateSubqueryPushdownClauseList(List *clauseList);
|
||||
static bool ExtractFromExpressionWalker(Node *node,
|
||||
QualifierWalkerContext *walkerContext);
|
||||
static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList);
|
||||
|
@ -132,7 +131,6 @@ static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList,
|
|||
static MultiCollect * CollectNodeForTable(List *collectTableList, uint32 rangeTableId);
|
||||
static MultiSelect * MultiSelectNode(List *whereClauseList);
|
||||
static bool IsSelectClause(Node *clause);
|
||||
static bool IsSublinkClause(Node *clause);
|
||||
static MultiProject * MultiProjectNode(List *targetEntryList);
|
||||
static MultiExtendedOp * MultiExtendedOpNode(Query *queryTree);
|
||||
|
||||
|
@ -2535,12 +2533,6 @@ ValidateClauseList(List *clauseList)
|
|||
{
|
||||
Node *clause = (Node *) lfirst(clauseCell);
|
||||
|
||||
/*
|
||||
* There could never be sublinks here given that it is handled
|
||||
* in subquery pushdown code-path.
|
||||
*/
|
||||
Assert(!IsSublinkClause(clause));
|
||||
|
||||
if (!(IsSelectClause(clause) || IsJoinClause(clause) || or_clause(clause)))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
|
@ -2550,33 +2542,6 @@ ValidateClauseList(List *clauseList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ValidateSubqueryPushdownClauseList walks over the given list of clauses,
|
||||
* and checks that we can recognize all the clauses. This function ensures
|
||||
* that we do not drop an unsupported clause type on the floor, and thus
|
||||
* prevents erroneous results.
|
||||
*
|
||||
* Note that this function is slightly different than ValidateClauseList(),
|
||||
* additionally allowing sublinks.
|
||||
*/
|
||||
static void
|
||||
ValidateSubqueryPushdownClauseList(List *clauseList)
|
||||
{
|
||||
ListCell *clauseCell = NULL;
|
||||
foreach(clauseCell, clauseList)
|
||||
{
|
||||
Node *clause = (Node *) lfirst(clauseCell);
|
||||
|
||||
if (!(IsSublinkClause(clause) || IsSelectClause(clause) ||
|
||||
IsJoinClause(clause) || or_clause(clause)))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("unsupported clause type")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* JoinClauseList finds the join clauses from the given where clause expression
|
||||
* list, and returns them. The function does not iterate into nested OR clauses
|
||||
|
@ -3056,23 +3021,6 @@ IsSelectClause(Node *clause)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsSublinkClause determines if the given node is a sublink or subplan.
|
||||
*/
|
||||
static bool
|
||||
IsSublinkClause(Node *clause)
|
||||
{
|
||||
NodeTag nodeTag = nodeTag(clause);
|
||||
|
||||
if (nodeTag == T_SubLink || nodeTag == T_SubPlan)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MultiProjectNode builds the project node using the target entry information
|
||||
* from the query tree. The project node only encapsulates projected columns,
|
||||
|
@ -3722,7 +3670,6 @@ static MultiNode *
|
|||
SubqueryPushdownMultiNodeTree(Query *queryTree)
|
||||
{
|
||||
List *targetEntryList = queryTree->targetList;
|
||||
List *qualifierList = NIL;
|
||||
List *columnList = NIL;
|
||||
List *targetColumnList = NIL;
|
||||
MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect);
|
||||
|
@ -3737,14 +3684,6 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
|
|||
/* verify we can perform distributed planning on this query */
|
||||
ErrorIfQueryNotSupported(queryTree);
|
||||
|
||||
/*
|
||||
* Extract qualifiers and verify we can plan for them. Note that since
|
||||
* subquery pushdown join planning is based on restriction equivalence,
|
||||
* checking for these qualifiers may not be necessary.
|
||||
*/
|
||||
qualifierList = QualifierList(queryTree->jointree);
|
||||
ValidateSubqueryPushdownClauseList(qualifierList);
|
||||
|
||||
/*
|
||||
* We would be creating a new Query and pushing down top level query's
|
||||
* contents down to it. Join and filter clauses in higher level query would
|
||||
|
|
|
@ -2040,6 +2040,21 @@ CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
|
|||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
SELECT run_command_on_workers($f$
|
||||
|
||||
CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
|
||||
AS 'select $1 > $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
$f$);
|
||||
run_command_on_workers
|
||||
---------------------------------------
|
||||
(localhost,57637,t,"CREATE FUNCTION")
|
||||
(localhost,57638,t,"CREATE FUNCTION")
|
||||
(2 rows)
|
||||
|
||||
-- we don't support joins via functions
|
||||
SELECT user_id, array_length(events_table, 1)
|
||||
FROM (
|
||||
|
@ -2074,6 +2089,125 @@ FROM
|
|||
WHERE
|
||||
users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id);
|
||||
ERROR: unsupported clause type
|
||||
-- we do support the following since there is already an equality on the partition
|
||||
-- key and we have an additional join via a function
|
||||
SELECT
|
||||
temp.user_id, users_table.value_1, prob
|
||||
FROM
|
||||
users_table
|
||||
JOIN
|
||||
(SELECT
|
||||
ma.user_id, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob, random()
|
||||
FROM
|
||||
users_table AS ma, events_table as short_list
|
||||
WHERE
|
||||
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 100 AND
|
||||
test_join_function_2(ma.value_1, short_list.value_2)
|
||||
) temp
|
||||
ON users_table.user_id = temp.user_id
|
||||
WHERE
|
||||
users_table.value_1 < 50
|
||||
ORDER BY 2 DESC, 1 DESC
|
||||
LIMIT 10;
|
||||
user_id | value_1 | prob
|
||||
---------+---------+------------------------
|
||||
37 | 49 | 0.50000000000000000000
|
||||
37 | 49 | 0.50000000000000000000
|
||||
37 | 49 | 0.50000000000000000000
|
||||
32 | 49 | 0.50000000000000000000
|
||||
32 | 49 | 0.50000000000000000000
|
||||
98 | 48 | 0.50000000000000000000
|
||||
98 | 48 | 0.50000000000000000000
|
||||
98 | 48 | 0.50000000000000000000
|
||||
96 | 48 | 0.50000000000000000000
|
||||
96 | 48 | 0.50000000000000000000
|
||||
(10 rows)
|
||||
|
||||
-- similarly we do support non equi joins on columns if there is aready
|
||||
-- an equality join
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.time > users_table.time AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo;
|
||||
count
|
||||
-------
|
||||
1015
|
||||
(1 row)
|
||||
|
||||
-- the other way around is not supported
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id > users_table.user_id AND
|
||||
events_table.time = users_table.time AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo;
|
||||
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
|
||||
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
|
||||
-- we can even allow that on top level joins
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random(), events_table.user_id
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo,
|
||||
(SELECT
|
||||
event_type, random(), events_table.user_id
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (20, 200)
|
||||
) as bar
|
||||
WHERE foo.event_type > bar.event_type
|
||||
AND foo.user_id = bar.user_id;
|
||||
count
|
||||
-------
|
||||
14641
|
||||
(1 row)
|
||||
|
||||
-- note that the following is not supported
|
||||
-- since the top level join is not on the distribution key
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo,
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (20, 200)
|
||||
) as bar
|
||||
WHERE foo.event_type = bar.event_type;
|
||||
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
|
||||
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
|
||||
-- DISTINCT in the outer query and DISTINCT in the subquery
|
||||
SELECT
|
||||
DISTINCT users_ids.user_id
|
||||
|
@ -2152,5 +2286,16 @@ FROM
|
|||
(5 rows)
|
||||
|
||||
DROP FUNCTION test_join_function_2(integer, integer);
|
||||
SELECT run_command_on_workers($f$
|
||||
|
||||
DROP FUNCTION test_join_function_2(integer, integer);
|
||||
|
||||
$f$);
|
||||
run_command_on_workers
|
||||
-------------------------------------
|
||||
(localhost,57637,t,"DROP FUNCTION")
|
||||
(localhost,57638,t,"DROP FUNCTION")
|
||||
(2 rows)
|
||||
|
||||
SET citus.enable_router_execution TO TRUE;
|
||||
SET citus.subquery_pushdown to OFF;
|
||||
|
|
|
@ -23,7 +23,7 @@ WHERE
|
|||
GROUP BY user_id
|
||||
HAVING count(*) > 66
|
||||
ORDER BY user_id
|
||||
LIMIT 5;
|
||||
LIMIT 5;
|
||||
user_id
|
||||
---------
|
||||
49
|
||||
|
@ -32,6 +32,57 @@ LIMIT 5;
|
|||
63
|
||||
(4 rows)
|
||||
|
||||
-- same query with one additional join on non distribution column
|
||||
SELECT
|
||||
user_id
|
||||
FROM
|
||||
users_table
|
||||
WHERE
|
||||
value_2 >
|
||||
(SELECT
|
||||
max(value_2)
|
||||
FROM
|
||||
events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND event_type = 50 AND
|
||||
users_table.time > events_table.time
|
||||
GROUP BY
|
||||
user_id
|
||||
)
|
||||
GROUP BY user_id
|
||||
HAVING count(*) > 66
|
||||
ORDER BY user_id
|
||||
LIMIT 5;
|
||||
user_id
|
||||
---------
|
||||
55
|
||||
56
|
||||
63
|
||||
(3 rows)
|
||||
|
||||
-- the other way around is not supported
|
||||
SELECT
|
||||
user_id
|
||||
FROM
|
||||
users_table
|
||||
WHERE
|
||||
value_2 >
|
||||
(SELECT
|
||||
max(value_2)
|
||||
FROM
|
||||
events_table
|
||||
WHERE
|
||||
users_table.user_id > events_table.user_id AND event_type = 50 AND
|
||||
users_table.time = events_table.time
|
||||
GROUP BY
|
||||
user_id
|
||||
)
|
||||
GROUP BY user_id
|
||||
HAVING count(*) > 66
|
||||
ORDER BY user_id
|
||||
LIMIT 5;
|
||||
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
|
||||
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
|
||||
-- subqueries in where with ALL operator
|
||||
SELECT
|
||||
user_id
|
||||
|
|
|
@ -1626,6 +1626,16 @@ CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
|
|||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
SELECT run_command_on_workers($f$
|
||||
|
||||
CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool
|
||||
AS 'select $1 > $2;'
|
||||
LANGUAGE SQL
|
||||
IMMUTABLE
|
||||
RETURNS NULL ON NULL INPUT;
|
||||
|
||||
$f$);
|
||||
|
||||
-- we don't support joins via functions
|
||||
SELECT user_id, array_length(events_table, 1)
|
||||
FROM (
|
||||
|
@ -1659,6 +1669,102 @@ FROM
|
|||
WHERE
|
||||
users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id);
|
||||
|
||||
-- we do support the following since there is already an equality on the partition
|
||||
-- key and we have an additional join via a function
|
||||
SELECT
|
||||
temp.user_id, users_table.value_1, prob
|
||||
FROM
|
||||
users_table
|
||||
JOIN
|
||||
(SELECT
|
||||
ma.user_id, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob, random()
|
||||
FROM
|
||||
users_table AS ma, events_table as short_list
|
||||
WHERE
|
||||
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 100 AND
|
||||
test_join_function_2(ma.value_1, short_list.value_2)
|
||||
) temp
|
||||
ON users_table.user_id = temp.user_id
|
||||
WHERE
|
||||
users_table.value_1 < 50
|
||||
ORDER BY 2 DESC, 1 DESC
|
||||
LIMIT 10;
|
||||
|
||||
-- similarly we do support non equi joins on columns if there is aready
|
||||
-- an equality join
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.time > users_table.time AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo;
|
||||
|
||||
-- the other way around is not supported
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id > users_table.user_id AND
|
||||
events_table.time = users_table.time AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo;
|
||||
|
||||
-- we can even allow that on top level joins
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random(), events_table.user_id
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo,
|
||||
(SELECT
|
||||
event_type, random(), events_table.user_id
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (20, 200)
|
||||
) as bar
|
||||
WHERE foo.event_type > bar.event_type
|
||||
AND foo.user_id = bar.user_id;
|
||||
|
||||
-- note that the following is not supported
|
||||
-- since the top level join is not on the distribution key
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (10, 100)
|
||||
) as foo,
|
||||
(SELECT
|
||||
event_type, random()
|
||||
FROM
|
||||
events_table, users_table
|
||||
WHERE
|
||||
events_table.user_id = users_table.user_id AND
|
||||
events_table.value_2 IN (20, 200)
|
||||
) as bar
|
||||
WHERE foo.event_type = bar.event_type;
|
||||
|
||||
-- DISTINCT in the outer query and DISTINCT in the subquery
|
||||
SELECT
|
||||
DISTINCT users_ids.user_id
|
||||
|
@ -1712,7 +1818,14 @@ FROM
|
|||
ORDER BY 1,2
|
||||
LIMIT 5;
|
||||
|
||||
|
||||
DROP FUNCTION test_join_function_2(integer, integer);
|
||||
|
||||
SELECT run_command_on_workers($f$
|
||||
|
||||
DROP FUNCTION test_join_function_2(integer, integer);
|
||||
|
||||
$f$);
|
||||
|
||||
SET citus.enable_router_execution TO TRUE;
|
||||
SET citus.subquery_pushdown to OFF;
|
||||
|
|
|
@ -24,6 +24,50 @@ WHERE
|
|||
GROUP BY user_id
|
||||
HAVING count(*) > 66
|
||||
ORDER BY user_id
|
||||
LIMIT 5;
|
||||
|
||||
-- same query with one additional join on non distribution column
|
||||
SELECT
|
||||
user_id
|
||||
FROM
|
||||
users_table
|
||||
WHERE
|
||||
value_2 >
|
||||
(SELECT
|
||||
max(value_2)
|
||||
FROM
|
||||
events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.user_id AND event_type = 50 AND
|
||||
users_table.time > events_table.time
|
||||
GROUP BY
|
||||
user_id
|
||||
)
|
||||
GROUP BY user_id
|
||||
HAVING count(*) > 66
|
||||
ORDER BY user_id
|
||||
LIMIT 5;
|
||||
|
||||
-- the other way around is not supported
|
||||
SELECT
|
||||
user_id
|
||||
FROM
|
||||
users_table
|
||||
WHERE
|
||||
value_2 >
|
||||
(SELECT
|
||||
max(value_2)
|
||||
FROM
|
||||
events_table
|
||||
WHERE
|
||||
users_table.user_id > events_table.user_id AND event_type = 50 AND
|
||||
users_table.time = events_table.time
|
||||
GROUP BY
|
||||
user_id
|
||||
)
|
||||
GROUP BY user_id
|
||||
HAVING count(*) > 66
|
||||
ORDER BY user_id
|
||||
LIMIT 5;
|
||||
|
||||
-- subqueries in where with ALL operator
|
||||
|
|
Loading…
Reference in New Issue