diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 57e54aeba..f9d25db1d 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -227,6 +227,10 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell)); Var *leftColumn = LeftColumnOrNULL(joinClause); Var *rightColumn = RightColumnOrNULL(joinClause); + if (!OperatorImplementsEquality(joinClause->opno)) + { + continue; + } /* * Check if both join columns and both partition key columns match, since the @@ -1015,6 +1019,10 @@ SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses) foreach(applicableJoinClauseCell, applicableJoinClauses) { OpExpr *applicableJoinClause = castNode(OpExpr, lfirst(applicableJoinClauseCell)); + if (!OperatorImplementsEquality(applicableJoinClause->opno)) + { + continue; + } Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); Var *rightColumn = RightColumnOrNULL(applicableJoinClause); if (leftColumn == NULL || rightColumn == NULL) @@ -1086,6 +1094,10 @@ DualPartitionJoinClause(List *applicableJoinClauses) foreach(applicableJoinClauseCell, applicableJoinClauses) { OpExpr *applicableJoinClause = (OpExpr *) lfirst(applicableJoinClauseCell); + if (!OperatorImplementsEquality(applicableJoinClause->opno)) + { + continue; + } Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); Var *rightColumn = RightColumnOrNULL(applicableJoinClause); diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 4aa53c2e1..44832dd71 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -1434,22 +1434,6 @@ IsJoinClause(Node *clause) return false; } - OpExpr *operatorExpression = castNode(OpExpr, clause); - bool equalsOperator = OperatorImplementsEquality(operatorExpression->opno); - - if (!equalsOperator) - { - /* - * The single and dual repartition join and local join planners expect the clauses - * to be equi-join to calculate a hash on which to distribute. - * - * In the future we should move this clause to those planners and allow - * non-equi-join's in the reference join and cartesian product. This is tracked in - * https://github.com/citusdata/citus/issues/3198 - */ - return false; - } - /* * take all column references from the clause, if we find 2 column references from a * different relation we assume this is a join clause diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out index d096e78ea..1116bb750 100644 --- a/src/test/regress/expected/dml_recursive.out +++ b/src/test/regress/expected/dml_recursive.out @@ -185,7 +185,51 @@ DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE recursive_dml_ (1 row) -- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be --- recursively planned +-- recursively planned, however it can be planned using the repartition planner +SET citus.enable_repartition_joins to on; +SELECT DISTINCT foo_inner_1.tenant_id FROM +( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) +) +foo_inner_1 JOIN LATERAL +( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND foo_inner_1.dept = second_distributed_table.dept + AND + second_distributed_table.dept IN (4,5) +) foo_inner_2 +ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) +ORDER BY foo_inner_1.tenant_id; + tenant_id +----------- + 14 + 24 + 34 + 4 + 44 + 54 + 64 + 74 + 84 + 94 +(10 rows) + +RESET citus.enable_repartition_joins; +-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be +-- recursively planned, this one can not be planned by the repartion planner +-- because of the IN query on a non unique column UPDATE second_distributed_table SET @@ -201,8 +245,7 @@ FROM WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND - second_distributed_table.dept IN (3,4) - ) + second_distributed_table.dept IN (select dept from second_distributed_table)) foo_inner_1 JOIN LATERAL ( SELECT @@ -218,6 +261,7 @@ FROM ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) ) as foo RETURNING *; +DEBUG: generating subplan 15_1 for subquery SELECT dept FROM recursive_dml_queries.second_distributed_table ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- again a corrolated subquery -- this time distribution key eq. exists @@ -253,8 +297,8 @@ ERROR: complex joins are only supported when all distributed tables are joined INSERT INTO second_distributed_table (tenant_id, dept) VALUES ('3', (WITH vals AS (SELECT 3) select * from vals)); -DEBUG: generating subplan 18_1 for CTE vals: SELECT 3 -DEBUG: Plan 18 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('18_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals)) +DEBUG: generating subplan 20_1 for CTE vals: SELECT 3 +DEBUG: Plan 20 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals)) ERROR: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. INSERT INTO @@ -277,8 +321,8 @@ UPDATE distributed_table SET dept = 5 FROM cte_1 WHERE distributed_table.tenant_id < cte_1.tenant_id; -DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info -DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) +DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info +DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) WITH cte_1 AS ( WITH cte_2 AS ( SELECT tenant_id as cte2_id @@ -293,8 +337,8 @@ UPDATE distributed_table SET dept = 5 FROM cte_1 WHERE distributed_table.tenant_id < cte_1.tenant_id; -DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info -DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) +DEBUG: generating subplan 24_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info +DEBUG: Plan 24 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) -- we don't support updating local table with a join with -- distributed tables UPDATE diff --git a/src/test/regress/sql/dml_recursive.sql b/src/test/regress/sql/dml_recursive.sql index 115ff1217..f7ee3516d 100644 --- a/src/test/regress/sql/dml_recursive.sql +++ b/src/test/regress/sql/dml_recursive.sql @@ -143,7 +143,39 @@ RETURNING distributed_table.*; -- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be --- recursively planned +-- recursively planned, however it can be planned using the repartition planner +SET citus.enable_repartition_joins to on; +SELECT DISTINCT foo_inner_1.tenant_id FROM +( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) +) +foo_inner_1 JOIN LATERAL +( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND foo_inner_1.dept = second_distributed_table.dept + AND + second_distributed_table.dept IN (4,5) +) foo_inner_2 +ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) +ORDER BY foo_inner_1.tenant_id; +RESET citus.enable_repartition_joins; + + +-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be +-- recursively planned, this one can not be planned by the repartion planner +-- because of the IN query on a non unique column UPDATE second_distributed_table SET @@ -159,8 +191,7 @@ FROM WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND - second_distributed_table.dept IN (3,4) - ) + second_distributed_table.dept IN (select dept from second_distributed_table)) foo_inner_1 JOIN LATERAL ( SELECT