Allow cartesian products on reference tables

pull/3323/head
Jelte Fennema 2019-12-16 16:11:57 +01:00
parent 61e2501645
commit 3a042e4611
11 changed files with 258 additions and 110 deletions

View File

@ -76,6 +76,10 @@ static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType);
static char * JoinRuleName(JoinRuleType ruleType); static char * JoinRuleName(JoinRuleType ruleType);
static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, static JoinOrderNode * ReferenceJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType); List *applicableJoinClauses, JoinType joinType);
static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode,
TableEntry *candidateTable,
List *applicableJoinClauses,
JoinType joinType);
static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType); List *applicableJoinClauses, JoinType joinType);
static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn, static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn,
@ -752,6 +756,8 @@ JoinRuleEvalFunction(JoinRuleType ruleType)
RuleEvalFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = &SinglePartitionJoin; RuleEvalFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[SINGLE_HASH_PARTITION_JOIN] = &SinglePartitionJoin; RuleEvalFunctionArray[SINGLE_HASH_PARTITION_JOIN] = &SinglePartitionJoin;
RuleEvalFunctionArray[DUAL_PARTITION_JOIN] = &DualPartitionJoin; RuleEvalFunctionArray[DUAL_PARTITION_JOIN] = &DualPartitionJoin;
RuleEvalFunctionArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] =
&CartesianProductReferenceJoin;
RuleEvalFunctionArray[CARTESIAN_PRODUCT] = &CartesianProduct; RuleEvalFunctionArray[CARTESIAN_PRODUCT] = &CartesianProduct;
ruleEvalFunctionsInitialized = true; ruleEvalFunctionsInitialized = true;
@ -780,6 +786,8 @@ JoinRuleName(JoinRuleType ruleType)
RuleNameArray[SINGLE_RANGE_PARTITION_JOIN] = RuleNameArray[SINGLE_RANGE_PARTITION_JOIN] =
strdup("single range partition join"); strdup("single range partition join");
RuleNameArray[DUAL_PARTITION_JOIN] = strdup("dual partition join"); RuleNameArray[DUAL_PARTITION_JOIN] = strdup("dual partition join");
RuleNameArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] = strdup(
"cartesian product reference join");
RuleNameArray[CARTESIAN_PRODUCT] = strdup("cartesian product"); RuleNameArray[CARTESIAN_PRODUCT] = strdup("cartesian product");
ruleNamesInitialized = true; ruleNamesInitialized = true;
@ -801,48 +809,76 @@ static JoinOrderNode *
ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType) List *applicableJoinClauses, JoinType joinType)
{ {
JoinOrderNode *nextJoinNode = NULL;
int applicableJoinCount = list_length(applicableJoinClauses); int applicableJoinCount = list_length(applicableJoinClauses);
char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
bool performReferenceJoin = false;
if (applicableJoinCount <= 0) if (applicableJoinCount <= 0)
{ {
return NULL; return NULL;
} }
/* char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
* If the table is a reference table, then the reference join is feasible.It char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
* is valid only for inner joins.
*
* Right join requires existing (left) table to be reference table, full outer
* join requires both tables to be reference tables.
*/
if ((joinType == JOIN_INNER || joinType == JOIN_LEFT || joinType == JOIN_ANTI) &&
candidatePartitionMethod == DISTRIBUTE_BY_NONE)
{
performReferenceJoin = true;
}
else if (joinType == JOIN_RIGHT && leftPartitionMethod == DISTRIBUTE_BY_NONE)
{
performReferenceJoin = true;
}
else if (joinType == JOIN_FULL && leftPartitionMethod == DISTRIBUTE_BY_NONE &&
candidatePartitionMethod == DISTRIBUTE_BY_NONE)
{
performReferenceJoin = true;
}
if (performReferenceJoin) if (!IsSupportedReferenceJoin(joinType,
leftPartitionMethod == DISTRIBUTE_BY_NONE,
candidatePartitionMethod == DISTRIBUTE_BY_NONE))
{ {
nextJoinNode = MakeJoinOrderNode(candidateTable, REFERENCE_JOIN, return NULL;
}
return MakeJoinOrderNode(candidateTable, REFERENCE_JOIN,
currentJoinNode->partitionColumn, currentJoinNode->partitionColumn,
currentJoinNode->partitionMethod, currentJoinNode->partitionMethod,
currentJoinNode->anchorTable); currentJoinNode->anchorTable);
} }
return nextJoinNode;
/*
* IsSupportedReferenceJoin checks if with this join type we can safely do a simple join
* on the reference table on all the workers.
*/
bool
IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable,
bool rightIsReferenceTable)
{
if ((joinType == JOIN_INNER || joinType == JOIN_LEFT || joinType == JOIN_ANTI) &&
rightIsReferenceTable)
{
return true;
}
else if ((joinType == JOIN_RIGHT) &&
leftIsReferenceTable)
{
return true;
}
else if (joinType == JOIN_FULL && leftIsReferenceTable && rightIsReferenceTable)
{
return true;
}
return false;
}
/*
* ReferenceJoin evaluates if the candidate table is a reference table for inner,
* left and anti join. For right join, current join node must be represented by
* a reference table. For full join, both of them must be a reference table.
*/
static JoinOrderNode *
CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType)
{
char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
if (!IsSupportedReferenceJoin(joinType,
leftPartitionMethod == DISTRIBUTE_BY_NONE,
candidatePartitionMethod == DISTRIBUTE_BY_NONE))
{
return NULL;
}
return MakeJoinOrderNode(candidateTable, CARTESIAN_PRODUCT_REFERENCE_JOIN,
currentJoinNode->partitionColumn,
currentJoinNode->partitionMethod,
currentJoinNode->anchorTable);
} }

View File

@ -115,6 +115,11 @@ static MultiJoin * ApplySinglePartitionJoin(MultiNode *leftNode, MultiNode *righ
static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode, static MultiNode * ApplyDualPartitionJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType, Var *partitionColumn, JoinType joinType,
List *joinClauses); List *joinClauses);
static MultiNode * ApplyCartesianProductReferenceJoin(MultiNode *leftNode,
MultiNode *rightNode,
Var *partitionColumn,
JoinType joinType,
List *joinClauses);
static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode, static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType, Var *partitionColumn, JoinType joinType,
List *joinClauses); List *joinClauses);
@ -2022,6 +2027,8 @@ JoinRuleApplyFunction(JoinRuleType ruleType)
RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] = RuleApplyFunctionArray[SINGLE_RANGE_PARTITION_JOIN] =
&ApplySingleRangePartitionJoin; &ApplySingleRangePartitionJoin;
RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin; RuleApplyFunctionArray[DUAL_PARTITION_JOIN] = &ApplyDualPartitionJoin;
RuleApplyFunctionArray[CARTESIAN_PRODUCT_REFERENCE_JOIN] =
&ApplyCartesianProductReferenceJoin;
RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct; RuleApplyFunctionArray[CARTESIAN_PRODUCT] = &ApplyCartesianProduct;
ruleApplyFunctionInitialized = true; ruleApplyFunctionInitialized = true;
@ -2055,6 +2062,28 @@ ApplyReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
} }
/*
* ApplyCartesianProductReferenceJoin creates a new MultiJoin node that joins
* the left and the right node. The new node uses the broadcast join rule to
* perform the join.
*/
static MultiNode *
ApplyCartesianProductReferenceJoin(MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
List *applicableJoinClauses)
{
MultiJoin *joinNode = CitusMakeNode(MultiJoin);
joinNode->joinRuleType = CARTESIAN_PRODUCT_REFERENCE_JOIN;
joinNode->joinType = joinType;
joinNode->joinClauseList = applicableJoinClauses;
SetLeftChild((MultiBinaryNode *) joinNode, leftNode);
SetRightChild((MultiBinaryNode *) joinNode, rightNode);
return (MultiNode *) joinNode;
}
/* /*
* ApplyLocalJoin creates a new MultiJoin node that joins the left and the right * ApplyLocalJoin creates a new MultiJoin node that joins the left and the right
* node. The new node uses the local join rule to perform the join. * node. The new node uses the local join rule to perform the join.

View File

@ -3457,6 +3457,28 @@ FragmentCombinationList(List *rangeTableFragmentsList, Query *jobQuery,
} }
/*
* NodeIsRangeTblRefReferenceTable checks if the node is a RangeTblRef that
* points to a reference table in the rangeTableList.
*/
static bool
NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList)
{
if (!IsA(node, RangeTblRef))
{
return false;
}
RangeTblRef *tableRef = castNode(RangeTblRef, node);
RangeTblEntry *rangeTableEntry = rt_fetch(tableRef->rtindex, rangeTableList);
CitusRTEKind rangeTableType = GetRangeTblKind(rangeTableEntry);
if (rangeTableType != CITUS_RTE_RELATION)
{
return false;
}
return PartitionMethod(rangeTableEntry->relid) == DISTRIBUTE_BY_NONE;
}
/* /*
* JoinSequenceArray walks over the join nodes in the job query and constructs a join * JoinSequenceArray walks over the join nodes in the job query and constructs a join
* sequence containing an entry for each joined table. The function then returns an * sequence containing an entry for each joined table. The function then returns an
@ -3496,18 +3518,26 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *dependen
foreach(joinExprCell, joinExprList) foreach(joinExprCell, joinExprList)
{ {
JoinExpr *joinExpr = (JoinExpr *) lfirst(joinExprCell); JoinExpr *joinExpr = (JoinExpr *) lfirst(joinExprCell);
RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg; RangeTblRef *rightTableRef = castNode(RangeTblRef, joinExpr->rarg);
uint32 nextRangeTableId = rightTableRef->rtindex; uint32 nextRangeTableId = rightTableRef->rtindex;
Index existingRangeTableId = 0; Index existingRangeTableId = 0;
bool applyJoinPruning = false; bool applyJoinPruning = false;
List *nextJoinClauseList = make_ands_implicit((Expr *) joinExpr->quals); List *nextJoinClauseList = make_ands_implicit((Expr *) joinExpr->quals);
bool leftIsReferenceTable = NodeIsRangeTblRefReferenceTable(joinExpr->larg,
rangeTableList);
bool rightIsReferenceTable = NodeIsRangeTblRefReferenceTable(joinExpr->rarg,
rangeTableList);
bool isReferenceJoin = IsSupportedReferenceJoin(joinExpr->jointype,
leftIsReferenceTable,
rightIsReferenceTable);
/* /*
* If next join clause list is empty, the user tried a cartesian product * If next join clause list is empty, the user tried a cartesian product
* between tables. We don't support this functionality, and error out. * between tables. We don't support this functionality for non
* reference joins, and error out.
*/ */
if (nextJoinClauseList == NIL) if (nextJoinClauseList == NIL && !isReferenceJoin)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning on this query"), errmsg("cannot perform distributed planning on this query"),

View File

@ -35,7 +35,8 @@ typedef enum JoinRuleType
SINGLE_HASH_PARTITION_JOIN = 3, SINGLE_HASH_PARTITION_JOIN = 3,
SINGLE_RANGE_PARTITION_JOIN = 4, SINGLE_RANGE_PARTITION_JOIN = 4,
DUAL_PARTITION_JOIN = 5, DUAL_PARTITION_JOIN = 5,
CARTESIAN_PRODUCT = 6, CARTESIAN_PRODUCT_REFERENCE_JOIN = 6,
CARTESIAN_PRODUCT = 7,
/* /*
* Add new join rule types above this comment. After adding, you must also * Add new join rule types above this comment. After adding, you must also
@ -89,6 +90,8 @@ extern bool IsApplicableJoinClause(List *leftTableIdList, uint32 rightTableId,
extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId, extern List * ApplicableJoinClauses(List *leftTableIdList, uint32 rightTableId,
List *joinClauseList); List *joinClauseList);
extern bool NodeIsEqualsOpExpr(Node *node); extern bool NodeIsEqualsOpExpr(Node *node);
extern bool IsSupportedReferenceJoin(JoinType joinType, bool leftIsReferenceTable,
bool rightIsReferenceTable);
extern OpExpr * SinglePartitionJoinClause(Var *partitionColumn, extern OpExpr * SinglePartitionJoinClause(Var *partitionColumn,
List *applicableJoinClauses); List *applicableJoinClauses);
extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses); extern OpExpr * DualPartitionJoinClause(List *applicableJoinClauses);

View File

@ -41,8 +41,11 @@ ORDER BY 1,2,3;
2 | 2 | 2 | 4 | 4 2 | 2 | 2 | 4 | 4
(4 rows) (4 rows)
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join. -- The join clause is wider than it used to be, causing this query to be
-- Unplannable query due to a three-way join which causes no valid path (besides the cartesian product) to be found -- recognized by the LogicalPlanner as a repartition join.
-- Due to a three-way join this causes no valid path, besides the cartesian
-- product on reference tables. This is allowed, so it should be able to be
-- planned.
SELECT * SELECT *
FROM FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
@ -50,7 +53,19 @@ FROM
ref b ref b
WHERE t2.y - a.a - b.b = 0 WHERE t2.y - a.a - b.b = 0
ORDER BY 1,2,3; ORDER BY 1,2,3;
ERROR: cannot perform distributed planning on this query y | x | x | a | b | a | b
DETAIL: Cartesian products are currently unsupported ---+---+---+---+---+---+---
(0 rows)
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join.
-- Unplannable query due to a three-way join which causes no valid path to be found
SELECT *
FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
test a,
test b
WHERE t2.y - a.x - b.x = 0
ORDER BY 1,2,3;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA expression_reference_join CASCADE; DROP SCHEMA expression_reference_join CASCADE;

View File

@ -99,9 +99,7 @@ LOG: join order: [ "lineitem" ][ local partition join "orders" ]
EXPLAIN SELECT l_quantity FROM lineitem, orders EXPLAIN SELECT l_quantity FROM lineitem, orders
WHERE (l_orderkey = o_orderkey OR l_quantity > 5); WHERE (l_orderkey = o_orderkey OR l_quantity > 5);
LOG: join order: [ "lineitem" ][ cartesian product "orders" ] ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
ERROR: cannot perform distributed planning on this query
DETAIL: Cartesian products are currently unsupported
EXPLAIN SELECT count(*) FROM orders, lineitem_hash EXPLAIN SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey; WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ] LOG: join order: [ "orders" ][ single range partition join "lineitem_hash" ]

View File

@ -846,16 +846,24 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2 2
(2 rows) (2 rows)
SET citus.enable_repartition_joins = on;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2
FROM FROM
reference_table_test, colocated_table_test, colocated_table_test_2 reference_table_test, colocated_table_test, colocated_table_test_2
WHERE WHERE
colocated_table_test.value_2 = reference_table_test.value_2 colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1; ORDER BY colocated_table_test.value_2;
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ cartesian product "colocated_table_test_2" ] LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join "reference_table_test" ][ dual partition join "colocated_table_test" ]
ERROR: cannot perform distributed planning on this query value_2
DETAIL: Cartesian products are currently unsupported ---------
1
1
2
2
(4 rows)
RESET citus.enable_repartition_joins;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2
FROM FROM

View File

@ -1043,15 +1043,24 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
2 2
(2 rows) (2 rows)
SET citus.enable_repartition_joins = on;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2
FROM FROM
reference_table_test, colocated_table_test, colocated_table_test_2 reference_table_test, colocated_table_test, colocated_table_test_2
WHERE WHERE
colocated_table_test.value_2 = reference_table_test.value_2; colocated_table_test.value_2 = reference_table_test.value_2
LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_test" ][ cartesian product "colocated_table_test_2" ] ORDER BY colocated_table_test.value_2;
ERROR: cannot perform distributed planning on this query LOG: join order: [ "colocated_table_test_2" ][ cartesian product reference join "reference_table_test" ][ dual partition join "colocated_table_test" ]
DETAIL: Cartesian products are currently unsupported value_2
---------
1
1
2
2
(4 rows)
RESET citus.enable_repartition_joins;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2
FROM FROM

View File

@ -27,8 +27,11 @@ FROM
WHERE t2.y * 2 = a.a WHERE t2.y * 2 = a.a
ORDER BY 1,2,3; ORDER BY 1,2,3;
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join. -- The join clause is wider than it used to be, causing this query to be
-- Unplannable query due to a three-way join which causes no valid path (besides the cartesian product) to be found -- recognized by the LogicalPlanner as a repartition join.
-- Due to a three-way join this causes no valid path, besides the cartesian
-- product on reference tables. This is allowed, so it should be able to be
-- planned.
SELECT * SELECT *
FROM FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
@ -37,5 +40,17 @@ FROM
WHERE t2.y - a.a - b.b = 0 WHERE t2.y - a.a - b.b = 0
ORDER BY 1,2,3; ORDER BY 1,2,3;
-- The join clause is wider than it used to be, causing this query to be recognized by the LogicalPlanner as a repartition join.
-- Unplannable query due to a three-way join which causes no valid path to be found
SELECT *
FROM
test t1 JOIN test t2 USING (y), -- causes repartition, which makes this not routable or pushdownable
test a,
test b
WHERE t2.y - a.x - b.x = 0
ORDER BY 1,2,3;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA expression_reference_join CASCADE; DROP SCHEMA expression_reference_join CASCADE;

View File

@ -518,13 +518,15 @@ WHERE
ORDER BY 1; ORDER BY 1;
SET citus.enable_repartition_joins = on;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2
FROM FROM
reference_table_test, colocated_table_test, colocated_table_test_2 reference_table_test, colocated_table_test, colocated_table_test_2
WHERE WHERE
colocated_table_test.value_2 = reference_table_test.value_2 colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY 1; ORDER BY colocated_table_test.value_2;
RESET citus.enable_repartition_joins;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2

View File

@ -656,12 +656,15 @@ FROM
WHERE WHERE
reference_table_test.value_1 = colocated_table_test.value_1; reference_table_test.value_1 = colocated_table_test.value_1;
SET citus.enable_repartition_joins = on;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2
FROM FROM
reference_table_test, colocated_table_test, colocated_table_test_2 reference_table_test, colocated_table_test, colocated_table_test_2
WHERE WHERE
colocated_table_test.value_2 = reference_table_test.value_2; colocated_table_test.value_2 = reference_table_test.value_2
ORDER BY colocated_table_test.value_2;
RESET citus.enable_repartition_joins;
SELECT SELECT
colocated_table_test.value_2 colocated_table_test.value_2