From 4233cd0d9d494583c9bbe4e513721ba3986c8a7e Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 13 Dec 2019 16:16:19 +0100 Subject: [PATCH 1/4] Allow non equi joins on reference tables --- .../distributed/planner/multi_join_order.c | 12 ++++ .../planner/multi_logical_planner.c | 16 ----- src/test/regress/expected/dml_recursive.out | 62 ++++++++++++++++--- src/test/regress/sql/dml_recursive.sql | 37 ++++++++++- 4 files changed, 99 insertions(+), 28 deletions(-) diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 57e54aeba..f9d25db1d 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -227,6 +227,10 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell)); Var *leftColumn = LeftColumnOrNULL(joinClause); Var *rightColumn = RightColumnOrNULL(joinClause); + if (!OperatorImplementsEquality(joinClause->opno)) + { + continue; + } /* * Check if both join columns and both partition key columns match, since the @@ -1015,6 +1019,10 @@ SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses) foreach(applicableJoinClauseCell, applicableJoinClauses) { OpExpr *applicableJoinClause = castNode(OpExpr, lfirst(applicableJoinClauseCell)); + if (!OperatorImplementsEquality(applicableJoinClause->opno)) + { + continue; + } Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); Var *rightColumn = RightColumnOrNULL(applicableJoinClause); if (leftColumn == NULL || rightColumn == NULL) @@ -1086,6 +1094,10 @@ DualPartitionJoinClause(List *applicableJoinClauses) foreach(applicableJoinClauseCell, applicableJoinClauses) { OpExpr *applicableJoinClause = (OpExpr *) lfirst(applicableJoinClauseCell); + if (!OperatorImplementsEquality(applicableJoinClause->opno)) + { + continue; + } Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); Var 
*rightColumn = RightColumnOrNULL(applicableJoinClause); diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 4aa53c2e1..44832dd71 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -1434,22 +1434,6 @@ IsJoinClause(Node *clause) return false; } - OpExpr *operatorExpression = castNode(OpExpr, clause); - bool equalsOperator = OperatorImplementsEquality(operatorExpression->opno); - - if (!equalsOperator) - { - /* - * The single and dual repartition join and local join planners expect the clauses - * to be equi-join to calculate a hash on which to distribute. - * - * In the future we should move this clause to those planners and allow - * non-equi-join's in the reference join and cartesian product. This is tracked in - * https://github.com/citusdata/citus/issues/3198 - */ - return false; - } - /* * take all column references from the clause, if we find 2 column references from a * different relation we assume this is a join clause diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out index d096e78ea..1116bb750 100644 --- a/src/test/regress/expected/dml_recursive.out +++ b/src/test/regress/expected/dml_recursive.out @@ -185,7 +185,51 @@ DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE recursive_dml_ (1 row) -- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be --- recursively planned +-- recursively planned, however it can be planned using the repartition planner +SET citus.enable_repartition_joins to on; +SELECT DISTINCT foo_inner_1.tenant_id FROM +( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) +) +foo_inner_1 
JOIN LATERAL +( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND foo_inner_1.dept = second_distributed_table.dept + AND + second_distributed_table.dept IN (4,5) +) foo_inner_2 +ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) +ORDER BY foo_inner_1.tenant_id; + tenant_id +----------- + 14 + 24 + 34 + 4 + 44 + 54 + 64 + 74 + 84 + 94 +(10 rows) + +RESET citus.enable_repartition_joins; +-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be +-- recursively planned, this one can not be planned by the repartion planner +-- because of the IN query on a non unique column UPDATE second_distributed_table SET @@ -201,8 +245,7 @@ FROM WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND - second_distributed_table.dept IN (3,4) - ) + second_distributed_table.dept IN (select dept from second_distributed_table)) foo_inner_1 JOIN LATERAL ( SELECT @@ -218,6 +261,7 @@ FROM ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) ) as foo RETURNING *; +DEBUG: generating subplan 15_1 for subquery SELECT dept FROM recursive_dml_queries.second_distributed_table ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator -- again a corrolated subquery -- this time distribution key eq. exists @@ -253,8 +297,8 @@ ERROR: complex joins are only supported when all distributed tables are joined INSERT INTO second_distributed_table (tenant_id, dept) VALUES ('3', (WITH vals AS (SELECT 3) select * from vals)); -DEBUG: generating subplan 18_1 for CTE vals: SELECT 3 -DEBUG: Plan 18 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" 
FROM read_intermediate_result('18_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals)) +DEBUG: generating subplan 20_1 for CTE vals: SELECT 3 +DEBUG: Plan 20 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals)) ERROR: subqueries are not supported within INSERT queries HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. INSERT INTO @@ -277,8 +321,8 @@ UPDATE distributed_table SET dept = 5 FROM cte_1 WHERE distributed_table.tenant_id < cte_1.tenant_id; -DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info -DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) +DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info +DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT 
intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) WITH cte_1 AS ( WITH cte_2 AS ( SELECT tenant_id as cte2_id @@ -293,8 +337,8 @@ UPDATE distributed_table SET dept = 5 FROM cte_1 WHERE distributed_table.tenant_id < cte_1.tenant_id; -DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info -DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) +DEBUG: generating subplan 24_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info +DEBUG: Plan 24 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id) -- we 
don't support updating local table with a join with -- distributed tables UPDATE diff --git a/src/test/regress/sql/dml_recursive.sql b/src/test/regress/sql/dml_recursive.sql index 115ff1217..f7ee3516d 100644 --- a/src/test/regress/sql/dml_recursive.sql +++ b/src/test/regress/sql/dml_recursive.sql @@ -143,7 +143,39 @@ RETURNING distributed_table.*; -- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be --- recursively planned +-- recursively planned, however it can be planned using the repartition planner +SET citus.enable_repartition_joins to on; +SELECT DISTINCT foo_inner_1.tenant_id FROM +( + SELECT + second_distributed_table.dept, second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND + second_distributed_table.dept IN (3,4) +) +foo_inner_1 JOIN LATERAL +( + SELECT + second_distributed_table.tenant_id + FROM + second_distributed_table, distributed_table + WHERE + distributed_table.tenant_id = second_distributed_table.tenant_id + AND foo_inner_1.dept = second_distributed_table.dept + AND + second_distributed_table.dept IN (4,5) +) foo_inner_2 +ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id) +ORDER BY foo_inner_1.tenant_id; +RESET citus.enable_repartition_joins; + + +-- there is a lateral join (e.g., corrolated subquery) thus the subqueries cannot be +-- recursively planned, this one can not be planned by the repartion planner +-- because of the IN query on a non unique column UPDATE second_distributed_table SET @@ -159,8 +191,7 @@ FROM WHERE distributed_table.tenant_id = second_distributed_table.tenant_id AND - second_distributed_table.dept IN (3,4) - ) + second_distributed_table.dept IN (select dept from second_distributed_table)) foo_inner_1 JOIN LATERAL ( SELECT From 61e25016451e41de35052cfe43eb15712fedd5d8 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Mon, 16 Dec 2019 14:13:09 +0100 Subject: [PATCH 2/4] Make 
any expression with two or more tables a join expression --- .../distributed/planner/multi_join_order.c | 66 +++++++++++-------- .../planner/multi_logical_planner.c | 7 +- .../planner/multi_physical_planner.c | 22 ++++--- src/include/distributed/multi_join_order.h | 5 +- 4 files changed, 59 insertions(+), 41 deletions(-) diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index f9d25db1d..bb39d5678 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -212,7 +212,6 @@ ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex) static bool JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) { - ListCell *joinClauseCell = NULL; if (currentColumn == NULL || candidateColumn == NULL) { /* @@ -222,15 +221,16 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) return false; } - foreach(joinClauseCell, joinClauseList) + Node *joinClause = NULL; + foreach_ptr(joinClause, joinClauseList) { - OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell)); - Var *leftColumn = LeftColumnOrNULL(joinClause); - Var *rightColumn = RightColumnOrNULL(joinClause); - if (!OperatorImplementsEquality(joinClause->opno)) + if (!NodeIsEqualsOpExpr(joinClause)) { continue; } + OpExpr *joinClauseOpExpr = castNode(OpExpr, joinClause); + Var *leftColumn = LeftColumnOrNULL(joinClauseOpExpr); + Var *rightColumn = RightColumnOrNULL(joinClauseOpExpr); /* * Check if both join columns and both partition key columns match, since the @@ -253,6 +253,22 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) } +/* + * NodeIsEqualsOpExpr checks if the node is an OpExpr, where the operator + * matches OperatorImplementsEquality. 
+ */ +bool +NodeIsEqualsOpExpr(Node *node) +{ + if (!IsA(node, OpExpr)) + { + return false; + } + OpExpr *opExpr = castNode(OpExpr, node); + return OperatorImplementsEquality(opExpr->opno); +} + + /* * JoinOrderList calculates the best join order and join rules that apply given * the list of tables and join clauses. First, the function generates a set of @@ -1010,21 +1026,21 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, OpExpr * SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses) { - ListCell *applicableJoinClauseCell = NULL; if (partitionColumn == NULL) { return NULL; } - foreach(applicableJoinClauseCell, applicableJoinClauses) + Node *applicableJoinClause = NULL; + foreach_ptr(applicableJoinClause, applicableJoinClauses) { - OpExpr *applicableJoinClause = castNode(OpExpr, lfirst(applicableJoinClauseCell)); - if (!OperatorImplementsEquality(applicableJoinClause->opno)) + if (!NodeIsEqualsOpExpr(applicableJoinClause)) { continue; } - Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); - Var *rightColumn = RightColumnOrNULL(applicableJoinClause); + OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause); + Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr); + Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr); if (leftColumn == NULL || rightColumn == NULL) { /* not a simple partition column join */ @@ -1042,7 +1058,7 @@ SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses) { if (leftColumn->vartype == rightColumn->vartype) { - return applicableJoinClause; + return applicableJoinOpExpr; } else { @@ -1089,17 +1105,16 @@ DualPartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, OpExpr * DualPartitionJoinClause(List *applicableJoinClauses) { - ListCell *applicableJoinClauseCell = NULL; - - foreach(applicableJoinClauseCell, applicableJoinClauses) + Node *applicableJoinClause = NULL; + foreach_ptr(applicableJoinClause, 
applicableJoinClauses) { - OpExpr *applicableJoinClause = (OpExpr *) lfirst(applicableJoinClauseCell); - if (!OperatorImplementsEquality(applicableJoinClause->opno)) + if (!NodeIsEqualsOpExpr(applicableJoinClause)) { continue; } - Var *leftColumn = LeftColumnOrNULL(applicableJoinClause); - Var *rightColumn = RightColumnOrNULL(applicableJoinClause); + OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause); + Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr); + Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr); if (leftColumn == NULL || rightColumn == NULL) { @@ -1109,7 +1124,7 @@ DualPartitionJoinClause(List *applicableJoinClauses) /* we only need to check that the join column types match */ if (leftColumn->vartype == rightColumn->vartype) { - return applicableJoinClause; + return applicableJoinOpExpr; } else { @@ -1170,9 +1185,9 @@ MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType joinRuleType, * in either the list of tables on the left *or* in the right hand table. 
*/ bool -IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, OpExpr *joinClause) +IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, Node *joinClause) { - List *varList = pull_var_clause_default((Node *) joinClause); + List *varList = pull_var_clause_default(joinClause); Var *var = NULL; bool joinContainsRightTable = false; foreach_ptr(var, varList) @@ -1208,15 +1223,14 @@ IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, OpExpr *joinC List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, List *joinClauseList) { - ListCell *joinClauseCell = NULL; List *applicableJoinClauses = NIL; /* make sure joinClauseList contains only join clauses */ joinClauseList = JoinClauseList(joinClauseList); - foreach(joinClauseCell, joinClauseList) + Node *joinClause = NULL; + foreach_ptr(joinClause, joinClauseList) { - OpExpr *joinClause = castNode(OpExpr, lfirst(joinClauseCell)); if (IsApplicableJoinClause(leftTableIdList, rightTableId, joinClause)) { applicableJoinClauses = lappend(applicableJoinClauses, joinClause); diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 44832dd71..1372086c5 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -1429,11 +1429,6 @@ IsJoinClause(Node *clause) { Var *var = NULL; - if (!IsA(clause, OpExpr)) - { - return false; - } - /* * take all column references from the clause, if we find 2 column references from a * different relation we assume this is a join clause @@ -1689,7 +1684,7 @@ MultiSelectNode(List *whereClauseList) foreach(whereClauseCell, whereClauseList) { Node *whereClause = (Node *) lfirst(whereClauseCell); - if (IsSelectClause(whereClause) || or_clause(whereClause)) + if (IsSelectClause(whereClause)) { selectClauseList = lappend(selectClauseList, whereClause); } diff --git 
a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 703dfacf3..5a9752245 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -40,6 +40,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" +#include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" @@ -3497,7 +3498,6 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen JoinExpr *joinExpr = (JoinExpr *) lfirst(joinExprCell); RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg; uint32 nextRangeTableId = rightTableRef->rtindex; - ListCell *nextJoinClauseCell = NULL; Index existingRangeTableId = 0; bool applyJoinPruning = false; @@ -3518,17 +3518,23 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen * We now determine if we can apply join pruning between existing range * tables and this new one. 
*/ - foreach(nextJoinClauseCell, nextJoinClauseList) + Node *nextJoinClause = NULL; + foreach_ptr(nextJoinClause, nextJoinClauseList) { - OpExpr *nextJoinClause = (OpExpr *) lfirst(nextJoinClauseCell); - - if (!IsJoinClause((Node *) nextJoinClause)) + if (!NodeIsEqualsOpExpr(nextJoinClause)) { continue; } - Var *leftColumn = LeftColumnOrNULL(nextJoinClause); - Var *rightColumn = RightColumnOrNULL(nextJoinClause); + OpExpr *nextJoinClauseOpExpr = castNode(OpExpr, nextJoinClause); + + if (!IsJoinClause((Node *) nextJoinClauseOpExpr)) + { + continue; + } + + Var *leftColumn = LeftColumnOrNULL(nextJoinClauseOpExpr); + Var *rightColumn = RightColumnOrNULL(nextJoinClauseOpExpr); if (leftColumn == NULL || rightColumn == NULL) { continue; @@ -3567,7 +3573,7 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen if (leftPartitioned && rightPartitioned) { /* make sure this join clause references only simple columns */ - CheckJoinBetweenColumns(nextJoinClause); + CheckJoinBetweenColumns(nextJoinClauseOpExpr); applyJoinPruning = true; break; diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index 230726ef0..f5d4a7f66 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -15,6 +15,8 @@ #ifndef MULTI_JOIN_ORDER_H #define MULTI_JOIN_ORDER_H +#include "postgres.h" + #include "nodes/pg_list.h" #include "nodes/primnodes.h" @@ -83,9 +85,10 @@ extern bool EnableSingleHashRepartitioning; extern List * JoinExprList(FromExpr *fromExpr); extern List * JoinOrderList(List *rangeTableEntryList, List *joinClauseList); extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, - OpExpr *joinClause); + Node *joinClause); extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, List *joinClauseList); +extern bool NodeIsEqualsOpExpr(Node *node); extern OpExpr * SinglePartitionJoinClause(Var *partitionColumn, List 
*applicableJoinClauses); extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses); From 3a042e4611f11680294332bdd91d6ea95d0cf11d Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Mon, 16 Dec 2019 16:11:57 +0100 Subject: [PATCH 3/4] Allow cartesian products on reference tables --- .../distributed/planner/multi_join_order.c | 90 +++++++++++++------ .../planner/multi_logical_planner.c | 29 ++++++ .../planner/multi_physical_planner.c | 36 +++++++- src/include/distributed/multi_join_order.h | 5 +- .../expected/expression_reference_join.out | 23 ++++- .../expected/multi_join_order_additional.out | 4 +- .../expected/multi_mx_reference_table.out | 68 +++++++------- .../expected/multi_reference_table.out | 85 ++++++++++-------- .../regress/sql/expression_reference_join.sql | 19 +++- .../regress/sql/multi_mx_reference_table.sql | 4 +- .../regress/sql/multi_reference_table.sql | 5 +- 11 files changed, 258 insertions(+), 110 deletions(-) diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index bb39d5678..c976d40cd 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -76,6 +76,10 @@ static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType); static char * JoinRuleName(JoinRuleType ruleType); static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType); +static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode, + TableEntry *candidateTable, + List *applicableJoinClauses, + JoinType joinType); static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType); static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn, @@ -752,6 +756,8 @@ JoinRuleEvalFunction(JoinRuleType ruleType) RuleEvalFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = 
&SinglePartitionJoin; RuleEvalFunctionArray[SINGLE_HASH_PARTITION_JOIN] = &SinglePartitionJoin; RuleEvalFunctionArray[DUAL_PARTITION_JOIN] = &DualPartitionJoin; + RuleEvalFunctionArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] = + &CartesianProductReferenceJoin; RuleEvalFunctionArray[CARTESIAN_PRODUCT] = &CartesianProduct; ruleEvalFunctionsInitialized = true; @@ -780,6 +786,8 @@ JoinRuleName(JoinRuleType ruleType) RuleNameArray[SINGLE_RANGE_PARTITION_JOIN] = strdup("single range partition join"); RuleNameArray[DUAL_PARTITION_JOIN] = strdup("dual partition join"); + RuleNameArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] = strdup( + "cartesian product reference join"); RuleNameArray[CARTESIAN_PRODUCT] = strdup("cartesian product"); ruleNamesInitialized = true; @@ -801,48 +809,76 @@ static JoinOrderNode * ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType) { - JoinOrderNode *nextJoinNode = NULL; int applicableJoinCount = list_length(applicableJoinClauses); - char candidatePartitionMethod = PartitionMethod(candidateTable->relationId); - char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId); - bool performReferenceJoin = false; - if (applicableJoinCount <= 0) { return NULL; } - /* - * If the table is a reference table, then the reference join is feasible.It - * is valid only for inner joins. - * - * Right join requires existing (left) table to be reference table, full outer - * join requires both tables to be reference tables. 
- */ + char candidatePartitionMethod = PartitionMethod(candidateTable->relationId); + char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId); + + if (!IsSupportedReferenceJoin(joinType, + leftPartitionMethod == DISTRIBUTE_BY_NONE, + candidatePartitionMethod == DISTRIBUTE_BY_NONE)) + { + return NULL; + } + return MakeJoinOrderNode(candidateTable, REFERENCE_JOIN, + currentJoinNode->partitionColumn, + currentJoinNode->partitionMethod, + currentJoinNode->anchorTable); +} + + +/* + * IsSupportedReferenceJoin checks if with this join type we can safely do a simple join + * on the reference table on all the workers. + */ +bool +IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable, + bool rightIsReferenceTable) +{ if ((joinType == JOIN_INNER || joinType == JOIN_LEFT || joinType == JOIN_ANTI) && - candidatePartitionMethod == DISTRIBUTE_BY_NONE) + rightIsReferenceTable) { - performReferenceJoin = true; + return true; } - else if (joinType == JOIN_RIGHT && leftPartitionMethod == DISTRIBUTE_BY_NONE) + else if ((joinType == JOIN_RIGHT) && + leftIsReferenceTable) { - performReferenceJoin = true; + return true; } - else if (joinType == JOIN_FULL && leftPartitionMethod == DISTRIBUTE_BY_NONE && - candidatePartitionMethod == DISTRIBUTE_BY_NONE) + else if (joinType == JOIN_FULL && leftIsReferenceTable && rightIsReferenceTable) { - performReferenceJoin = true; + return true; } + return false; +} - if (performReferenceJoin) - { - nextJoinNode = MakeJoinOrderNode(candidateTable, REFERENCE_JOIN, - currentJoinNode->partitionColumn, - currentJoinNode->partitionMethod, - currentJoinNode->anchorTable); - } - return nextJoinNode; +/* + * CartesianProductReferenceJoin evaluates if the candidate table is a reference + * table for inner, left and anti join. For right join, the current join node must be a + * reference table. For full join, both must be. No applicable join clause is required. 
+ */ +static JoinOrderNode * +CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, + List *applicableJoinClauses, JoinType joinType) +{ + char candidatePartitionMethod = PartitionMethod(candidateTable->relationId); + char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId); + + if (!IsSupportedReferenceJoin(joinType, + leftPartitionMethod == DISTRIBUTE_BY_NONE, + candidatePartitionMethod == DISTRIBUTE_BY_NONE)) + { + return NULL; + } + return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT_REFERENCE_JOIN, + currentJoinNode->partitionColumn, + currentJoinNode->partitionMethod, + currentJoinNode->anchorTable); } diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 1372086c5..3bc670640 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -115,6 +115,11 @@ static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *righ static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *joinClauses); +static MultiNode * ApplyCartesianProductReferenceJoin(MultiNode *leftNode, + MultiNode *rightNode, + Var *partitionColumn, + JoinType joinType, + List *joinClauses); static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, List *joinClauses); @@ -2022,6 +2027,8 @@ JoinRuleApplyFunction(JoinRuleType ruleType) RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = &ApplySingleRangePartitionJoin; RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin; + RuleApplyFunctionArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] = + &ApplyCartesianProductReferenceJoin; RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct; ruleApplyFunctionInitialized = true; @@ -2055,6 +2062,28 @@ 
ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, } +/* + * ApplyCartesianProductReferenceJoin creates a new MultiJoin node that joins + * the left and the right node. The new node uses the cartesian product + * reference join rule to perform the join. + */ +static MultiNode * +ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, + Var *partitionColumn, JoinType joinType, + List *applicableJoinClauses) +{ + MultiJoin *joinNode = CitusMakeNode(MultiJoin); + joinNode->joinRuleType = CARTESIAN_PRODUCT_REFERENCE_JOIN; + joinNode->joinType = joinType; + joinNode->joinClauseList = applicableJoinClauses; + + SetLeftChild((MultiBinaryNode *) joinNode, leftNode); + SetRightChild((MultiBinaryNode *) joinNode, rightNode); + + return (MultiNode *) joinNode; +} + + /* * ApplyLocalJoin creates a new MultiJoin node that joins the left and the right * node. The new node uses the local join rule to perform the join. diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 5a9752245..26b4c1ab3 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3457,6 +3457,28 @@ FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery, } +/* + * NodeIsRangeTblRefReferenceTable checks if the node is a RangeTblRef that + * points to a reference table in the rangeTableList. 
+ */ +static bool +NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList) +{ + if (!IsA(node, RangeTblRef)) + { + return false; + } + RangeTblRef *tableRef = castNode(RangeTblRef, node); + RangeTblEntry *rangeTableEntry = rt_fetch(tableRef->rtindex, rangeTableList); + CitusRTEKind rangeTableType = GetRangeTblKind(rangeTableEntry); + if (rangeTableType != CITUS_RTE_RELATION) + { + return false; + } + return PartitionMethod(rangeTableEntry->relid) == DISTRIBUTE_BY_NONE; +} + + /* * JoinSequenceArray walks over the join nodes in the job query and constructs a join * sequence containing an entry for each joined table. The function then returns an @@ -3496,18 +3518,26 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen foreach(joinExprCell, joinExprList) { JoinExpr *joinExpr = (JoinExpr *) lfirst(joinExprCell); - RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg; + RangeTblRef *rightTableRef = castNode(RangeTblRef, joinExpr->rarg); uint32 nextRangeTableId = rightTableRef->rtindex; Index existingRangeTableId = 0; bool applyJoinPruning = false; List *nextJoinClauseList = make_ands_implicit((Expr *) joinExpr->quals); + bool leftIsReferenceTable = NodeIsRangeTblRefReferenceTable(joinExpr->larg, + rangeTableList); + bool rightIsReferenceTable = NodeIsRangeTblRefReferenceTable(joinExpr->rarg, + rangeTableList); + bool isReferenceJoin = IsSupportedReferenceJoin(joinExpr->jointype, + leftIsReferenceTable, + rightIsReferenceTable); /* * If next join clause list is empty, the user tried a cartesian product - * between tables. We don't support this functionality, and error out. + * between tables. We don't support this functionality for non + * reference joins, and error out. 
*/ - if (nextJoinClauseList == NIL) + if (nextJoinClauseList == NIL && !isReferenceJoin) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot perform distributed planning on this query"), diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index f5d4a7f66..d1c8bcb0e 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -35,7 +35,8 @@ typedef enum JoinRuleType SINGLE_HASH_PARTITION_JOIN = 3, SINGLE_RANGE_PARTITION_JOIN = 4, DUAL_PARTITION_JOIN = 5, - CARTESIAN_PRODUCT = 6, + CARTESIAN_PRODUCT_REFERENCE_JOIN = 6, + CARTESIAN_PRODUCT = 7, /* * Add new join rule types above this comment. After adding, you must also @@ -89,6 +90,8 @@ extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId, extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, List *joinClauseList); extern bool NodeIsEqualsOpExpr(Node *node); +extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable, + bool rightIsReferenceTable); extern OpExpr * SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses); extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses); diff --git a/src/test/regress/expected/expression_reference_join.out b/src/test/regress/expected/expression_reference_join.out index 80c8d55c2..d290a27f7 100644 --- a/src/test/regress/expected/expression_reference_join.out +++ b/src/test/regress/expected/expression_reference_join.out @@ -41,8 +41,11 @@ ORDER BY 1,2,3; 2 | 2 | 2 | 4 | 4 (4 rows) --- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join. --- Unplannable query due to a three-way join which causes no valid path (besides the cartesian product) to be found +-- The join clause is wider than it used to be, causing this query to be +-- recognized by the LogicalPlanner as a repartition join. 
+-- Due to a three-way join this causes no valid path, besides the cartesian +-- product on reference tables. This is allowed, so it should be able to be +-- planned. SELECT * FROM test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable @@ -50,7 +53,19 @@ FROM ref b WHERE t2.y - a.a - b.b = 0 ORDER BY 1,2,3; -ERROR: cannot perform distributed planning on this query -DETAIL: Cartesian products are currently unsupported + y | x | x | a | b | a | b +---+---+---+---+---+---+--- +(0 rows) + +-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join. +-- Unplannable query due to a three-way join which causes no valid path to be found +SELECT * +FROM + test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable + test a, + test b +WHERE t2.y - a.x - b.x = 0 +ORDER BY 1,2,3; +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator SET client_min_messages TO WARNING; DROP SCHEMA expression_reference_join CASCADE; diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index a354f6354..8765c9735 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -99,9 +99,7 @@ LOG: join order: [ "lineitem" ][ local partition join "orders" ] EXPLAIN SELECT l_quantity FROM lineitem, orders WHERE (l_orderkey = o_orderkey OR l_quantity > 5); -LOG: join order: [ "lineitem" ][ cartesian product "orders" ] -ERROR: cannot perform distributed planning on this query -DETAIL: Cartesian products are currently unsupported +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator EXPLAIN SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = 
l_orderkey; LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ] diff --git a/src/test/regress/expected/multi_mx_reference_table.out b/src/test/regress/expected/multi_mx_reference_table.out index b1ab56efe..6dc428df8 100644 --- a/src/test/regress/expected/multi_mx_reference_table.out +++ b/src/test/regress/expected/multi_mx_reference_table.out @@ -804,11 +804,11 @@ INSERT INTO colocated_table_test_2 VALUES (2, 2.0, '2', '2016-12-02'); \c - - - :worker_1_port SET client_min_messages TO DEBUG1; SET citus.log_multi_join_order TO TRUE; -SELECT +SELECT reference_table_test.value_1 -FROM +FROM reference_table_test, colocated_table_test -WHERE +WHERE colocated_table_test.value_1 = reference_table_test.value_1 ORDER BY 1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ] @@ -818,11 +818,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT +SELECT colocated_table_test.value_2 -FROM - reference_table_test, colocated_table_test -WHERE +FROM + reference_table_test, colocated_table_test +WHERE colocated_table_test.value_2 = reference_table_test.value_2 ORDER BY 1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ] @@ -832,11 +832,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT +SELECT colocated_table_test.value_2 -FROM +FROM colocated_table_test, reference_table_test -WHERE +WHERE reference_table_test.value_1 = colocated_table_test.value_1 ORDER BY 1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ] @@ -846,21 +846,29 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT - colocated_table_test.value_2 -FROM +SET citus.enable_repartition_joins = on; +SELECT + colocated_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE +WHERE 
colocated_table_test.value_2 = reference_table_test.value_2 -ORDER BY 1; -LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ cartesian product "colocated_table_test_2" ] -ERROR: cannot perform distributed planning on this query -DETAIL: Cartesian products are currently unsupported -SELECT - colocated_table_test.value_2 -FROM +ORDER BY colocated_table_test.value_2; +LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join "reference_table_test" ][ dual partition join "colocated_table_test" ] + value_2 +--------- + 1 + 1 + 2 + 2 +(4 rows) + +RESET citus.enable_repartition_joins; +SELECT + colocated_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE +WHERE colocated_table_test.value_1 = colocated_table_test_2.value_1 AND colocated_table_test.value_2 = reference_table_test.value_2 ORDER BY 1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ local partition join "colocated_table_test_2" ] @@ -871,11 +879,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te (2 rows) SET citus.task_executor_type to "task-tracker"; -SELECT - colocated_table_test.value_2 -FROM +SELECT + colocated_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE +WHERE colocated_table_test.value_2 = colocated_table_test_2.value_2 AND colocated_table_test.value_2 = reference_table_test.value_2 ORDER BY 1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ] @@ -885,11 +893,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT - reference_table_test.value_2 -FROM +SELECT + reference_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE +WHERE colocated_table_test.value_1 = reference_table_test.value_1 AND 
colocated_table_test_2.value_1 = reference_table_test.value_1 ORDER BY 1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ] diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 72276a0ec..ce740f8b4 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1004,11 +1004,11 @@ INSERT INTO colocated_table_test_2 VALUES (1, 1.0, '1', '2016-12-01'); INSERT INTO colocated_table_test_2 VALUES (2, 2.0, '2', '2016-12-02'); SET client_min_messages TO DEBUG1; SET citus.log_multi_join_order TO TRUE; -SELECT +SELECT reference_table_test.value_1 -FROM +FROM reference_table_test, colocated_table_test -WHERE +WHERE colocated_table_test.value_1 = reference_table_test.value_1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ] value_1 @@ -1017,11 +1017,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT +SELECT colocated_table_test.value_2 -FROM - reference_table_test, colocated_table_test -WHERE +FROM + reference_table_test, colocated_table_test +WHERE colocated_table_test.value_2 = reference_table_test.value_2; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ] value_2 @@ -1030,11 +1030,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT +SELECT colocated_table_test.value_2 -FROM +FROM colocated_table_test, reference_table_test -WHERE +WHERE reference_table_test.value_1 = colocated_table_test.value_1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ] value_2 @@ -1043,20 +1043,29 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT - colocated_table_test.value_2 -FROM +SET citus.enable_repartition_joins = on; 
+SELECT + colocated_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE - colocated_table_test.value_2 = reference_table_test.value_2; -LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ cartesian product "colocated_table_test_2" ] -ERROR: cannot perform distributed planning on this query -DETAIL: Cartesian products are currently unsupported -SELECT - colocated_table_test.value_2 -FROM +WHERE + colocated_table_test.value_2 = reference_table_test.value_2 +ORDER BY colocated_table_test.value_2; +LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join "reference_table_test" ][ dual partition join "colocated_table_test" ] + value_2 +--------- + 1 + 1 + 2 + 2 +(4 rows) + +RESET citus.enable_repartition_joins; +SELECT + colocated_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE +WHERE colocated_table_test.value_1 = colocated_table_test_2.value_1 AND colocated_table_test.value_2 = reference_table_test.value_2; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ local partition join "colocated_table_test_2" ] value_2 @@ -1066,11 +1075,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te (2 rows) SET citus.task_executor_type to "task-tracker"; -SELECT - colocated_table_test.value_2 -FROM +SELECT + colocated_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE +WHERE colocated_table_test.value_2 = colocated_table_test_2.value_2 AND colocated_table_test.value_2 = reference_table_test.value_2; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ] value_2 @@ -1079,11 +1088,11 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te 2 (2 rows) -SELECT - reference_table_test.value_2 -FROM +SELECT + 
reference_table_test.value_2 +FROM reference_table_test, colocated_table_test, colocated_table_test_2 -WHERE +WHERE colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1; LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ dual partition join "colocated_table_test_2" ] value_2 @@ -1096,7 +1105,7 @@ SET citus.log_multi_join_order TO FALSE; SET citus.shard_count TO DEFAULT; SET citus.task_executor_type to "adaptive"; -- some INSERT .. SELECT queries that involve both hash distributed and reference tables --- should go via coordinator since we're inserting into reference table where +-- should go via coordinator since we're inserting into reference table where -- not all the participants are reference tables INSERT INTO reference_table_test (value_1) @@ -1122,7 +1131,7 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator -- safe to push down even lack of equality between partition column and column of reference table INSERT INTO colocated_table_test (value_1, value_2) -SELECT +SELECT colocated_table_test_2.value_1, reference_table_test.value_2 FROM colocated_table_test_2, reference_table_test @@ -1135,10 +1144,10 @@ RETURNING value_1, value_2; 2 | 2 (2 rows) --- similar query with the above, this time partition key but without equality +-- similar query with the above, this time partition key but without equality INSERT INTO colocated_table_test (value_1, value_2) -SELECT +SELECT colocated_table_test_2.value_1, reference_table_test.value_2 FROM colocated_table_test_2, reference_table_test @@ -1251,7 +1260,7 @@ WHERE 2 | 2 (2 rows) --- let's now test TRUNCATE and DROP TABLE +-- let's now test TRUNCATE and DROP TABLE -- delete all rows and ingest some data DELETE FROM reference_table_test; INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01'); @@ -1442,7 +1451,7 @@ CREATE OR REPLACE FUNCTION select_count_all() RETURNS bigint AS ' FROM 
reference_table_test; ' LANGUAGE SQL; -CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp) +CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp) RETURNS void AS ' INSERT INTO reference_table_test VALUES ($1, $2, $3, $4); ' LANGUAGE SQL; @@ -1497,7 +1506,7 @@ SELECT select_count_all(); TRUNCATE reference_table_test; -- some prepared queries and pl/pgsql functions -PREPARE insert_into_ref_table_pr (int, float, text, timestamp) +PREPARE insert_into_ref_table_pr (int, float, text, timestamp) AS INSERT INTO reference_table_test VALUES ($1, $2, $3, $4); -- reference tables do not have up-to-five execution limit as other tables EXECUTE insert_into_ref_table_pr(1, 1.0, '1', '2016-12-01'); @@ -1587,7 +1596,7 @@ ROLLBACK; -- clean up tables, ... SET client_min_messages TO ERROR; DROP SEQUENCE example_ref_value_seq; -DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, +DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite, colocated_table_test, colocated_table_test_2, append_reference_tmp_table; DROP TYPE reference_comp_key; diff --git a/src/test/regress/sql/expression_reference_join.sql b/src/test/regress/sql/expression_reference_join.sql index 148f4a954..ab496c34e 100644 --- a/src/test/regress/sql/expression_reference_join.sql +++ b/src/test/regress/sql/expression_reference_join.sql @@ -27,8 +27,11 @@ FROM WHERE t2.y * 2 = a.a ORDER BY 1,2,3; --- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join. 
--- Unplannable query due to a three-way join which causes no valid path (besides the cartesian product) to be found +-- The join clause is wider than it used to be, causing this query to be +-- recognized by the LogicalPlanner as a repartition join. +-- Due to a three-way join this causes no valid path, besides the cartesian +-- product on reference tables. This is allowed, so it should be able to be +-- planned. SELECT * FROM test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable @@ -37,5 +40,17 @@ FROM WHERE t2.y - a.a - b.b = 0 ORDER BY 1,2,3; + +-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join. +-- Unplannable query due to a three-way join which causes no valid path to be found +SELECT * +FROM + test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable + test a, + test b +WHERE t2.y - a.x - b.x = 0 +ORDER BY 1,2,3; + + SET client_min_messages TO WARNING; DROP SCHEMA expression_reference_join CASCADE; diff --git a/src/test/regress/sql/multi_mx_reference_table.sql b/src/test/regress/sql/multi_mx_reference_table.sql index 3f4ce43ca..2b4990afd 100644 --- a/src/test/regress/sql/multi_mx_reference_table.sql +++ b/src/test/regress/sql/multi_mx_reference_table.sql @@ -518,13 +518,15 @@ WHERE ORDER BY 1; +SET citus.enable_repartition_joins = on; SELECT colocated_table_test.value_2 FROM reference_table_test, colocated_table_test, colocated_table_test_2 WHERE colocated_table_test.value_2 = reference_table_test.value_2 -ORDER BY 1; +ORDER BY colocated_table_test.value_2; +RESET citus.enable_repartition_joins; SELECT colocated_table_test.value_2 diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index c76491501..d3fc89896 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -656,12 +656,15 
@@ FROM WHERE reference_table_test.value_1 = colocated_table_test.value_1; +SET citus.enable_repartition_joins = on; SELECT colocated_table_test.value_2 FROM reference_table_test, colocated_table_test, colocated_table_test_2 WHERE - colocated_table_test.value_2 = reference_table_test.value_2; + colocated_table_test.value_2 = reference_table_test.value_2 +ORDER BY colocated_table_test.value_2; +RESET citus.enable_repartition_joins; SELECT colocated_table_test.value_2 From cf88bdf8331361a56a22650862efe8c3f2cf5105 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 18 Dec 2019 15:30:25 +0100 Subject: [PATCH 4/4] Add tests for complex joins on reference tables --- .../expected/multi_repartition_join_ref.out | 226 ++++++++++++++++++ src/test/regress/multi_schedule | 2 +- .../sql/multi_repartition_join_ref.sql | 105 ++++++++ 3 files changed, 332 insertions(+), 1 deletion(-) create mode 100644 src/test/regress/expected/multi_repartition_join_ref.out create mode 100644 src/test/regress/sql/multi_repartition_join_ref.sql diff --git a/src/test/regress/expected/multi_repartition_join_ref.out b/src/test/regress/expected/multi_repartition_join_ref.out new file mode 100644 index 000000000..b0fc05bda --- /dev/null +++ b/src/test/regress/expected/multi_repartition_join_ref.out @@ -0,0 +1,226 @@ +SET citus.log_multi_join_order to TRUE; +SET client_min_messages to DEBUG1; +SET citus.enable_repartition_joins to on; +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND l_suppkey < s_suppkey +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 195 | 196 | 804 + 245 | 246 | 754 + 278 | 279 | 721 + 308 | 309 | 691 + 309 | 310 | 1380 + 350 | 351 | 649 + 358 | 359 | 641 + 574 | 575 | 425 + 
641 | 642 | 358 + 654 | 655 | 345 +(10 rows) + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND int4eq(l_suppkey, s_suppkey) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 195 | 196 | 1 + 245 | 246 | 1 + 278 | 279 | 1 + 308 | 309 | 1 + 309 | 310 | 2 + 350 | 351 | 1 + 358 | 359 | 1 + 574 | 575 | 1 + 641 | 642 | 1 + 654 | 655 | 1 +(10 rows) + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND NOT int4ne(l_suppkey, s_suppkey) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 195 | 196 | 1 + 245 | 246 | 1 + 278 | 279 | 1 + 308 | 309 | 1 + 309 | 310 | 2 + 350 | 351 | 1 + 358 | 359 | 1 + 574 | 575 | 1 + 641 | 642 | 1 + 654 | 655 | 1 +(10 rows) + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: join order: [ "lineitem" ][ single range partition join "part_append" ][ cartesian product reference join "supplier" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 18 | 7519 | 1000 + 79 | 7580 | 1000 + 91 | 2592 | 1000 + 149 | 5150 | 1000 + 149 | 7650 | 1000 + 175 | 5176 | 1000 + 179 | 2680 | 1000 + 182 | 7683 | 1000 + 195 | 196 | 1000 + 204 | 7705 | 1000 +(10 rows) + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey 
+ AND (int4eq(l_suppkey, s_suppkey) OR l_suppkey = s_suppkey) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 195 | 196 | 1 + 245 | 246 | 1 + 278 | 279 | 1 + 308 | 309 | 1 + 309 | 310 | 2 + 350 | 351 | 1 + 358 | 359 | 1 + 574 | 575 | 1 + 641 | 642 | 1 + 654 | 655 | 1 +(10 rows) + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND (int4eq(l_suppkey, s_suppkey) OR random() > 2) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 195 | 196 | 1 + 245 | 246 | 1 + 278 | 279 | 1 + 308 | 309 | 1 + 309 | 310 | 2 + 350 | 351 | 1 + 358 | 359 | 1 + 574 | 575 | 1 + 641 | 642 | 1 + 654 | 655 | 1 +(10 rows) + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND (l_suppkey = 1 OR s_suppkey = 1) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: join order: [ "lineitem" ][ reference join "supplier" ][ single range partition join "part_append" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 18 | 7519 | 1 + 79 | 7580 | 1 + 91 | 2592 | 1 + 149 | 5150 | 1 + 149 | 7650 | 1 + 175 | 5176 | 1 + 179 | 2680 | 1 + 182 | 7683 | 1 + 195 | 196 | 1 + 204 | 7705 | 1 +(10 rows) + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND l_partkey + p_partkey = s_suppkey +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; +LOG: 
join order: [ "lineitem" ][ single range partition join "part_append" ][ reference join "supplier" ] +DEBUG: push down of limit count: 10 + l_partkey | l_suppkey | count +-----------+-----------+------- + 18 | 7519 | 1 + 79 | 7580 | 1 + 91 | 2592 | 1 + 149 | 5150 | 1 + 149 | 7650 | 1 + 175 | 5176 | 1 + 179 | 2680 | 1 + 182 | 7683 | 1 + 195 | 196 | 1 + 204 | 7705 | 1 +(10 rows) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index e3c251c71..fe8045e77 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -112,7 +112,7 @@ test: multi_join_order_tpch_repartition # new shards before these tests, as they expect specific shard identifiers in # the output. # ---------- -test: multi_repartition_join_planning multi_repartition_join_pruning multi_repartition_join_task_assignment +test: multi_repartition_join_planning multi_repartition_join_pruning multi_repartition_join_task_assignment multi_repartition_join_ref test: adaptive_executor_repartition # --------- diff --git a/src/test/regress/sql/multi_repartition_join_ref.sql b/src/test/regress/sql/multi_repartition_join_ref.sql new file mode 100644 index 000000000..2d41ba1cd --- /dev/null +++ b/src/test/regress/sql/multi_repartition_join_ref.sql @@ -0,0 +1,105 @@ +SET citus.log_multi_join_order to TRUE; +SET client_min_messages to DEBUG1; +SET citus.enable_repartition_joins to on; +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND l_suppkey < s_suppkey +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND int4eq(l_suppkey, s_suppkey) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND NOT int4ne(l_suppkey, 
s_suppkey) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND (int4eq(l_suppkey, s_suppkey) OR l_suppkey = s_suppkey) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND (int4eq(l_suppkey, s_suppkey) OR random() > 2) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND (l_suppkey = 1 OR s_suppkey = 1) +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10; + +SELECT + l_partkey, l_suppkey, count(*) +FROM + lineitem, part_append, supplier +WHERE + l_partkey = p_partkey + AND l_partkey + p_partkey = s_suppkey +GROUP BY + l_partkey, l_suppkey +ORDER BY + l_partkey, l_suppkey +LIMIT 10;