Fix: Unnecessary repartition on joins with more than 4 tables (#3473)

DESCRIPTION: Fix unnecessary repartition on joins with more than 4 tables

In 9.1 we have introduced support for all CH-benCHmark queries by widening our definitions of joins to include joins with expressions in them. This had the undesired side effect of Q5 regressing on its plan by implementing a repartition join.

It turned out this regression was not directly related to widening of the join clause, nor the schema employed by CH-benCHmark. Instead it had to do with 4 or more tables being joined in a chain. A chain meaning:

```sql
SELECT * FROM a,b,c,d WHERE a.part = b.part AND b.part = c.part AND ....
```

Due to how our join order planner was implemented, it would only keep track of one of the partition columns when checking whether the join could be executed locally. As a result, a join chain of 4 tables would _always_ be executed as a repartition join. With 3 tables joined in a chain, the middle table is shared by the two outer tables, so the local join possibility was still found.

With this patch we keep a unique list (or set) of all partition columns participating in the join. When a candidate table is checked for the possibility of executing a local join, we check whether any partition column in that set matches an equality join clause on the partition column of the candidate table.

By taking into account all partition columns in the left relation, the planner now finds the local join path when 4 or more tables are joined in a chain.

fixes: #3276
pull/3478/head^2
Nils Dijk 2020-02-06 15:07:07 +01:00 committed by GitHub
parent 345455d765
commit d5433400f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 435 additions and 102 deletions

View File

@ -82,7 +82,7 @@ static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode,
JoinType joinType);
static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType);
static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn,
static bool JoinOnColumns(List *currentPartitionColumnList, Var *candidatePartitionColumn,
List *joinClauseList);
static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *joinNode,
TableEntry *candidateTable,
@ -96,9 +96,9 @@ static JoinOrderNode * CartesianProduct(JoinOrderNode *joinNode,
TableEntry *candidateTable,
List *applicableJoinClauses,
JoinType joinType);
static JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType
joinRuleType, Var *partitionColumn,
char partitionMethod,
static JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry,
JoinRuleType joinRuleType,
List *partitionColumnList, char partitionMethod,
TableEntry *anchorTable);
@ -214,9 +214,10 @@ ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex)
* JoinOnColumns determines whether two columns are joined by a given join clause list.
*/
static bool
JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList)
JoinOnColumns(List *currentPartitionColumnList, Var *candidateColumn,
List *joinClauseList)
{
if (currentColumn == NULL || candidateColumn == NULL)
if (candidateColumn == NULL || list_length(currentPartitionColumnList) == 0)
{
/*
* LocalJoin can only be happening if we have both a current column and a target
@ -225,31 +226,35 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList)
return false;
}
Node *joinClause = NULL;
foreach_ptr(joinClause, joinClauseList)
Var *currentColumn = NULL;
foreach_ptr(currentColumn, currentPartitionColumnList)
{
if (!NodeIsEqualsOpExpr(joinClause))
Node *joinClause = NULL;
foreach_ptr(joinClause, joinClauseList)
{
continue;
}
OpExpr *joinClauseOpExpr = castNode(OpExpr, joinClause);
Var *leftColumn = LeftColumnOrNULL(joinClauseOpExpr);
Var *rightColumn = RightColumnOrNULL(joinClauseOpExpr);
if (!NodeIsEqualsOpExpr(joinClause))
{
continue;
}
OpExpr *joinClauseOpExpr = castNode(OpExpr, joinClause);
Var *leftColumn = LeftColumnOrNULL(joinClauseOpExpr);
Var *rightColumn = RightColumnOrNULL(joinClauseOpExpr);
/*
* Check if both join columns and both partition key columns match, since the
* current and candidate columns can't be NULL we know they won't match if either
* of the columns resolved to NULL above.
*/
if (equal(leftColumn, currentColumn) &&
equal(rightColumn, candidateColumn))
{
return true;
}
if (equal(leftColumn, candidateColumn) &&
equal(rightColumn, currentColumn))
{
return true;
/*
* Check if both join columns and both partition key columns match, since the
* current and candidate columns can't be NULL we know they won't match if either
* of the columns resolved to NULL above.
*/
if (equal(leftColumn, currentColumn) &&
equal(rightColumn, candidateColumn))
{
return true;
}
if (equal(leftColumn, candidateColumn) &&
equal(rightColumn, currentColumn))
{
return true;
}
}
}
@ -343,7 +348,7 @@ JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, List *joinClause
char firstPartitionMethod = PartitionMethod(firstRelationId);
JoinOrderNode *firstJoinNode = MakeJoinOrderNode(firstTable, firstJoinRule,
firstPartitionColumn,
list_make1(firstPartitionColumn),
firstPartitionMethod,
firstTable);
@ -825,7 +830,7 @@ ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
return NULL;
}
return MakeJoinOrderNode(candidateTable, REFERENCE_JOIN,
currentJoinNode->partitionColumn,
currentJoinNode->partitionColumnList,
currentJoinNode->partitionMethod,
currentJoinNode->anchorTable);
}
@ -876,7 +881,7 @@ CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candid
return NULL;
}
return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT_REFERENCE_JOIN,
currentJoinNode->partitionColumn,
currentJoinNode->partitionColumnList,
currentJoinNode->partitionMethod,
currentJoinNode->anchorTable);
}
@ -901,7 +906,7 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
Oid relationId = candidateTable->relationId;
uint32 tableId = candidateTable->rangeTableId;
Var *candidatePartitionColumn = PartitionColumn(relationId, tableId);
Var *currentPartitionColumn = currentJoinNode->partitionColumn;
List *currentPartitionColumnList = currentJoinNode->partitionColumnList;
char candidatePartitionMethod = PartitionMethod(relationId);
char currentPartitionMethod = currentJoinNode->partitionMethod;
TableEntry *currentAnchorTable = currentJoinNode->anchorTable;
@ -921,7 +926,7 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
return NULL;
}
bool joinOnPartitionColumns = JoinOnColumns(currentPartitionColumn,
bool joinOnPartitionColumns = JoinOnColumns(currentPartitionColumnList,
candidatePartitionColumn,
applicableJoinClauses);
if (!joinOnPartitionColumns)
@ -938,8 +943,17 @@ LocalJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
return NULL;
}
/*
* Since we are applying a local join to the candidate table we need to keep track of
* the partition column of the candidate table on the MultiJoinNode. This will allow
* subsequent joins colocated with this candidate table to correctly be recognized as
* a local join as well.
*/
currentPartitionColumnList = list_append_unique(currentPartitionColumnList,
candidatePartitionColumn);
JoinOrderNode *nextJoinNode = MakeJoinOrderNode(candidateTable, LOCAL_PARTITION_JOIN,
currentPartitionColumn,
currentPartitionColumnList,
currentPartitionMethod,
currentAnchorTable);
@ -959,7 +973,7 @@ static JoinOrderNode *
SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType)
{
Var *currentPartitionColumn = currentJoinNode->partitionColumn;
List *currentPartitionColumnList = currentJoinNode->partitionColumnList;
char currentPartitionMethod = currentJoinNode->partitionMethod;
TableEntry *currentAnchorTable = currentJoinNode->anchorTable;
JoinRuleType currentJoinRuleType = currentJoinNode->joinRuleType;
@ -987,7 +1001,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
}
OpExpr *joinClause =
SinglePartitionJoinClause(currentPartitionColumn, applicableJoinClauses);
SinglePartitionJoinClause(currentPartitionColumnList, applicableJoinClauses);
if (joinClause != NULL)
{
if (currentPartitionMethod == DISTRIBUTE_BY_HASH)
@ -1002,14 +1016,14 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
}
return MakeJoinOrderNode(candidateTable, SINGLE_HASH_PARTITION_JOIN,
currentPartitionColumn,
currentPartitionColumnList,
currentPartitionMethod,
currentAnchorTable);
}
else
{
return MakeJoinOrderNode(candidateTable, SINGLE_RANGE_PARTITION_JOIN,
currentPartitionColumn,
currentPartitionColumnList,
currentPartitionMethod,
currentAnchorTable);
}
@ -1018,7 +1032,13 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
/* evaluate re-partitioning the current table only if the rule didn't apply above */
if (candidatePartitionMethod != DISTRIBUTE_BY_NONE)
{
joinClause = SinglePartitionJoinClause(candidatePartitionColumn,
/*
* Create a new unique list (set) with the partition column of the candidate table
* to check if a single repartition join will work for this table. When it works
* the set is retained on the MultiJoinNode for later local join verification.
*/
List *candidatePartitionColumnList = list_make1(candidatePartitionColumn);
joinClause = SinglePartitionJoinClause(candidatePartitionColumnList,
applicableJoinClauses);
if (joinClause != NULL)
{
@ -1035,7 +1055,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
return MakeJoinOrderNode(candidateTable,
SINGLE_HASH_PARTITION_JOIN,
candidatePartitionColumn,
candidatePartitionColumnList,
candidatePartitionMethod,
candidateTable);
}
@ -1043,7 +1063,7 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
{
return MakeJoinOrderNode(candidateTable,
SINGLE_RANGE_PARTITION_JOIN,
candidatePartitionColumn,
candidatePartitionColumnList,
candidatePartitionMethod,
candidateTable);
}
@ -1060,45 +1080,50 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
* clause exists, the function returns NULL.
*/
OpExpr *
SinglePartitionJoinClause(Var *partitionColumn, List *applicableJoinClauses)
SinglePartitionJoinClause(List *partitionColumnList, List *applicableJoinClauses)
{
if (partitionColumn == NULL)
if (list_length(partitionColumnList) == 0)
{
return NULL;
}
Node *applicableJoinClause = NULL;
foreach_ptr(applicableJoinClause, applicableJoinClauses)
Var *partitionColumn = NULL;
foreach_ptr(partitionColumn, partitionColumnList)
{
if (!NodeIsEqualsOpExpr(applicableJoinClause))
Node *applicableJoinClause = NULL;
foreach_ptr(applicableJoinClause, applicableJoinClauses)
{
continue;
}
OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause);
Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr);
Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr);
if (leftColumn == NULL || rightColumn == NULL)
{
/* not a simple partition column join */
continue;
}
/*
* We first check if partition column matches either of the join columns
* and if it does, we then check if the join column types match. If the
* types are different, we will use different hash functions for the two
* column types, and will incorrectly repartition the data.
*/
if (equal(leftColumn, partitionColumn) || equal(rightColumn, partitionColumn))
{
if (leftColumn->vartype == rightColumn->vartype)
if (!NodeIsEqualsOpExpr(applicableJoinClause))
{
return applicableJoinOpExpr;
continue;
}
else
OpExpr *applicableJoinOpExpr = castNode(OpExpr, applicableJoinClause);
Var *leftColumn = LeftColumnOrNULL(applicableJoinOpExpr);
Var *rightColumn = RightColumnOrNULL(applicableJoinOpExpr);
if (leftColumn == NULL || rightColumn == NULL)
{
ereport(DEBUG1, (errmsg("single partition column types do not match")));
/* not a simple partition column join */
continue;
}
/*
* We first check if partition column matches either of the join columns
* and if it does, we then check if the join column types match. If the
* types are different, we will use different hash functions for the two
* column types, and will incorrectly repartition the data.
*/
if (equal(leftColumn, partitionColumn) || equal(rightColumn, partitionColumn))
{
if (leftColumn->vartype == rightColumn->vartype)
{
return applicableJoinOpExpr;
}
else
{
ereport(DEBUG1, (errmsg("single partition column types do not "
"match")));
}
}
}
}
@ -1124,7 +1149,7 @@ DualPartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
/* because of the dual partition, anchor table and partition column get lost */
return MakeJoinOrderNode(candidateTable,
DUAL_PARTITION_JOIN,
NULL,
NIL,
REDISTRIBUTE_BY_HASH,
NULL);
}
@ -1185,7 +1210,7 @@ CartesianProduct(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
{
/* Because of the cartesian product, anchor table information got lost */
return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT,
currentJoinNode->partitionColumn,
currentJoinNode->partitionColumnList,
currentJoinNode->partitionMethod,
NULL);
}
@ -1197,13 +1222,14 @@ CartesianProduct(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
/* Constructs and returns a join-order node with the given arguments */
JoinOrderNode *
MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType joinRuleType,
Var *partitionColumn, char partitionMethod, TableEntry *anchorTable)
List *partitionColumnList, char partitionMethod,
TableEntry *anchorTable)
{
JoinOrderNode *joinOrderNode = palloc0(sizeof(JoinOrderNode));
joinOrderNode->tableEntry = tableEntry;
joinOrderNode->joinRuleType = joinRuleType;
joinOrderNode->joinType = JOIN_INNER;
joinOrderNode->partitionColumn = partitionColumn;
joinOrderNode->partitionColumnList = partitionColumnList;
joinOrderNode->partitionMethod = partitionMethod;
joinOrderNode->joinClauseList = NIL;
joinOrderNode->anchorTable = anchorTable;

View File

@ -64,7 +64,7 @@ typedef struct QualifierWalkerContext
/* Function pointer type definition for apply join rule functions */
typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *joinClauses);
static RuleApplyFunction RuleApplyFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join rules */
@ -92,36 +92,38 @@ static bool IsSelectClause(Node *clause);
/* Local functions forward declarations for applying joins */
static MultiNode * ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode,
JoinRuleType ruleType, Var *partitionColumn,
JoinRuleType ruleType, List *partitionColumnList,
JoinType joinType, List *joinClauseList);
static RuleApplyFunction JoinRuleApplyFunction(JoinRuleType ruleType);
static MultiNode * ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *joinClauses);
static MultiNode * ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *joinClauses);
static MultiNode * ApplySingleRangePartitionJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList,
JoinType joinType,
List *applicableJoinClauses);
static MultiNode * ApplySingleHashPartitionJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList,
JoinType joinType,
List *applicableJoinClauses);
static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *joinClauses);
static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *joinClauses);
static MultiNode * ApplyCartesianProductReferenceJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn,
List *partitionColumnList,
JoinType joinType,
List *joinClauses);
static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *joinClauses);
@ -1620,7 +1622,7 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, List *joinWhereClause
{
JoinRuleType joinRuleType = joinOrderNode->joinRuleType;
JoinType joinType = joinOrderNode->joinType;
Var *partitionColumn = joinOrderNode->partitionColumn;
List *partitionColumnList = joinOrderNode->partitionColumnList;
List *joinClauseList = joinOrderNode->joinClauseList;
/*
@ -1629,7 +1631,7 @@ MultiJoinTree(List *joinOrderList, List *collectTableList, List *joinWhereClause
*/
MultiNode *newJoinNode = ApplyJoinRule(currentTopNode,
(MultiNode *) collectNode,
joinRuleType, partitionColumn,
joinRuleType, partitionColumnList,
joinType,
joinClauseList);
@ -1976,7 +1978,7 @@ pull_var_clause_default(Node *node)
*/
static MultiNode *
ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType,
Var *partitionColumn, JoinType joinType, List *joinClauseList)
List *partitionColumnList, JoinType joinType, List *joinClauseList)
{
List *leftTableIdList = OutputTableIdList(leftNode);
List *rightTableIdList = OutputTableIdList(rightNode);
@ -1992,7 +1994,7 @@ ApplyJoinRule(MultiNode *leftNode, MultiNode *rightNode, JoinRuleType ruleType,
/* call the join rule application function to create the new join node */
RuleApplyFunction ruleApplyFunction = JoinRuleApplyFunction(ruleType);
MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumn,
MultiNode *multiNode = (*ruleApplyFunction)(leftNode, rightNode, partitionColumnList,
joinType, applicableJoinClauses);
if (joinType != JOIN_INNER && CitusIsA(multiNode, MultiJoin))
@ -2047,7 +2049,7 @@ JoinRuleApplyFunction(JoinRuleType ruleType)
*/
static MultiNode *
ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
@ -2069,7 +2071,7 @@ ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
*/
static MultiNode *
ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
@ -2090,7 +2092,7 @@ ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
*/
static MultiNode *
ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
@ -2111,11 +2113,11 @@ ApplyLocalJoin(MultiNode *leftNode, MultiNode *rightNode,
*/
static MultiNode *
ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode =
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType,
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnList, joinType,
applicableJoinClauses);
joinNode->joinRuleType = SINGLE_RANGE_PARTITION_JOIN;
@ -2130,11 +2132,11 @@ ApplySingleRangePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
*/
static MultiNode *
ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode =
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumn, joinType,
ApplySinglePartitionJoin(leftNode, rightNode, partitionColumnList, joinType,
applicableJoinClauses);
joinNode->joinRuleType = SINGLE_HASH_PARTITION_JOIN;
@ -2150,9 +2152,10 @@ ApplySingleHashPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
*/
static MultiJoin *
ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
Var *partitionColumn = linitial(partitionColumnList);
uint32 partitionTableId = partitionColumn->varno;
/* create all operator structures up front */
@ -2165,7 +2168,7 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
* column against the join clause's columns. If one of the columns matches,
* we introduce a (re-)partition operator for the other column.
*/
OpExpr *joinClause = SinglePartitionJoinClause(partitionColumn,
OpExpr *joinClause = SinglePartitionJoinClause(partitionColumnList,
applicableJoinClauses);
Assert(joinClause != NULL);
@ -2230,7 +2233,7 @@ ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
*/
static MultiNode *
ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
/* find the appropriate join clause */
@ -2289,7 +2292,7 @@ ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
/* Creates a cartesian product node that joins the left and the right node. */
static MultiNode *
ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *partitionColumnList, JoinType joinType,
List *applicableJoinClauses)
{
MultiCartesianProduct *cartesianNode = CitusMakeNode(MultiCartesianProduct);

View File

@ -70,7 +70,13 @@ typedef struct JoinOrderNode
TableEntry *tableEntry; /* this node's relation and range table id */
JoinRuleType joinRuleType; /* not relevant for the first table */
JoinType joinType; /* not relevant for the first table */
Var *partitionColumn; /* not relevant for the first table */
/*
* We keep track of all unique partition columns in the relation to correctly find
* join clauses that can be applied locally.
*/
List *partitionColumnList;
char partitionMethod;
List *joinClauseList; /* not relevant for the first table */
TableEntry *anchorTable;
@ -92,7 +98,7 @@ extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId,
extern bool NodeIsEqualsOpExpr(Node *node);
extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable,
bool rightIsReferenceTable);
extern OpExpr * SinglePartitionJoinClause(Var *partitionColumn,
extern OpExpr * SinglePartitionJoinClause(List *partitionColumnList,
List *applicableJoinClauses);
extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses);
extern Var * LeftColumnOrNULL(OpExpr *joinClause);

View File

@ -1,6 +1,11 @@
SET citus.next_shard_id TO 1650000;
CREATE SCHEMA chbenchmark_all_queries;
SET search_path TO chbenchmark_all_queries;
-- we want to make sure the join order is stable. If the join order of a table changes due
-- to a change you are making, please verify if it is not a regression. If the join order
-- became better feel free to update the output.
SET citus.log_multi_join_order TO on;
SET client_min_messages TO log;
SET citus.enable_repartition_joins TO on;
CREATE TABLE order_line (
ol_w_id int NOT NULL,
@ -245,6 +250,7 @@ FROM order_line
WHERE ol_delivery_d > '2007-01-02 00:00:00.000000'
GROUP BY ol_number
ORDER BY ol_number;
LOG: join order: [ "order_line" ]
ol_number | sum_qty | sum_amount | avg_qty | avg_amount | count_order
---------------------------------------------------------------------
0 | 0 | 0.00 | 0.00000000000000000000 | 0.00000000000000000000 | 1
@ -301,6 +307,7 @@ ORDER BY
n_name,
su_name,
i_id;
LOG: join order: [ "stock" ][ reference join "supplier" ][ reference join "nation" ][ reference join "region" ]
su_suppkey | su_name | n_name | i_id | i_name | su_address | su_phone | su_comment
---------------------------------------------------------------------
9 | abc | Germany | 3 | Keyboard | def | ghi | jkl
@ -338,6 +345,7 @@ GROUP BY
ORDER BY
revenue DESC,
o_entry_d;
LOG: join order: [ "customer" ][ local partition join "oorder" ][ local partition join "new_order" ][ local partition join "order_line" ]
ol_o_id | ol_w_id | ol_d_id | revenue | o_entry_d
---------------------------------------------------------------------
10 | 10 | 10 | 10.00 | Fri Oct 17 00:00:00 2008
@ -406,6 +414,7 @@ WHERE c_id = o_c_id
AND o_entry_d >= '2007-01-02 00:00:00.000000'
GROUP BY n_name
ORDER BY revenue DESC;
LOG: join order: [ "customer" ][ local partition join "oorder" ][ local partition join "order_line" ][ local partition join "stock" ][ reference join "supplier" ][ reference join "nation" ][ reference join "region" ]
n_name | revenue
---------------------------------------------------------------------
Germany | 3.00
@ -419,6 +428,7 @@ FROM order_line
WHERE ol_delivery_d >= '1999-01-01 00:00:00.000000'
AND ol_delivery_d < '2020-01-01 00:00:00.000000'
AND ol_quantity BETWEEN 1 AND 100000;
LOG: join order: [ "order_line" ]
revenue
---------------------------------------------------------------------
55.00
@ -462,6 +472,7 @@ ORDER BY
su_nationkey,
cust_nation,
l_year;
LOG: join order: [ "order_line" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "nation" ][ reference join "supplier" ][ dual partition join "stock" ]
supp_nation | cust_nation | l_year | revenue
---------------------------------------------------------------------
9 | C | 2008 | 3.00
@ -501,6 +512,7 @@ WHERE i_id = s_i_id
AND i_id = ol_i_id
GROUP BY extract(YEAR FROM o_entry_d)
ORDER BY l_year;
LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "region" ][ dual partition join "stock" ][ reference join "supplier" ][ reference join "nation" ]
l_year | mkt_share
---------------------------------------------------------------------
2008 | 0.50000000000000000000
@ -533,6 +545,7 @@ GROUP BY
ORDER BY
n_name,
l_year DESC;
LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ dual partition join "stock" ][ reference join "supplier" ][ reference join "nation" ]
n_name | l_year | sum_profit
---------------------------------------------------------------------
Germany | 2008 | 3.00
@ -569,6 +582,7 @@ GROUP BY
c_phone,
n_name
ORDER BY revenue DESC;
LOG: join order: [ "customer" ][ reference join "nation" ][ local partition join "oorder" ][ local partition join "order_line" ]
c_id | c_last | revenue | c_city | c_phone | n_name
---------------------------------------------------------------------
10 | John | 10.00 | Some City | +1 000 0000000 | Cambodia
@ -606,6 +620,7 @@ HAVING sum(s_order_cnt) >
AND su_nationkey = n_nationkey
AND n_name = 'Germany')
ORDER BY ordercount DESC;
LOG: join order: [ "stock" ][ reference join "supplier" ][ reference join "nation" ]
s_i_id | ordercount
---------------------------------------------------------------------
3 | 3
@ -626,6 +641,7 @@ WHERE ol_w_id = o_w_id
AND ol_delivery_d < '2020-01-01 00:00:00.000000'
GROUP BY o_ol_cnt
ORDER BY o_ol_cnt;
LOG: join order: [ "oorder" ][ local partition join "order_line" ]
o_ol_cnt | high_line_count | low_line_count
---------------------------------------------------------------------
1 | 2 | 9
@ -664,6 +680,7 @@ FROM
WHERE ol_i_id = i_id
AND ol_delivery_d >= '2007-01-02 00:00:00.000000'
AND ol_delivery_d < '2020-01-02 00:00:00.000000';
LOG: join order: [ "order_line" ][ reference join "item" ]
promo_revenue
---------------------------------------------------------------------
0.00000000000000000000
@ -693,6 +710,7 @@ FROM
WHERE su_suppkey = supplier_no
AND total_revenue = (SELECT max(total_revenue) FROM revenue)
ORDER BY su_suppkey;
LOG: join order: [ "order_line" ][ dual partition join "stock" ]
su_suppkey | su_name | su_address | su_phone | total_revenue
---------------------------------------------------------------------
9 | abc | def | ghi | 3.00
@ -738,6 +756,7 @@ FROM
AND ol_i_id = i_id
GROUP BY i_id) t
WHERE ol_i_id = t.i_id;
LOG: join order: [ "order_line" ][ reference join "item" ]
avg_yearly
---------------------------------------------------------------------
27.5000000000000000
@ -775,6 +794,7 @@ HAVING sum(ol_amount) > 5 -- was 200, but thats too big for the dataset
ORDER BY
sum(ol_amount) DESC,
o_entry_d;
LOG: join order: [ "customer" ][ local partition join "oorder" ][ local partition join "order_line" ]
c_last | o_id | o_entry_d | o_ol_cnt | sum
---------------------------------------------------------------------
John | 10 | Fri Oct 17 00:00:00 2008 | 1 | 10.00
@ -808,6 +828,7 @@ WHERE ( ol_i_id = i_id
AND ol_quantity <= 10
AND i_price BETWEEN 1 AND 400000
AND ol_w_id IN (1,5,3));
LOG: join order: [ "order_line" ][ reference join "item" ]
revenue
---------------------------------------------------------------------
7.00
@ -837,6 +858,7 @@ WHERE su_suppkey in
AND su_nationkey = n_nationkey
AND n_name = 'Germany'
ORDER BY su_name;
LOG: join order: [ "stock" ][ reference join "item" ][ dual partition join "order_line" ]
su_name | su_address
---------------------------------------------------------------------
abc | def
@ -895,9 +917,132 @@ WHERE substr(c_phone,1,1) in ('1','2','3','4','5','6','7')
AND o_d_id = c_d_id)
GROUP BY substr(c_state,1,1)
ORDER BY substr(c_state,1,1);
LOG: join order: [ "customer" ]
country | numcust | totacctbal
---------------------------------------------------------------------
(0 rows)
-- There are some queries that have specific interactions with single repartition.
-- Here we test Q7-Q9 with single repartition enabled
SET citus.enable_single_hash_repartition_joins TO on;
-- Query 7
SELECT
su_nationkey as supp_nation,
substr(c_state,1,1) as cust_nation,
extract(year from o_entry_d) as l_year,
sum(ol_amount) as revenue
FROM
supplier,
stock,
order_line,
oorder,
customer,
nation n1,
nation n2
WHERE ol_supply_w_id = s_w_id
AND ol_i_id = s_i_id
AND mod((s_w_id * s_i_id), 10000) = su_suppkey
AND ol_w_id = o_w_id
AND ol_d_id = o_d_id
AND ol_o_id = o_id
AND c_id = o_c_id
AND c_w_id = o_w_id
AND c_d_id = o_d_id
AND su_nationkey = n1.n_nationkey
AND ascii(substr(c_state,1,1)) = n2.n_nationkey
AND (
(n1.n_name = 'Germany' AND n2.n_name = 'Cambodia')
OR (n1.n_name = 'Cambodia' AND n2.n_name = 'Germany')
)
AND ol_delivery_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000'
GROUP BY
su_nationkey,
substr(c_state,1,1),
extract(year from o_entry_d)
ORDER BY
su_nationkey,
cust_nation,
l_year;
LOG: join order: [ "order_line" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "nation" ][ reference join "supplier" ][ single hash partition join "stock" ]
supp_nation | cust_nation | l_year | revenue
---------------------------------------------------------------------
9 | C | 2008 | 3.00
(1 row)
-- Query 8
SELECT
extract(year from o_entry_d) as l_year,
sum(case when n2.n_name = 'Germany' then ol_amount else 0 end) / sum(ol_amount) as mkt_share
FROM
item,
supplier,
stock,
order_line,
oorder,
customer,
nation n1,
nation n2,
region
WHERE i_id = s_i_id
AND ol_i_id = s_i_id
AND ol_supply_w_id = s_w_id
AND mod((s_w_id * s_i_id),10000) = su_suppkey
AND ol_w_id = o_w_id
AND ol_d_id = o_d_id
AND ol_o_id = o_id
AND c_id = o_c_id
AND c_w_id = o_w_id
AND c_d_id = o_d_id
AND n1.n_nationkey = ascii(substr(c_state,1,1))
AND n1.n_regionkey = r_regionkey
AND ol_i_id < 1000
AND r_name = 'Europe'
AND su_nationkey = n2.n_nationkey
AND o_entry_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000'
AND i_data LIKE '%b'
AND i_id = ol_i_id
GROUP BY extract(YEAR FROM o_entry_d)
ORDER BY l_year;
LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ local partition join "customer" ][ reference join "nation" ][ reference join "region" ][ single hash partition join "stock" ][ reference join "supplier" ][ reference join "nation" ]
l_year | mkt_share
---------------------------------------------------------------------
2008 | 0.50000000000000000000
(1 row)
-- Query 9
SELECT
n_name,
extract(year from o_entry_d) as l_year,
sum(ol_amount) as sum_profit
FROM
item,
stock,
supplier,
order_line,
oorder,
nation
WHERE ol_i_id = s_i_id
AND ol_supply_w_id = s_w_id
AND mod((s_w_id * s_i_id), 10000) = su_suppkey
AND ol_w_id = o_w_id
AND ol_d_id = o_d_id
AND ol_o_id = o_id
AND ol_i_id = i_id
AND su_nationkey = n_nationkey
AND i_data LIKE '%b' -- this used to be %BB but that will not work with our small dataset
GROUP BY
n_name,
extract(YEAR FROM o_entry_d)
ORDER BY
n_name,
l_year DESC;
LOG: join order: [ "order_line" ][ reference join "item" ][ local partition join "oorder" ][ single hash partition join "stock" ][ reference join "supplier" ][ reference join "nation" ]
n_name | l_year | sum_profit
---------------------------------------------------------------------
Germany | 2008 | 3.00
The Netherlands | 2008 | 2.00
United States | 2008 | 1.00
(3 rows)
SET client_min_messages TO WARNING;
DROP SCHEMA chbenchmark_all_queries CASCADE;

View File

@ -168,6 +168,30 @@ LOG: join order: [ "orders_hash" ][ single range partition join "customer_appen
explain statements for distributed queries are not enabled
(3 rows)
-- Validate a 4 way join that could be done locally is planned as such by the logical
-- planner. It used to be planned as a repartition join because no single table is
-- directly joined to all other tables; the joins form a chain instead.
EXPLAIN SELECT count(*)
FROM (
SELECT users_table.user_id
FROM users_table
JOIN events_table USING (user_id)
WHERE event_type = 5
) AS bar
JOIN (
SELECT users_table.user_id
FROM users_table
JOIN events_table USING (user_id)
WHERE event_type = 5
) AS some_users ON (some_users.user_id = bar.user_id);
LOG: join order: [ "users_table" ][ local partition join "events_table" ][ local partition join "users_table" ][ local partition join "events_table" ]
QUERY PLAN
---------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
-- Reset client logging level to its previous value
SET client_min_messages TO NOTICE;
DROP TABLE lineitem_hash;

View File

@ -2,6 +2,12 @@ SET citus.next_shard_id TO 1650000;
CREATE SCHEMA chbenchmark_all_queries;
SET search_path TO chbenchmark_all_queries;
-- we want to make sure the join order is stable. If the join order of a table changes due
-- to a change you are making, please verify that it is not a regression. If the join order
-- became better feel free to update the output.
SET citus.log_multi_join_order TO on;
SET client_min_messages TO log;
SET citus.enable_repartition_joins TO on;
CREATE TABLE order_line (
ol_w_id int NOT NULL,
@ -718,5 +724,112 @@ WHERE substr(c_phone,1,1) in ('1','2','3','4','5','6','7')
GROUP BY substr(c_state,1,1)
ORDER BY substr(c_state,1,1);
-- There are some queries that have specific interactions with single repartition.
-- Here we test Q7-Q9 with single repartition enabled
SET citus.enable_single_hash_repartition_joins TO on;
-- Query 7
SELECT
su_nationkey as supp_nation,
substr(c_state,1,1) as cust_nation,
extract(year from o_entry_d) as l_year,
sum(ol_amount) as revenue
FROM
supplier,
stock,
order_line,
oorder,
customer,
nation n1,
nation n2
WHERE ol_supply_w_id = s_w_id
AND ol_i_id = s_i_id
AND mod((s_w_id * s_i_id), 10000) = su_suppkey
AND ol_w_id = o_w_id
AND ol_d_id = o_d_id
AND ol_o_id = o_id
AND c_id = o_c_id
AND c_w_id = o_w_id
AND c_d_id = o_d_id
AND su_nationkey = n1.n_nationkey
AND ascii(substr(c_state,1,1)) = n2.n_nationkey
AND (
(n1.n_name = 'Germany' AND n2.n_name = 'Cambodia')
OR (n1.n_name = 'Cambodia' AND n2.n_name = 'Germany')
)
AND ol_delivery_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000'
GROUP BY
su_nationkey,
substr(c_state,1,1),
extract(year from o_entry_d)
ORDER BY
su_nationkey,
cust_nation,
l_year;
-- Query 8
SELECT
extract(year from o_entry_d) as l_year,
sum(case when n2.n_name = 'Germany' then ol_amount else 0 end) / sum(ol_amount) as mkt_share
FROM
item,
supplier,
stock,
order_line,
oorder,
customer,
nation n1,
nation n2,
region
WHERE i_id = s_i_id
AND ol_i_id = s_i_id
AND ol_supply_w_id = s_w_id
AND mod((s_w_id * s_i_id),10000) = su_suppkey
AND ol_w_id = o_w_id
AND ol_d_id = o_d_id
AND ol_o_id = o_id
AND c_id = o_c_id
AND c_w_id = o_w_id
AND c_d_id = o_d_id
AND n1.n_nationkey = ascii(substr(c_state,1,1))
AND n1.n_regionkey = r_regionkey
AND ol_i_id < 1000
AND r_name = 'Europe'
AND su_nationkey = n2.n_nationkey
AND o_entry_d BETWEEN '2007-01-02 00:00:00.000000' AND '2012-01-02 00:00:00.000000'
AND i_data LIKE '%b'
AND i_id = ol_i_id
GROUP BY extract(YEAR FROM o_entry_d)
ORDER BY l_year;
-- Query 9
SELECT
n_name,
extract(year from o_entry_d) as l_year,
sum(ol_amount) as sum_profit
FROM
item,
stock,
supplier,
order_line,
oorder,
nation
WHERE ol_i_id = s_i_id
AND ol_supply_w_id = s_w_id
AND mod((s_w_id * s_i_id), 10000) = su_suppkey
AND ol_w_id = o_w_id
AND ol_d_id = o_d_id
AND ol_o_id = o_id
AND ol_i_id = i_id
AND su_nationkey = n_nationkey
AND i_data LIKE '%b' -- this used to be %BB but that will not work with our small dataset
GROUP BY
n_name,
extract(YEAR FROM o_entry_d)
ORDER BY
n_name,
l_year DESC;
SET client_min_messages TO WARNING;
DROP SCHEMA chbenchmark_all_queries CASCADE;

View File

@ -108,8 +108,24 @@ EXPLAIN SELECT count(*) FROM orders, customer_hash
EXPLAIN SELECT count(*) FROM orders_hash, customer_append
WHERE c_custkey = o_custkey;
-- Reset client logging level to its previous value
-- Validate a 4 way join that could be done locally is planned as such by the logical
-- planner. It used to be planned as a repartition join because no single table is
-- directly joined to all other tables; the joins form a chain instead.
EXPLAIN SELECT count(*)
FROM (
SELECT users_table.user_id
FROM users_table
JOIN events_table USING (user_id)
WHERE event_type = 5
) AS bar
JOIN (
SELECT users_table.user_id
FROM users_table
JOIN events_table USING (user_id)
WHERE event_type = 5
) AS some_users ON (some_users.user_id = bar.user_id);
-- Reset client logging level to its previous value
SET client_min_messages TO NOTICE;
DROP TABLE lineitem_hash;