diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 81a35c181..a48ca1d12 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -82,7 +82,7 @@ static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode, JoinType joinType); static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType); -static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn, +static bool JoinOnColumns(List *currentPartitionColumnList, Var *candidatePartitionColumn, List *joinClauseList); static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, @@ -96,9 +96,9 @@ static JoinOrderNode * CartesianProduct(JoinOrderNode *joinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType); -static JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType - joinRuleType, Var *partitionColumn, - char partitionMethod, +static JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry, + JoinRuleType joinRuleType, + List *partitionColumnList, char partitionMethod, TableEntry *anchorTable); @@ -214,9 +214,10 @@ ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex) * JoinOnColumns determines whether two columns are joined by a given join clause list. */ static bool -JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) +JoinOnColumns(List *currentPartitionColumnList, Var *candidateColumn, + List *joinClauseList) { - if (currentColumn == NULL || candidateColumn == NULL) + if (candidateColumn == NULL || list_length(currentPartitionColumnList) == 0) { /* * LocalJoin can only be happening if we have both a current column and a target @@ -225,31 +226,35 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) return false; } - Node *joinClause = NULL; - foreach_ptr(joinClause, joinClauseList) + Var *currentColumn = NULL; + foreach_ptr(currentColumn, currentPartitionColumnList) { - if (!NodeIsEqualsOpExpr(joinClause)) + Node *joinClause = NULL; + foreach_ptr(joinClause, joinClauseList) { - continue; - } - OpExpr *joinClauseOpExpr = castNode(OpExpr, joinClause); - Var *leftColumn = LeftColumnOrNULL(joinClauseOpExpr); - Var *rightColumn = RightColumnOrNULL(joinClauseOpExpr); + if (!NodeIsEqualsOpExpr(joinClause)) + { + continue; + } + OpExpr *joinClauseOpExpr = castNode(OpExpr, joinClause); + Var *leftColumn = LeftColumnOrNULL(joinClauseOpExpr); + Var *rightColumn = RightColumnOrNULL(joinClauseOpExpr); - /* - * Check if both join columns and both partition key columns match, since the - * current and candidate column's can't be NULL we know they won't match if either - * of the columns resolved to NULL above. - */ - if (equal(leftColumn, currentColumn) && - equal(rightColumn, candidateColumn)) - { - return true; - } - if (equal(leftColumn, candidateColumn) && - equal(rightColumn, currentColumn)) - { - return true; + /* + * Check if both join columns and both partition key columns match, since the + * current and candidate column's can't be NULL we know they won't match if either + * of the columns resolved to NULL above. + */ + if (equal(leftColumn, currentColumn) && + equal(rightColumn, candidateColumn)) + { + return true; + } + if (equal(leftColumn, candidateColumn) && + equal(rightColumn, currentColumn)) + { + return true; + } } } @@ -343,7 +348,7 @@ JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClause char firstPartitionMethod = PartitionMethod(firstRelationId); JoinOrderNode *firstJoinNode = MakeJoinOrderNode(firstTable, firstJoinRule, - firstPartitionColumn, + list_make1(firstPartitionColumn), firstPartitionMethod, firstTable); @@ -825,7 +830,7 @@ ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, return NULL; } return MakeJoinOrderNode(candidateTable, REFERENCE_JOIN, - currentJoinNode->partitionColumn, + currentJoinNode->partitionColumnList, currentJoinNode->partitionMethod, currentJoinNode->anchorTable); } @@ -876,7 +881,7 @@ CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candid return NULL; } return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT_REFERENCE_JOIN, - currentJoinNode->partitionColumn, + currentJoinNode->partitionColumnList, currentJoinNode->partitionMethod, currentJoinNode->anchorTable); } @@ -901,7 +906,7 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, Oid relationId = candidateTable->relationId; uint32 tableId = candidateTable->rangeTableId; Var *candidatePartitionColumn = PartitionColumn(relationId, tableId); - Var *currentPartitionColumn = currentJoinNode->partitionColumn; + List *currentPartitionColumnList = currentJoinNode->partitionColumnList; char candidatePartitionMethod = PartitionMethod(relationId); char currentPartitionMethod = currentJoinNode->partitionMethod; TableEntry *currentAnchorTable = currentJoinNode->anchorTable; @@ -921,7 +926,7 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, return NULL; } - bool joinOnPartitionColumns = JoinOnColumns(currentPartitionColumn, + bool joinOnPartitionColumns = JoinOnColumns(currentPartitionColumnList, candidatePartitionColumn, applicableJoinClauses); if (!joinOnPartitionColumns) @@ -938,8 +943,17 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, return NULL; } + /* + * Since we are applying a local join to the candidate table we need to keep track of + * the partition column of the candidate table on the MultiJoinNode. This will allow + * subsequent joins colocated with this candidate table to correctly be recognized as + * a local join as well. + */ + currentPartitionColumnList = list_append_unique(currentPartitionColumnList, + candidatePartitionColumn); + JoinOrderNode *nextJoinNode = MakeJoinOrderNode(candidateTable, LOCAL_PARTITION_JOIN, - currentPartitionColumn, + currentPartitionColumnList, currentPartitionMethod, currentAnchorTable); @@ -959,7 +973,7 @@ static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType) { - Var *currentPartitionColumn = currentJoinNode->partitionColumn; + List *currentPartitionColumnList = currentJoinNode->partitionColumnList; char currentPartitionMethod = currentJoinNode->partitionMethod; TableEntry *currentAnchorTable = currentJoinNode->anchorTable; JoinRuleType currentJoinRuleType = currentJoinNode->joinRuleType; @@ -987,7 +1001,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, } OpExpr *joinClause = - SinglePartitionJoinClause(currentPartitionColumn, applicableJoinClauses); + SinglePartitionJoinClause(currentPartitionColumnList, applicableJoinClauses); if (joinClause != NULL) { if (currentPartitionMethod == DISTRIBUTE_BY_HASH) @@ -1002,14 +1016,14 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, } return MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN, - currentPartitionColumn, + currentPartitionColumnList, currentPartitionMethod, currentAnchorTable); } else { return MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN, - currentPartitionColumn, + currentPartitionColumnList, currentPartitionMethod, currentAnchorTable); } @@ -1018,7 +1032,13 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, /* evaluate re-partitioning the current table only if the rule didn't apply above */ if (candidatePartitionMethod != DISTRIBUTE_BY_NONE) { - joinClause = SinglePartitionJoinClause(candidatePartitionColumn, + /* + * Create a new unique list (set) with the partition column of the candidate table + * to check if a single repartition join will work for this table. When it works + * the set is retained on the MultiJoinNode for later local join verification. + */ + List *candidatePartitionColumnList = list_make1(candidatePartitionColumn); + joinClause = SinglePartitionJoinClause(candidatePartitionColumnList, applicableJoinClauses); if (joinClause != NULL) { @@ -1035,7 +1055,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, return MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN, - candidatePartitionColumn, + candidatePartitionColumnList, candidatePartitionMethod, candidateTable); } @@ -1043,7 +1063,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, { return MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN, - candidatePartitionColumn, + candidatePartitionColumnList, candidatePartitionMethod, candidateTable); } @@ -1060,45 +1080,50 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, * clause exists, the function returns NULL. */ OpExpr * -SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses) +SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses) { - if (partitionColumn == NULL) + if (list_length(partitionColumnList) == 0) { return NULL; } - Node *applicableJoinClause = NULL; - foreach_ptr(applicableJoinClause, applicableJoinClauses) + Var *partitionColumn = NULL; + foreach_ptr(partitionColumn, partitionColumnList) { - if (!NodeIsEqualsOpExpr(applicableJoinClause)) + Node *applicableJoinClause = NULL; + foreach_ptr(applicableJoinClause, applicableJoinClauses) { - continue; - } - OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause); - Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr); - Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr); - if (leftColumn == NULL || rightColumn == NULL) - { - /* not a simple partition column join */ - continue; - } - - - /* - * We first check if partition column matches either of the join columns - * and if it does, we then check if the join column types match. If the - * types are different, we will use different hash functions for the two - * column types, and will incorrectly repartition the data. - */ - if (equal(leftColumn, partitionColumn) || equal(rightColumn, partitionColumn)) - { - if (leftColumn->vartype == rightColumn->vartype) + if (!NodeIsEqualsOpExpr(applicableJoinClause)) { - return applicableJoinOpExpr; + continue; } - else + OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause); + Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr); + Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr); + if (leftColumn == NULL || rightColumn == NULL) { - ereport(DEBUG1, (errmsg("single partition column types do not match"))); + /* not a simple partition column join */ + continue; + } + + + /* + * We first check if partition column matches either of the join columns + * and if it does, we then check if the join column types match. If the + * types are different, we will use different hash functions for the two + * column types, and will incorrectly repartition the data. + */ + if (equal(leftColumn, partitionColumn) || equal(rightColumn, partitionColumn)) + { + if (leftColumn->vartype == rightColumn->vartype) + { + return applicableJoinOpExpr; + } + else + { + ereport(DEBUG1, (errmsg("single partition column types do not " + "match"))); + } } } } @@ -1124,7 +1149,7 @@ DualPartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, /* because of the dual partition, anchor table and partition column get lost */ return MakeJoinOrderNode(candidateTable, DUAL_PARTITION_JOIN, - NULL, + NIL, REDISTRIBUTE_BY_HASH, NULL); } @@ -1185,7 +1210,7 @@ CartesianProduct(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, { /* Because of the cartesian product, anchor table information got lost */ return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT, - currentJoinNode->partitionColumn, + currentJoinNode->partitionColumnList, currentJoinNode->partitionMethod, NULL); } @@ -1197,13 +1222,14 @@ CartesianProduct(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, /* Constructs and returns a join-order node with the given arguments */ JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType joinRuleType, - Var *partitionColumn, char partitionMethod, TableEntry *anchorTable) + List *partitionColumnList, char partitionMethod, + TableEntry *anchorTable) { JoinOrderNode *joinOrderNode = palloc0(sizeof(JoinOrderNode)); joinOrderNode->tableEntry = tableEntry; joinOrderNode->joinRuleType = joinRuleType; joinOrderNode->joinType = JOIN_INNER; - joinOrderNode->partitionColumn = partitionColumn; + joinOrderNode->partitionColumnList = partitionColumnList; joinOrderNode->partitionMethod = partitionMethod; joinOrderNode->joinClauseList = NIL; joinOrderNode->anchorTable = anchorTable; diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 3bc670640..4cf967171 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -64,7 +64,7 @@ typedef struct QualifierWalkerContext /* Function pointer type definition for apply join rule functions */ typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *joinClauses); static RuleApplyFunction RuleApplyFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join rules */ @@ -92,36 +92,38 @@ static bool IsSelectClause(Node *clause); /* Local functions forward declarations for applying joins */ static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, - JoinRuleType ruleType, Var *partitionColumn, + JoinRuleType ruleType, List *partitionColumnList, JoinType joinType, List *joinClauseList); static RuleApplyFunction JoinRuleApplyFunction(JoinRuleType ruleType); static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *joinClauses); static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *joinClauses); static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, + JoinType joinType, List *applicableJoinClauses); static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, + JoinType joinType, List *applicableJoinClauses); static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *joinClauses); static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *joinClauses); static MultiNode * ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, + List *partitionColumnList, JoinType joinType, List *joinClauses); static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *joinClauses); @@ -1620,7 +1622,7 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, List *joinWhereClause { JoinRuleType joinRuleType = joinOrderNode->joinRuleType; JoinType joinType = joinOrderNode->joinType; - Var *partitionColumn = joinOrderNode->partitionColumn; + List *partitionColumnList = joinOrderNode->partitionColumnList; List *joinClauseList = joinOrderNode->joinClauseList; /* @@ -1629,7 +1631,7 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, List *joinWhereClause */ MultiNode *newJoinNode = ApplyJoinRule(currentTopNode, (MultiNode *) collectNode, - joinRuleType, partitionColumn, + joinRuleType, partitionColumnList, joinType, joinClauseList); @@ -1976,7 +1978,7 @@ pull_var_clause_default(Node *node) */ static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType, - Var *partitionColumn, JoinType joinType, List *joinClauseList) + List *partitionColumnList, JoinType joinType, List *joinClauseList) { List *leftTableIdList = OutputTableIdList(leftNode); List *rightTableIdList = OutputTableIdList(rightNode); @@ -1992,7 +1994,7 @@ ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType, /* call the join rule application function to create the new join node */ RuleApplyFunction ruleApplyFunction = JoinRuleApplyFunction(ruleType); - MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumn, + MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumnList, joinType, applicableJoinClauses); if (joinType != JOIN_INNER && CitusIsA(multiNode, MultiJoin)) @@ -2047,7 +2049,7 @@ JoinRuleApplyFunction(JoinRuleType ruleType) */ static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { MultiJoin *joinNode = CitusMakeNode(MultiJoin); @@ -2069,7 +2071,7 @@ ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, */ static MultiNode * ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { MultiJoin *joinNode = CitusMakeNode(MultiJoin); @@ -2090,7 +2092,7 @@ ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode, */ static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { MultiJoin *joinNode = CitusMakeNode(MultiJoin); @@ -2111,11 +2113,11 @@ ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode, */ static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { MultiJoin *joinNode = - ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType, + ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnList, joinType, applicableJoinClauses); joinNode->joinRuleType = SINGLE_RANGE_PARTITION_JOIN; @@ -2130,11 +2132,11 @@ ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, */ static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { MultiJoin *joinNode = - ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType, + ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnList, joinType, applicableJoinClauses); joinNode->joinRuleType = SINGLE_HASH_PARTITION_JOIN; @@ -2150,9 +2152,10 @@ ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, */ static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { + Var *partitionColumn = linitial(partitionColumnList); uint32 partitionTableId = partitionColumn->varno; /* create all operator structures up front */ @@ -2165,7 +2168,7 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, * column against the join clause's columns. If one of the columns matches, * we introduce a (re-)partition operator for the other column. */ - OpExpr *joinClause = SinglePartitionJoinClause(partitionColumn, + OpExpr *joinClause = SinglePartitionJoinClause(partitionColumnList, applicableJoinClauses); Assert(joinClause != NULL); @@ -2230,7 +2233,7 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode, */ static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { /* find the appropriate join clause */ @@ -2289,7 +2292,7 @@ ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, /* Creates a cartesian product node that joins the left and the right node. */ static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode, - Var *partitionColumn, JoinType joinType, + List *partitionColumnList, JoinType joinType, List *applicableJoinClauses) { MultiCartesianProduct *cartesianNode = CitusMakeNode(MultiCartesianProduct); diff --git a/src/include/distributed/multi_join_order.h b/src/include/distributed/multi_join_order.h index d1c8bcb0e..9e438626c 100644 --- a/src/include/distributed/multi_join_order.h +++ b/src/include/distributed/multi_join_order.h @@ -70,7 +70,13 @@ typedef struct JoinOrderNode TableEntry *tableEntry; /* this node's relation and range table id */ JoinRuleType joinRuleType; /* not relevant for the first table */ JoinType joinType; /* not relevant for the first table */ - Var *partitionColumn; /* not relevant for the first table */ + + /* + * We keep track of all unique partition columns in the relation to correctly find + * join clauses that can be applied locally. + */ + List *partitionColumnList; + char partitionMethod; List *joinClauseList; /* not relevant for the first table */ TableEntry *anchorTable; @@ -92,7 +98,7 @@ extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, extern bool NodeIsEqualsOpExpr(Node *node); extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable, bool rightIsReferenceTable); -extern OpExpr * SinglePartitionJoinClause(Var *partitionColumn, +extern OpExpr * SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses); extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses); extern Var * LeftColumnOrNULL(OpExpr *joinClause); diff --git a/src/test/regress/expected/chbenchmark_all_queries.out b/src/test/regress/expected/chbenchmark_all_queries.out index 53bab669e..32e60b324 100644 --- a/src/test/regress/expected/chbenchmark_all_queries.out +++ b/src/test/regress/expected/chbenchmark_all_queries.out @@ -1,6 +1,11 @@ SET citus.next_shard_id TO 1650000; CREATE SCHEMA chbenchmark_all_queries; SET search_path TO chbenchmark_all_queries; +-- we want to make sure the join order is stable. If the join order of a table changes due +-- to a chacnge you are making, please verify if it is not a regression. If the join order +-- became better feel free to update the output. +SET citus.log_multi_join_order TO on; +SET client_min_messages TO log; SET citus.enable_repartition_joins TO on; CREATE TABLE order_line ( ol_w_id int NOT NULL, @@ -245,6 +250,7 @@ FROM order_line WHERE ol_delivery_d > '2007-01-02 00:00:00.000000' GROUP BY ol_number ORDER BY ol_number; +LOG: join order: [ "order_line" ] ol_number | sum_qty | sum_amount | avg_qty | avg_amount | count_order --------------------------------------------------------------------- 0 | 0 | 0.00 | 0.00000000000000000000 | 0.00000000000000000000 | 1 @@ -301,6 +307,7 @@ ORDER BY n_name, su_name, i_id; +LOG: join order: [ "stock" ][ reference join "supplier" ][ reference join "nation" ][ reference join "region" ] su_suppkey | su_name | n_name | i_id | i_name | su_address | su_phone | su_comment --------------------------------------------------------------------- 9 | abc | Germany | 3 | Keyboard | def | ghi | jkl @@ -338,6 +345,7 @@ GROUP BY ORDER BY revenue DESC, o_entry_d; +LOG: join order: [ "customer" ][ local partition join "oorder" ][ local partition join "new_order" ][ local partition join "order_line" ] ol_o_id | ol_w_id | ol_d_id | revenue | o_entry_d --------------------------------------------------------------------- 10 | 10 | 10 | 10.00 | Fri Oct 17 00:00:00 2008 @@ -406,6 +414,7 @@ WHERE c_id = o_c_id AND o_entry_d >= '2007-01-02 00:00:00.000000' GROUP BY n_name ORDER BY revenue DESC; +LOG: join order: [ "customer" ][ local partition join "oorder" ][ local partition join "order_line" ][ local partition join "stock" ][ reference join "supplier" ][ reference join "nation" ][ reference join "region" ] n_name | revenue --------------------------------------------------------------------- Germany | 3.00 @@ -419,6 +428,7 @@ FROM order_line WHERE ol_delivery_d >= '1999-01-01 00:00:00.000000' AND ol_delivery_d < '2020-01-01 00:00:00.000000' AND ol_quantity BETWEEN 1 AND 100000; +LOG: join order: [ "order_line" ] revenue --------------------------------------------------------------------- 55.00 @@ -462,6 +472,7 @@ ORDER BY su_nationkey, cust_nation, l_year; +LOG: join order: [ "order_line" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "nation" ][ reference join "supplier" ][ dual partition join "stock" ] supp_nation | cust_nation | l_year | revenue --------------------------------------------------------------------- 9 | C | 2008 | 3.00 @@ -501,6 +512,7 @@ WHERE i_id = s_i_id AND i_id = ol_i_id GROUP BY extract(YEAR FROM o_entry_d) ORDER BY l_year; +LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "region" ][ dual partition join "stock" ][ reference join "supplier" ][ reference join "nation" ] l_year | mkt_share --------------------------------------------------------------------- 2008 | 0.50000000000000000000 @@ -533,6 +545,7 @@ GROUP BY ORDER BY n_name, l_year DESC; +LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ dual partition join "stock" ][ reference join "supplier" ][ reference join "nation" ] n_name | l_year | sum_profit --------------------------------------------------------------------- Germany | 2008 | 3.00 @@ -569,6 +582,7 @@ GROUP BY c_phone, n_name ORDER BY revenue DESC; +LOG: join order: [ "customer" ][ reference join "nation" ][ local partition join "oorder" ][ local partition join "order_line" ] c_id | c_last | revenue | c_city | c_phone | n_name --------------------------------------------------------------------- 10 | John | 10.00 | Some City | +1 000 0000000 | Cambodia @@ -606,6 +620,7 @@ HAVING sum(s_order_cnt) > AND su_nationkey = n_nationkey AND n_name = 'Germany') ORDER BY ordercount DESC; +LOG: join order: [ "stock" ][ reference join "supplier" ][ reference join "nation" ] s_i_id | ordercount --------------------------------------------------------------------- 3 | 3 @@ -626,6 +641,7 @@ WHERE ol_w_id = o_w_id AND ol_delivery_d < '2020-01-01 00:00:00.000000' GROUP BY o_ol_cnt ORDER BY o_ol_cnt; +LOG: join order: [ "oorder" ][ local partition join "order_line" ] o_ol_cnt | high_line_count | low_line_count --------------------------------------------------------------------- 1 | 2 | 9 @@ -664,6 +680,7 @@ FROM WHERE ol_i_id = i_id AND ol_delivery_d >= '2007-01-02 00:00:00.000000' AND ol_delivery_d < '2020-01-02 00:00:00.000000'; +LOG: join order: [ "order_line" ][ reference join "item" ] promo_revenue --------------------------------------------------------------------- 0.00000000000000000000 @@ -693,6 +710,7 @@ FROM WHERE su_suppkey = supplier_no AND total_revenue = (SELECT max(total_revenue) FROM revenue) ORDER BY su_suppkey; +LOG: join order: [ "order_line" ][ dual partition join "stock" ] su_suppkey | su_name | su_address | su_phone | total_revenue --------------------------------------------------------------------- 9 | abc | def | ghi | 3.00 @@ -738,6 +756,7 @@ FROM AND ol_i_id = i_id GROUP BY i_id) t WHERE ol_i_id = t.i_id; +LOG: join order: [ "order_line" ][ reference join "item" ] avg_yearly --------------------------------------------------------------------- 27.5000000000000000 @@ -775,6 +794,7 @@ HAVING sum(ol_amount) > 5 -- was 200, but thats too big for the dataset ORDER BY sum(ol_amount) DESC, o_entry_d; +LOG: join order: [ "customer" ][ local partition join "oorder" ][ local partition join "order_line" ] c_last | o_id | o_entry_d | o_ol_cnt | sum --------------------------------------------------------------------- John | 10 | Fri Oct 17 00:00:00 2008 | 1 | 10.00 @@ -808,6 +828,7 @@ WHERE ( ol_i_id = i_id AND ol_quantity <= 10 AND i_price BETWEEN 1 AND 400000 AND ol_w_id IN (1,5,3)); +LOG: join order: [ "order_line" ][ reference join "item" ] revenue --------------------------------------------------------------------- 7.00 @@ -837,6 +858,7 @@ WHERE su_suppkey in AND su_nationkey = n_nationkey AND n_name = 'Germany' ORDER BY su_name; +LOG: join order: [ "stock" ][ reference join "item" ][ dual partition join "order_line" ] su_name | su_address --------------------------------------------------------------------- abc | def @@ -895,9 +917,132 @@ WHERE substr(c_phone,1,1) in ('1','2','3','4','5','6','7') AND o_d_id = c_d_id) GROUP BY substr(c_state,1,1) ORDER BY substr(c_state,1,1); +LOG: join order: [ "customer" ] country | numcust | totacctbal --------------------------------------------------------------------- (0 rows) +-- There are some queries that have specific interactions with single repartition. +-- Here we test Q7-Q9 with single repartition enabled +SET citus.enable_single_hash_repartition_joins TO on; +-- Query 7 +SELECT + su_nationkey as supp_nation, + substr(c_state,1,1) as cust_nation, + extract(year from o_entry_d) as l_year, + sum(ol_amount) as revenue +FROM + supplier, + stock, + order_line, + oorder, + customer, + nation n1, + nation n2 +WHERE ol_supply_w_id = s_w_id + AND ol_i_id = s_i_id + AND mod((s_w_id * s_i_id), 10000) = su_suppkey + AND ol_w_id = o_w_id + AND ol_d_id = o_d_id + AND ol_o_id = o_id + AND c_id = o_c_id + AND c_w_id = o_w_id + AND c_d_id = o_d_id + AND su_nationkey = n1.n_nationkey + AND ascii(substr(c_state,1,1)) = n2.n_nationkey + AND ( + (n1.n_name = 'Germany' AND n2.n_name = 'Cambodia') + OR (n1.n_name = 'Cambodia' AND n2.n_name = 'Germany') + ) + AND ol_delivery_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000' +GROUP BY + su_nationkey, + substr(c_state,1,1), + extract(year from o_entry_d) +ORDER BY + su_nationkey, + cust_nation, + l_year; +LOG: join order: [ "order_line" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "nation" ][ reference join "supplier" ][ single hash partition join "stock" ] + supp_nation | cust_nation | l_year | revenue +--------------------------------------------------------------------- + 9 | C | 2008 | 3.00 +(1 row) + +-- Query 8 +SELECT + extract(year from o_entry_d) as l_year, + sum(case when n2.n_name = 'Germany' then ol_amount else 0 end) / sum(ol_amount) as mkt_share +FROM + item, + supplier, + stock, + order_line, + oorder, + customer, + nation n1, + nation n2, + region +WHERE i_id = s_i_id + AND ol_i_id = s_i_id + AND ol_supply_w_id = s_w_id + AND mod((s_w_id * s_i_id),10000) = su_suppkey + AND ol_w_id = o_w_id + AND ol_d_id = o_d_id + AND ol_o_id = o_id + AND c_id = o_c_id + AND c_w_id = o_w_id + AND c_d_id = o_d_id + AND n1.n_nationkey = ascii(substr(c_state,1,1)) + AND n1.n_regionkey = r_regionkey + AND ol_i_id < 1000 + AND r_name = 'Europe' + AND su_nationkey = n2.n_nationkey + AND o_entry_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000' + AND i_data LIKE '%b' + AND i_id = ol_i_id +GROUP BY extract(YEAR FROM o_entry_d) +ORDER BY l_year; +LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "region" ][ single hash partition join "stock" ][ reference join "supplier" ][ reference join "nation" ] + l_year | mkt_share +--------------------------------------------------------------------- + 2008 | 0.50000000000000000000 +(1 row) + +-- Query 9 +SELECT + n_name, + extract(year from o_entry_d) as l_year, + sum(ol_amount) as sum_profit +FROM + item, + stock, + supplier, + order_line, + oorder, + nation +WHERE ol_i_id = s_i_id + AND ol_supply_w_id = s_w_id + AND mod((s_w_id * s_i_id), 10000) = su_suppkey + AND ol_w_id = o_w_id + AND ol_d_id = o_d_id + AND ol_o_id = o_id + AND ol_i_id = i_id + AND su_nationkey = n_nationkey + AND i_data LIKE '%b' -- this used to be %BB but that will not work with our small dataset +GROUP BY + n_name, + extract(YEAR FROM o_entry_d) +ORDER BY + n_name, + l_year DESC; +LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ single hash partition join "stock" ][ reference join "supplier" ][ reference join "nation" ] + n_name | l_year | sum_profit +--------------------------------------------------------------------- + Germany | 2008 | 3.00 + The Netherlands | 2008 | 2.00 + United States | 2008 | 1.00 +(3 rows) + SET client_min_messages TO WARNING; DROP SCHEMA chbenchmark_all_queries CASCADE; diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 86aec27af..2a9662b40 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -168,6 +168,30 @@ LOG: join order: [ "orders_hash" ][ single range partition join "customer_appen explain statements for distributed queries are not enabled (3 rows) +-- Validate a 4 way join that could be done locally is planned as such by the logical +-- planner. It used to be planned as a repartition join due to no 1 table being directly +-- joined to all other tables, but instead follows a chain. +EXPLAIN SELECT count(*) +FROM ( + SELECT users_table.user_id + FROM users_table + JOIN events_table USING (user_id) + WHERE event_type = 5 +) AS bar +JOIN ( + SELECT users_table.user_id + FROM users_table + JOIN events_table USING (user_id) + WHERE event_type = 5 +) AS some_users ON (some_users.user_id = bar.user_id); +LOG: join order: [ "users_table" ][ local partition join "events_table" ][ local partition join "users_table" ][ local partition join "events_table" ] + QUERY PLAN +--------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) + explain statements for distributed queries are not enabled +(3 rows) + -- Reset client logging level to its previous value SET client_min_messages TO NOTICE; DROP TABLE lineitem_hash; diff --git a/src/test/regress/sql/chbenchmark_all_queries.sql b/src/test/regress/sql/chbenchmark_all_queries.sql index 4307aaa98..bed8308a9 100644 --- a/src/test/regress/sql/chbenchmark_all_queries.sql +++ b/src/test/regress/sql/chbenchmark_all_queries.sql @@ -2,6 +2,12 @@ SET citus.next_shard_id TO 1650000; CREATE SCHEMA chbenchmark_all_queries; SET search_path TO chbenchmark_all_queries; +-- we want to make sure the join order is stable. If the join order of a table changes due +-- to a chacnge you are making, please verify if it is not a regression. If the join order +-- became better feel free to update the output. +SET citus.log_multi_join_order TO on; +SET client_min_messages TO log; + SET citus.enable_repartition_joins TO on; CREATE TABLE order_line ( ol_w_id int NOT NULL, @@ -718,5 +724,112 @@ WHERE substr(c_phone,1,1) in ('1','2','3','4','5','6','7') GROUP BY substr(c_state,1,1) ORDER BY substr(c_state,1,1); + +-- There are some queries that have specific interactions with single repartition. +-- Here we test Q7-Q9 with single repartition enabled +SET citus.enable_single_hash_repartition_joins TO on; + +-- Query 7 +SELECT + su_nationkey as supp_nation, + substr(c_state,1,1) as cust_nation, + extract(year from o_entry_d) as l_year, + sum(ol_amount) as revenue +FROM + supplier, + stock, + order_line, + oorder, + customer, + nation n1, + nation n2 +WHERE ol_supply_w_id = s_w_id + AND ol_i_id = s_i_id + AND mod((s_w_id * s_i_id), 10000) = su_suppkey + AND ol_w_id = o_w_id + AND ol_d_id = o_d_id + AND ol_o_id = o_id + AND c_id = o_c_id + AND c_w_id = o_w_id + AND c_d_id = o_d_id + AND su_nationkey = n1.n_nationkey + AND ascii(substr(c_state,1,1)) = n2.n_nationkey + AND ( + (n1.n_name = 'Germany' AND n2.n_name = 'Cambodia') + OR (n1.n_name = 'Cambodia' AND n2.n_name = 'Germany') + ) + AND ol_delivery_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000' +GROUP BY + su_nationkey, + substr(c_state,1,1), + extract(year from o_entry_d) +ORDER BY + su_nationkey, + cust_nation, + l_year; + +-- Query 8 +SELECT + extract(year from o_entry_d) as l_year, + sum(case when n2.n_name = 'Germany' then ol_amount else 0 end) / sum(ol_amount) as mkt_share +FROM + item, + supplier, + stock, + order_line, + oorder, + customer, + nation n1, + nation n2, + region +WHERE i_id = s_i_id + AND ol_i_id = s_i_id + AND ol_supply_w_id = s_w_id + AND mod((s_w_id * s_i_id),10000) = su_suppkey + AND ol_w_id = o_w_id + AND ol_d_id = o_d_id + AND ol_o_id = o_id + AND c_id = o_c_id + AND c_w_id = o_w_id + AND c_d_id = o_d_id + AND n1.n_nationkey = ascii(substr(c_state,1,1)) + AND n1.n_regionkey = r_regionkey + AND ol_i_id < 1000 + AND r_name = 'Europe' + AND su_nationkey = n2.n_nationkey + AND o_entry_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000' + AND i_data LIKE '%b' + AND i_id = ol_i_id +GROUP BY extract(YEAR FROM o_entry_d) +ORDER BY l_year; + +-- Query 9 +SELECT + n_name, + extract(year from o_entry_d) as l_year, + sum(ol_amount) as sum_profit +FROM + item, + stock, + supplier, + order_line, + oorder, + nation +WHERE ol_i_id = s_i_id + AND ol_supply_w_id = s_w_id + AND mod((s_w_id * s_i_id), 10000) = su_suppkey + AND ol_w_id = o_w_id + AND ol_d_id = o_d_id + AND ol_o_id = o_id + AND ol_i_id = i_id + AND su_nationkey = n_nationkey + AND i_data LIKE '%b' -- this used to be %BB but that will not work with our small dataset +GROUP BY + n_name, + extract(YEAR FROM o_entry_d) +ORDER BY + n_name, + l_year DESC; + SET client_min_messages TO WARNING; DROP SCHEMA chbenchmark_all_queries CASCADE; diff --git a/src/test/regress/sql/multi_join_order_additional.sql b/src/test/regress/sql/multi_join_order_additional.sql index 4d6b1749b..33d610b96 100644 --- a/src/test/regress/sql/multi_join_order_additional.sql +++ b/src/test/regress/sql/multi_join_order_additional.sql @@ -108,8 +108,24 @@ EXPLAIN SELECT count(*) FROM orders, customer_hash EXPLAIN SELECT count(*) FROM orders_hash, customer_append WHERE c_custkey = o_custkey; --- Reset client logging level to its previous value +-- Validate a 4 way join that could be done locally is planned as such by the logical +-- planner. It used to be planned as a repartition join due to no 1 table being directly +-- joined to all other tables, but instead follows a chain. +EXPLAIN SELECT count(*) +FROM ( + SELECT users_table.user_id + FROM users_table + JOIN events_table USING (user_id) + WHERE event_type = 5 +) AS bar +JOIN ( + SELECT users_table.user_id + FROM users_table + JOIN events_table USING (user_id) + WHERE event_type = 5 +) AS some_users ON (some_users.user_id = bar.user_id); +-- Reset client logging level to its previous value SET client_min_messages TO NOTICE; DROP TABLE lineitem_hash;