Allow non equi joins on reference tables

pull/3323/head
Jelte Fennema 2019-12-13 16:16:19 +01:00
parent 7642928be1
commit 4233cd0d9d
4 changed files with 99 additions and 28 deletions

View File

@ -227,6 +227,10 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList)
OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell)); OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell));
Var *leftColumn = LeftColumnOrNULL(joinClause); Var *leftColumn = LeftColumnOrNULL(joinClause);
Var *rightColumn = RightColumnOrNULL(joinClause); Var *rightColumn = RightColumnOrNULL(joinClause);
if (!OperatorImplementsEquality(joinClause->opno))
{
continue;
}
/* /*
* Check if both join columns and both partition key columns match, since the * Check if both join columns and both partition key columns match, since the
@ -1015,6 +1019,10 @@ SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses)
foreach(applicableJoinClauseCell, applicableJoinClauses) foreach(applicableJoinClauseCell, applicableJoinClauses)
{ {
OpExpr *applicableJoinClause = castNode(OpExpr, lfirst(applicableJoinClauseCell)); OpExpr *applicableJoinClause = castNode(OpExpr, lfirst(applicableJoinClauseCell));
if (!OperatorImplementsEquality(applicableJoinClause->opno))
{
continue;
}
Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); Var *leftColumn = LeftColumnOrNULL(applicableJoinClause);
Var *rightColumn = RightColumnOrNULL(applicableJoinClause); Var *rightColumn = RightColumnOrNULL(applicableJoinClause);
if (leftColumn == NULL || rightColumn == NULL) if (leftColumn == NULL || rightColumn == NULL)
@ -1086,6 +1094,10 @@ DualPartitionJoinClause(List *applicableJoinClauses)
foreach(applicableJoinClauseCell, applicableJoinClauses) foreach(applicableJoinClauseCell, applicableJoinClauses)
{ {
OpExpr *applicableJoinClause = (OpExpr *) lfirst(applicableJoinClauseCell); OpExpr *applicableJoinClause = (OpExpr *) lfirst(applicableJoinClauseCell);
if (!OperatorImplementsEquality(applicableJoinClause->opno))
{
continue;
}
Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); Var *leftColumn = LeftColumnOrNULL(applicableJoinClause);
Var *rightColumn = RightColumnOrNULL(applicableJoinClause); Var *rightColumn = RightColumnOrNULL(applicableJoinClause);

View File

@ -1434,22 +1434,6 @@ IsJoinClause(Node *clause)
return false; return false;
} }
OpExpr *operatorExpression = castNode(OpExpr, clause);
bool equalsOperator = OperatorImplementsEquality(operatorExpression->opno);
if (!equalsOperator)
{
/*
* The single and dual repartition join and local join planners expect the clauses
* to be equi-join to calculate a hash on which to distribute.
*
* In the future we should move this clause to those planners and allow
* non-equi-join's in the reference join and cartesian product. This is tracked in
* https://github.com/citusdata/citus/issues/3198
*/
return false;
}
/* /*
* take all column references from the clause, if we find 2 column references from a * take all column references from the clause, if we find 2 column references from a
* different relation we assume this is a join clause * different relation we assume this is a join clause

View File

@ -185,13 +185,8 @@ DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE recursive_dml_
(1 row) (1 row)
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be -- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned -- recursively planned, however it can be planned using the repartition planner
UPDATE SET citus.enable_repartition_joins to on;
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM SELECT DISTINCT foo_inner_1.tenant_id FROM
( (
SELECT SELECT
@ -216,8 +211,57 @@ FROM
second_distributed_table.dept IN (4,5) second_distributed_table.dept IN (4,5)
) foo_inner_2 ) foo_inner_2
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
ORDER BY foo_inner_1.tenant_id;
tenant_id
-----------
14
24
34
4
44
54
64
74
84
94
(10 rows)
RESET citus.enable_repartition_joins;
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned, this one can not be planned by the repartion planner
-- because of the IN query on a non unique column
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (select dept from second_distributed_table))
foo_inner_1 JOIN LATERAL
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND foo_inner_1.dept = second_distributed_table.dept
AND
second_distributed_table.dept IN (4,5)
) foo_inner_2
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
) as foo ) as foo
RETURNING *; RETURNING *;
DEBUG: generating subplan 15_1 for subquery SELECT dept FROM recursive_dml_queries.second_distributed_table
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- again a corrolated subquery -- again a corrolated subquery
-- this time distribution key eq. exists -- this time distribution key eq. exists
@ -253,8 +297,8 @@ ERROR: complex joins are only supported when all distributed tables are joined
INSERT INTO INSERT INTO
second_distributed_table (tenant_id, dept) second_distributed_table (tenant_id, dept)
VALUES ('3', (WITH vals AS (SELECT 3) select * from vals)); VALUES ('3', (WITH vals AS (SELECT 3) select * from vals));
DEBUG: generating subplan 18_1 for CTE vals: SELECT 3 DEBUG: generating subplan 20_1 for CTE vals: SELECT 3
DEBUG: Plan 18 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('18_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals)) DEBUG: Plan 20 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals))
ERROR: subqueries are not supported within INSERT queries ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO INSERT INTO
@ -277,8 +321,8 @@ UPDATE distributed_table
SET dept = 5 SET dept = 5
FROM cte_1 FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id; WHERE distributed_table.tenant_id < cte_1.tenant_id;
DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
WITH cte_1 AS ( WITH cte_1 AS (
WITH cte_2 AS ( WITH cte_2 AS (
SELECT tenant_id as cte2_id SELECT tenant_id as cte2_id
@ -293,8 +337,8 @@ UPDATE distributed_table
SET dept = 5 SET dept = 5
FROM cte_1 FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id; WHERE distributed_table.tenant_id < cte_1.tenant_id;
DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info DEBUG: generating subplan 24_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) DEBUG: Plan 24 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
-- we don't support updating local table with a join with -- we don't support updating local table with a join with
-- distributed tables -- distributed tables
UPDATE UPDATE

View File

@ -143,7 +143,39 @@ RETURNING
distributed_table.*; distributed_table.*;
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be -- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned -- recursively planned, however it can be planned using the repartition planner
SET citus.enable_repartition_joins to on;
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
)
foo_inner_1 JOIN LATERAL
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND foo_inner_1.dept = second_distributed_table.dept
AND
second_distributed_table.dept IN (4,5)
) foo_inner_2
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
ORDER BY foo_inner_1.tenant_id;
RESET citus.enable_repartition_joins;
-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be
-- recursively planned, this one can not be planned by the repartion planner
-- because of the IN query on a non unique column
UPDATE UPDATE
second_distributed_table second_distributed_table
SET SET
@ -159,8 +191,7 @@ FROM
WHERE WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id distributed_table.tenant_id = second_distributed_table.tenant_id
AND AND
second_distributed_table.dept IN (3,4) second_distributed_table.dept IN (select dept from second_distributed_table))
)
foo_inner_1 JOIN LATERAL foo_inner_1 JOIN LATERAL
( (
SELECT SELECT